[ISSUE #6777] Support metric in controller (#6778)

* feat(controller): build metrics framework in controller module

1. build metrics framework in controller module

* feat(controller): record metrics about controller

1. record metrics about controller

* fix(controller): fix after review

1. fix after review

Closes https://github.com/apache/rocketmq/issues/6777

* fix(controller): fix after review

1. fix lack of dependency

Closes https://github.com/apache/rocketmq/issues/6777
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
index b9b3f76..04eb679 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
@@ -54,6 +54,7 @@
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.common.message.MessageConst;
 import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.metrics.MetricsExporterType;
 import org.apache.rocketmq.common.metrics.NopLongCounter;
 import org.apache.rocketmq.common.metrics.NopLongHistogram;
 import org.apache.rocketmq.common.metrics.NopObservableLongGauge;
@@ -214,7 +215,7 @@
         if (brokerConfig == null) {
             return false;
         }
-        BrokerConfig.MetricsExporterType exporterType = brokerConfig.getMetricsExporterType();
+        MetricsExporterType exporterType = brokerConfig.getMetricsExporterType();
         if (!exporterType.isEnable()) {
             return false;
         }
@@ -231,8 +232,8 @@
     }
 
     private void init() {
-        BrokerConfig.MetricsExporterType metricsExporterType = brokerConfig.getMetricsExporterType();
-        if (metricsExporterType == BrokerConfig.MetricsExporterType.DISABLE) {
+        MetricsExporterType metricsExporterType = brokerConfig.getMetricsExporterType();
+        if (metricsExporterType == MetricsExporterType.DISABLE) {
             return;
         }
 
@@ -263,7 +264,7 @@
         SdkMeterProviderBuilder providerBuilder = SdkMeterProvider.builder()
             .setResource(Resource.empty());
 
-        if (metricsExporterType == BrokerConfig.MetricsExporterType.OTLP_GRPC) {
+        if (metricsExporterType == MetricsExporterType.OTLP_GRPC) {
             String endpoint = brokerConfig.getMetricsGrpcExporterTarget();
             if (!endpoint.startsWith("http")) {
                 endpoint = "https://" + endpoint;
@@ -303,7 +304,7 @@
             providerBuilder.registerMetricReader(periodicMetricReader);
         }
 
-        if (metricsExporterType == BrokerConfig.MetricsExporterType.PROM) {
+        if (metricsExporterType == MetricsExporterType.PROM) {
             String promExporterHost = brokerConfig.getMetricsPromExporterHost();
             if (StringUtils.isBlank(promExporterHost)) {
                 promExporterHost = brokerConfig.getBrokerIP1();
@@ -315,7 +316,7 @@
             providerBuilder.registerMetricReader(prometheusHttpServer);
         }
 
-        if (metricsExporterType == BrokerConfig.MetricsExporterType.LOG) {
+        if (metricsExporterType == MetricsExporterType.LOG) {
             SLF4JBridgeHandler.removeHandlersForRootLogger();
             SLF4JBridgeHandler.install();
             loggingMetricExporter = LoggingMetricExporter.create(brokerConfig.isMetricsInDelta() ? AggregationTemporality.DELTA : AggregationTemporality.CUMULATIVE);
@@ -533,16 +534,16 @@
     }
 
     public void shutdown() {
-        if (brokerConfig.getMetricsExporterType() == BrokerConfig.MetricsExporterType.OTLP_GRPC) {
+        if (brokerConfig.getMetricsExporterType() == MetricsExporterType.OTLP_GRPC) {
             periodicMetricReader.forceFlush();
             periodicMetricReader.shutdown();
             metricExporter.shutdown();
         }
-        if (brokerConfig.getMetricsExporterType() == BrokerConfig.MetricsExporterType.PROM) {
+        if (brokerConfig.getMetricsExporterType() == MetricsExporterType.PROM) {
             prometheusHttpServer.forceFlush();
             prometheusHttpServer.shutdown();
         }
-        if (brokerConfig.getMetricsExporterType() == BrokerConfig.MetricsExporterType.LOG) {
+        if (brokerConfig.getMetricsExporterType() == MetricsExporterType.LOG) {
             periodicMetricReader.forceFlush();
             periodicMetricReader.shutdown();
             loggingMetricExporter.shutdown();
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 2ce63a1..e9fad05 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -20,6 +20,7 @@
 import org.apache.rocketmq.common.annotation.ImportantField;
 import org.apache.rocketmq.common.constant.PermName;
 import org.apache.rocketmq.common.message.MessageRequestMode;
+import org.apache.rocketmq.common.metrics.MetricsExporterType;
 import org.apache.rocketmq.common.topic.TopicValidator;
 import org.apache.rocketmq.common.utils.NetworkUtil;
 
@@ -345,40 +346,6 @@
 
     private boolean useStaticSubscription = false;
 
-    public enum MetricsExporterType {
-        DISABLE(0),
-        OTLP_GRPC(1),
-        PROM(2),
-        LOG(3);
-
-        private final int value;
-
-        MetricsExporterType(int value) {
-            this.value = value;
-        }
-
-        public int getValue() {
-            return value;
-        }
-
-        public static MetricsExporterType valueOf(int value) {
-            switch (value) {
-                case 1:
-                    return OTLP_GRPC;
-                case 2:
-                    return PROM;
-                case 3:
-                    return LOG;
-                default:
-                    return DISABLE;
-            }
-        }
-
-        public boolean isEnable() {
-            return this.value > 0;
-        }
-    }
-
     private MetricsExporterType metricsExporterType = MetricsExporterType.DISABLE;
 
     private String metricsGrpcExporterTarget = "";
diff --git a/common/src/main/java/org/apache/rocketmq/common/ControllerConfig.java b/common/src/main/java/org/apache/rocketmq/common/ControllerConfig.java
index b35198f..1e9c80b 100644
--- a/common/src/main/java/org/apache/rocketmq/common/ControllerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/ControllerConfig.java
@@ -17,6 +17,8 @@
 package org.apache.rocketmq.common;
 
 import java.io.File;
+import java.util.Arrays;
+import org.apache.rocketmq.common.metrics.MetricsExporterType;
 
 public class ControllerConfig {
 
@@ -65,6 +67,22 @@
      */
     private long scanInactiveMasterInterval = 5 * 1000;
 
+    private MetricsExporterType metricsExporterType = MetricsExporterType.DISABLE;
+
+    private String metricsGrpcExporterTarget = "";
+    private String metricsGrpcExporterHeader = "";
+    private long metricGrpcExporterTimeOutInMills = 3 * 1000;
+    private long metricGrpcExporterIntervalInMills = 60 * 1000;
+    private long metricLoggingExporterIntervalInMills = 10 * 1000;
+
+    private int metricsPromExporterPort = 5557;
+    private String metricsPromExporterHost = "";
+
+    // Label pairs in CSV. Each label follows pattern of Key:Value. eg: instance_id:xxx,uid:xxx
+    private String metricsLabel = "";
+
+    private boolean metricsInDelta = false;
+
     public String getRocketmqHome() {
         return rocketmqHome;
     }
@@ -176,4 +194,90 @@
     public void setScanInactiveMasterInterval(long scanInactiveMasterInterval) {
         this.scanInactiveMasterInterval = scanInactiveMasterInterval;
     }
+
+    public String getDLedgerAddress() {
+        return Arrays.stream(this.controllerDLegerPeers.split(";"))
+            .filter(x -> this.controllerDLegerSelfId.equals(x.split("-")[0]))
+            .map(x -> x.split("-")[1]).findFirst().get();
+    }
+
+    public MetricsExporterType getMetricsExporterType() {
+        return metricsExporterType;
+    }
+
+    public void setMetricsExporterType(MetricsExporterType metricsExporterType) {
+        this.metricsExporterType = metricsExporterType;
+    }
+
+    public String getMetricsGrpcExporterTarget() {
+        return metricsGrpcExporterTarget;
+    }
+
+    public void setMetricsGrpcExporterTarget(String metricsGrpcExporterTarget) {
+        this.metricsGrpcExporterTarget = metricsGrpcExporterTarget;
+    }
+
+    public String getMetricsGrpcExporterHeader() {
+        return metricsGrpcExporterHeader;
+    }
+
+    public void setMetricsGrpcExporterHeader(String metricsGrpcExporterHeader) {
+        this.metricsGrpcExporterHeader = metricsGrpcExporterHeader;
+    }
+
+    public long getMetricGrpcExporterTimeOutInMills() {
+        return metricGrpcExporterTimeOutInMills;
+    }
+
+    public void setMetricGrpcExporterTimeOutInMills(long metricGrpcExporterTimeOutInMills) {
+        this.metricGrpcExporterTimeOutInMills = metricGrpcExporterTimeOutInMills;
+    }
+
+    public long getMetricGrpcExporterIntervalInMills() {
+        return metricGrpcExporterIntervalInMills;
+    }
+
+    public void setMetricGrpcExporterIntervalInMills(long metricGrpcExporterIntervalInMills) {
+        this.metricGrpcExporterIntervalInMills = metricGrpcExporterIntervalInMills;
+    }
+
+    public long getMetricLoggingExporterIntervalInMills() {
+        return metricLoggingExporterIntervalInMills;
+    }
+
+    public void setMetricLoggingExporterIntervalInMills(long metricLoggingExporterIntervalInMills) {
+        this.metricLoggingExporterIntervalInMills = metricLoggingExporterIntervalInMills;
+    }
+
+    public int getMetricsPromExporterPort() {
+        return metricsPromExporterPort;
+    }
+
+    public void setMetricsPromExporterPort(int metricsPromExporterPort) {
+        this.metricsPromExporterPort = metricsPromExporterPort;
+    }
+
+    public String getMetricsPromExporterHost() {
+        return metricsPromExporterHost;
+    }
+
+    public void setMetricsPromExporterHost(String metricsPromExporterHost) {
+        this.metricsPromExporterHost = metricsPromExporterHost;
+    }
+
+    public String getMetricsLabel() {
+        return metricsLabel;
+    }
+
+    public void setMetricsLabel(String metricsLabel) {
+        this.metricsLabel = metricsLabel;
+    }
+
+    public boolean isMetricsInDelta() {
+        return metricsInDelta;
+    }
+
+    public void setMetricsInDelta(boolean metricsInDelta) {
+        this.metricsInDelta = metricsInDelta;
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
index 03d1624..d2b7c37 100644
--- a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
@@ -28,6 +28,7 @@
 import java.net.InetAddress;
 import java.net.NetworkInterface;
 import java.nio.ByteBuffer;
+import java.nio.file.Files;
 import java.security.AccessController;
 import java.security.PrivilegedAction;
 import java.text.NumberFormat;
@@ -765,4 +766,30 @@
             STORE_LOG.info(dirName + " mkdir " + (result ? "OK" : "Failed"));
         }
     }
+
+    public static long calculateFileSizeInPath(File path) {
+        long size = 0;
+        try {
+            if (!path.exists() || Files.isSymbolicLink(path.toPath())) {
+                return 0;
+            }
+            if (path.isFile()) {
+                return path.length();
+            }
+            if (path.isDirectory()) {
+                File[] files = path.listFiles();
+                if (files != null && files.length > 0) {
+                    for (File file : files) {
+                        long fileSize = calculateFileSizeInPath(file);
+                        if (fileSize == -1) return -1;
+                        size += fileSize;
+                    }
+                }
+            }
+        } catch (Exception e) {
+            log.error("calculate all file size in: {} error", path.getAbsolutePath(), e);
+            return -1;
+        }
+        return size;
+    }
 }
diff --git a/common/src/main/java/org/apache/rocketmq/common/metrics/MetricsExporterType.java b/common/src/main/java/org/apache/rocketmq/common/metrics/MetricsExporterType.java
new file mode 100644
index 0000000..5f065b4
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/metrics/MetricsExporterType.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.metrics;
+
+
+public enum MetricsExporterType {
+    DISABLE(0),
+    OTLP_GRPC(1),
+    PROM(2),
+    LOG(3);
+
+    private final int value;
+
+    MetricsExporterType(int value) {
+        this.value = value;
+    }
+
+    public int getValue() {
+        return value;
+    }
+
+    public static MetricsExporterType valueOf(int value) {
+        switch (value) {
+            case 1:
+                return OTLP_GRPC;
+            case 2:
+                return PROM;
+            case 3:
+                return LOG;
+            default:
+                return DISABLE;
+        }
+    }
+
+    public boolean isEnable() {
+        return this.value > 0;
+    }
+}
diff --git a/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java b/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java
index 6306d56..f568a65 100644
--- a/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.rocketmq.common;
 
+import java.io.File;
+import java.io.FileOutputStream;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
@@ -29,6 +31,7 @@
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.within;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class UtilAllTest {
 
@@ -220,4 +223,62 @@
     public void testInvoke() throws Exception {
         UtilAll.invoke(new Object(), "noMethod");
     }
+
+    @Test
+    public void testCalculateFileSizeInPath() throws Exception {
+        /**
+         * testCalculateFileSizeInPath
+         *  - file_0
+         *  - dir_1
+         *      - file_1_0
+         *      - file_1_1
+         *      - dir_1_2
+         *          - file_1_2_0
+         *  - dir_2
+         */
+        String basePath = System.getProperty("java.io.tmpdir") + File.separator + "testCalculateFileSizeInPath";
+        File baseFile = new File(basePath);
+        // test empty path
+        assertEquals(0, UtilAll.calculateFileSizeInPath(baseFile));
+
+        // create baseDir
+        assertTrue(baseFile.mkdirs());
+
+        File file0 = new File(baseFile, "file_0");
+        assertTrue(file0.createNewFile());
+        writeFixedBytesToFile(file0, 1313);
+
+        assertEquals(1313, UtilAll.calculateFileSizeInPath(baseFile));
+
+        // build a file tree like above
+        File dir1 = new File(baseFile, "dir_1");
+        dir1.mkdirs();
+        File file10 = new File(dir1, "file_1_0");
+        File file11 = new File(dir1, "file_1_1");
+        File dir12 = new File(dir1, "dir_1_2");
+        dir12.mkdirs();
+        File file120 = new File(dir12, "file_1_2_0");
+        File dir2 = new File(baseFile, "dir_2");
+        dir2.mkdirs();
+
+        // write all file with 1313 bytes data
+        assertTrue(file10.createNewFile());
+        writeFixedBytesToFile(file10, 1313);
+        assertTrue(file11.createNewFile());
+        writeFixedBytesToFile(file11, 1313);
+        assertTrue(file120.createNewFile());
+        writeFixedBytesToFile(file120, 1313);
+
+        assertEquals(1313 * 4, UtilAll.calculateFileSizeInPath(baseFile));
+
+        // clear all file
+        baseFile.deleteOnExit();
+    }
+
+    private void writeFixedBytesToFile(File file, int size) throws Exception {
+        FileOutputStream outputStream = new FileOutputStream(file);
+        byte[] bytes = new byte[size];
+        outputStream.write(bytes, 0, size);
+        outputStream.close();
+    }
 }
diff --git a/controller/pom.xml b/controller/pom.xml
index 9e9ad28..f82ad2a 100644
--- a/controller/pom.xml
+++ b/controller/pom.xml
@@ -58,5 +58,9 @@
             <groupId>${project.groupId}</groupId>
             <artifactId>rocketmq-srvutil</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>jul-to-slf4j</artifactId>
+        </dependency>
     </dependencies>
 </project>
\ No newline at end of file
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java b/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java
index 71b274c..f38a03a 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java
@@ -17,6 +17,7 @@
 package org.apache.rocketmq.controller;
 
 import io.netty.channel.Channel;
+import java.util.Map;
 import org.apache.rocketmq.controller.helper.BrokerLifecycleListener;
 import org.apache.rocketmq.controller.impl.heartbeat.BrokerLiveInfo;
 
@@ -63,4 +64,10 @@
      * Check whether broker active
      */
     boolean isBrokerActive(final String clusterName, final String brokerName, final Long brokerId);
+
+    /**
+     * Count the number of active brokers in each broker-set of each cluster
+     * @return active brokers count
+     */
+    Map<String/*cluster*/, Map<String/*broker-set*/, Integer/*active broker num*/>> getActiveBrokersNum();
 }
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
index 4682651..7c91e70 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
@@ -39,6 +39,7 @@
 import org.apache.rocketmq.controller.elect.impl.DefaultElectPolicy;
 import org.apache.rocketmq.controller.impl.DLedgerController;
 import org.apache.rocketmq.controller.impl.heartbeat.DefaultBrokerHeartbeatManager;
+import org.apache.rocketmq.controller.metrics.ControllerMetricsManager;
 import org.apache.rocketmq.controller.processor.ControllerRequestProcessor;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -75,6 +76,8 @@
 
     private NotifyService notifyService;
 
+    private ControllerMetricsManager controllerMetricsManager;
+
     public ControllerManager(ControllerConfig controllerConfig, NettyServerConfig nettyServerConfig,
         NettyClientConfig nettyClientConfig) {
         this.controllerConfig = controllerConfig;
@@ -120,6 +123,7 @@
         this.heartbeatManager.registerBrokerLifecycleListener(this::onBrokerInactive);
         this.controller.registerBrokerLifecycleListener(this::onBrokerInactive);
         registerProcessor();
+        this.controllerMetricsManager = ControllerMetricsManager.getInstance(this);
         return true;
     }
 
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
index b6007fe..fa91f28 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.controller.impl;
 
+import com.google.common.base.Stopwatch;
 import io.openmessaging.storage.dledger.AppendFuture;
 import io.openmessaging.storage.dledger.DLedgerConfig;
 import io.openmessaging.storage.dledger.DLedgerLeaderElector;
@@ -24,6 +25,7 @@
 import io.openmessaging.storage.dledger.protocol.AppendEntryRequest;
 import io.openmessaging.storage.dledger.protocol.AppendEntryResponse;
 import io.openmessaging.storage.dledger.protocol.BatchAppendEntryRequest;
+import io.opentelemetry.api.common.AttributesBuilder;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -35,6 +37,7 @@
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.function.Supplier;
 import org.apache.rocketmq.common.ControllerConfig;
@@ -50,6 +53,8 @@
 import org.apache.rocketmq.controller.impl.event.EventMessage;
 import org.apache.rocketmq.controller.impl.event.EventSerializer;
 import org.apache.rocketmq.controller.impl.manager.ReplicasInfoManager;
+import org.apache.rocketmq.controller.metrics.ControllerMetricsConstant;
+import org.apache.rocketmq.controller.metrics.ControllerMetricsManager;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.remoting.ChannelEventListener;
@@ -61,6 +66,7 @@
 import org.apache.rocketmq.remoting.protocol.ResponseCode;
 import org.apache.rocketmq.remoting.protocol.body.SyncStateSet;
 import org.apache.rocketmq.remoting.protocol.header.controller.AlterSyncStateSetRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
 import org.apache.rocketmq.remoting.protocol.header.controller.admin.CleanControllerBrokerDataRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader;
@@ -69,6 +75,12 @@
 import org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerRequestHeader;
 
+import static org.apache.rocketmq.controller.metrics.ControllerMetricsConstant.LABEL_BROKER_SET;
+import static org.apache.rocketmq.controller.metrics.ControllerMetricsConstant.LABEL_CLUSTER_NAME;
+import static org.apache.rocketmq.controller.metrics.ControllerMetricsConstant.LABEL_DLEDGER_OPERATION;
+import static org.apache.rocketmq.controller.metrics.ControllerMetricsConstant.LABEL_DLEDGER_OPERATION_STATUS;
+import static org.apache.rocketmq.controller.metrics.ControllerMetricsConstant.LABEL_ELECTION_RESULT;
+
 /**
  * The implementation of controller, based on DLedger (raft).
  */
@@ -174,7 +186,30 @@
     @Override
     public CompletableFuture<RemotingCommand> electMaster(final ElectMasterRequestHeader request) {
         return this.scheduler.appendEvent("electMaster",
-            () -> this.replicasInfoManager.electMaster(request, this.electPolicy), true);
+            () -> {
+                ControllerResult<ElectMasterResponseHeader> electResult = this.replicasInfoManager.electMaster(request, this.electPolicy);
+                AttributesBuilder attributesBuilder = ControllerMetricsManager.newAttributesBuilder()
+                    .put(LABEL_CLUSTER_NAME, request.getClusterName())
+                    .put(LABEL_BROKER_SET, request.getBrokerName());
+                switch (electResult.getResponseCode()) {
+                    case ResponseCode.SUCCESS:
+                        ControllerMetricsManager.electionTotal.add(1,
+                            attributesBuilder.put(LABEL_ELECTION_RESULT, ControllerMetricsConstant.ElectionResult.NEW_MASTER_ELECTED.getLowerCaseName()).build());
+                        break;
+                    case ResponseCode.CONTROLLER_MASTER_STILL_EXIST:
+                        ControllerMetricsManager.electionTotal.add(1,
+                            attributesBuilder.put(LABEL_ELECTION_RESULT, ControllerMetricsConstant.ElectionResult.KEEP_CURRENT_MASTER.getLowerCaseName()).build());
+                        break;
+                    case ResponseCode.CONTROLLER_MASTER_NOT_AVAILABLE:
+                    case ResponseCode.CONTROLLER_ELECT_MASTER_FAILED:
+                        ControllerMetricsManager.electionTotal.add(1,
+                            attributesBuilder.put(LABEL_ELECTION_RESULT, ControllerMetricsConstant.ElectionResult.NO_MASTER_ELECTED.getLowerCaseName()).build());
+                        break;
+                    default:
+                        break;
+                }
+                return electResult;
+            }, true);
     }
 
     @Override
@@ -258,14 +293,30 @@
         if (request != null) {
             request.setGroup(this.dLedgerConfig.getGroup());
             request.setRemoteId(this.dLedgerConfig.getSelfId());
+            Stopwatch stopwatch = Stopwatch.createStarted();
+            AttributesBuilder attributesBuilder = ControllerMetricsManager.newAttributesBuilder()
+                .put(LABEL_DLEDGER_OPERATION, ControllerMetricsConstant.DLedgerOperation.APPEND.getLowerCaseName());
             try {
                 final AppendFuture<AppendEntryResponse> dLedgerFuture = (AppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request);
                 if (dLedgerFuture.getPos() == -1) {
+                    ControllerMetricsManager.dLedgerOpTotal.add(1,
+                        attributesBuilder.put(LABEL_DLEDGER_OPERATION_STATUS, ControllerMetricsConstant.DLedgerOperationStatus.FAILED.getLowerCaseName()).build());
                     return false;
                 }
                 dLedgerFuture.get(5, TimeUnit.SECONDS);
+                ControllerMetricsManager.dLedgerOpTotal.add(1,
+                    attributesBuilder.put(LABEL_DLEDGER_OPERATION_STATUS, ControllerMetricsConstant.DLedgerOperationStatus.SUCCESS.getLowerCaseName()).build());
+                ControllerMetricsManager.dLedgerOpLatency.record(stopwatch.elapsed(TimeUnit.MICROSECONDS),
+                    attributesBuilder.build());
             } catch (Exception e) {
                 log.error("Failed to append entry to DLedger", e);
+                if (e instanceof TimeoutException) {
+                    ControllerMetricsManager.dLedgerOpTotal.add(1,
+                        attributesBuilder.put(LABEL_DLEDGER_OPERATION_STATUS, ControllerMetricsConstant.DLedgerOperationStatus.TIMEOUT.getLowerCaseName()).build());
+                } else {
+                    ControllerMetricsManager.dLedgerOpTotal.add(1,
+                        attributesBuilder.put(LABEL_DLEDGER_OPERATION_STATUS, ControllerMetricsConstant.DLedgerOperationStatus.FAILED.getLowerCaseName()).build());
+                }
                 return false;
             }
             return true;
@@ -474,12 +525,14 @@
             Runnable runnable = () -> {
                 switch (role) {
                     case CANDIDATE:
+                        ControllerMetricsManager.recordRole(role, this.currentRole);
                         this.currentRole = MemberState.Role.CANDIDATE;
                         log.info("Controller {} change role to candidate", this.selfId);
                         DLedgerController.this.stopScheduling();
                         DLedgerController.this.cancelScanInactiveFuture();
                         break;
                     case FOLLOWER:
+                        ControllerMetricsManager.recordRole(role, this.currentRole);
                         this.currentRole = MemberState.Role.FOLLOWER;
                         log.info("Controller {} change role to Follower, leaderId:{}", this.selfId, getMemberState().getLeaderId());
                         DLedgerController.this.stopScheduling();
@@ -496,6 +549,7 @@
                             request.setBody(new byte[0]);
                             try {
                                 if (appendToDLedgerAndWait(request)) {
+                                    ControllerMetricsManager.recordRole(role, this.currentRole);
                                     this.currentRole = MemberState.Role.LEADER;
                                     DLedgerController.this.startScheduling();
                                     if (DLedgerController.this.scanInactiveMasterFuture == null) {
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/heartbeat/DefaultBrokerHeartbeatManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/heartbeat/DefaultBrokerHeartbeatManager.java
index dc82428..2fbddb9 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/heartbeat/DefaultBrokerHeartbeatManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/heartbeat/DefaultBrokerHeartbeatManager.java
@@ -18,6 +18,7 @@
 
 import io.netty.channel.Channel;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -106,7 +107,8 @@
 
     @Override
     public void onBrokerHeartbeat(String clusterName, String brokerName, String brokerAddr, Long brokerId,
-        Long timeoutMillis, Channel channel, Integer epoch, Long maxOffset, Long confirmOffset, Integer electionPriority) {
+        Long timeoutMillis, Channel channel, Integer epoch, Long maxOffset, Long confirmOffset,
+        Integer electionPriority) {
         BrokerIdentityInfo brokerIdentityInfo = new BrokerIdentityInfo(clusterName, brokerName, brokerId);
         BrokerLiveInfo prev = this.brokerLiveTable.get(brokerIdentityInfo);
         int realEpoch = Optional.ofNullable(epoch).orElse(-1);
@@ -173,4 +175,17 @@
         return false;
     }
 
+    @Override
+    public Map<String, Map<String, Integer>> getActiveBrokersNum() {
+        Map<String, Map<String, Integer>> map = new HashMap<>();
+        this.brokerLiveTable.keySet().stream()
+            .filter(brokerIdentity -> this.isBrokerActive(brokerIdentity.getClusterName(), brokerIdentity.getBrokerName(), brokerIdentity.getBrokerId()))
+            .forEach(id -> {
+                map.computeIfAbsent(id.getClusterName(), k -> new HashMap<>());
+                map.get(id.getClusterName()).compute(id.getBrokerName(), (broker, num) ->
+                    num == null ? 0 : num + 1
+                );
+            });
+        return map;
+    }
 }
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/metrics/ControllerMetricsConstant.java b/controller/src/main/java/org/apache/rocketmq/controller/metrics/ControllerMetricsConstant.java
new file mode 100644
index 0000000..1efae43
--- /dev/null
+++ b/controller/src/main/java/org/apache/rocketmq/controller/metrics/ControllerMetricsConstant.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.controller.metrics;
+
+import org.apache.rocketmq.remoting.protocol.RequestCode;
+
+public class ControllerMetricsConstant {
+
+    public static final String LABEL_ADDRESS = "address";
+    public static final String LABEL_GROUP = "group";
+    public static final String LABEL_PEER_ID = "peer_id";
+    public static final String LABEL_AGGREGATION = "aggregation";
+    public static final String AGGREGATION_DELTA = "delta";
+
+    public static final String OPEN_TELEMETRY_METER_NAME = "controller";
+
+    public static final String GAUGE_ROLE = "role";
+
+    // unit: B
+    public static final String GAUGE_DLEDGER_DISK_USAGE = "dledger_disk_usage";
+
+    public static final String GAUGE_ACTIVE_BROKER_NUM = "active_broker_num";
+
+    public static final String COUNTER_REQUEST_TOTAL = "request_total";
+
+    public static final String COUNTER_DLEDGER_OP_TOTAL = "dledger_op_total";
+
+    public static final String COUNTER_ELECTION_TOTAL = "election_total";
+
+    // unit: us
+    public static final String HISTOGRAM_REQUEST_LATENCY = "request_latency";
+
+    // unit: us
+    public static final String HISTOGRAM_DLEDGER_OP_LATENCY = "dledger_op_latency";
+
+    public static final String LABEL_CLUSTER_NAME = "cluster";
+
+    public static final String LABEL_BROKER_SET = "broker_set";
+
+    public static final String LABEL_REQUEST_TYPE = "request_type";
+
+    public static final String LABEL_REQUEST_HANDLE_STATUS = "request_handle_status";
+
+    public static final String LABEL_DLEDGER_OPERATION = "dledger_operation";
+
+    public static final String LABEL_DLEDGER_OPERATION_STATUS = "dLedger_operation_status";
+
+    public static final String LABEL_ELECTION_RESULT = "election_result";
+
+
+    public enum RequestType {
+        CONTROLLER_ALTER_SYNC_STATE_SET(RequestCode.CONTROLLER_ALTER_SYNC_STATE_SET),
+
+        CONTROLLER_ELECT_MASTER(RequestCode.CONTROLLER_ELECT_MASTER),
+
+        CONTROLLER_REGISTER_BROKER(RequestCode.CONTROLLER_REGISTER_BROKER),
+
+        CONTROLLER_GET_REPLICA_INFO(RequestCode.CONTROLLER_GET_REPLICA_INFO),
+
+        CONTROLLER_GET_METADATA_INFO(RequestCode.CONTROLLER_GET_METADATA_INFO),
+
+        CONTROLLER_GET_SYNC_STATE_DATA(RequestCode.CONTROLLER_GET_SYNC_STATE_DATA),
+
+        CONTROLLER_GET_BROKER_EPOCH_CACHE(RequestCode.GET_BROKER_EPOCH_CACHE),
+
+        CONTROLLER_NOTIFY_BROKER_ROLE_CHANGED(RequestCode.NOTIFY_BROKER_ROLE_CHANGED),
+
+        CONTROLLER_BROKER_HEARTBEAT(RequestCode.BROKER_HEARTBEAT),
+
+        CONTROLLER_UPDATE_CONTROLLER_CONFIG(RequestCode.UPDATE_CONTROLLER_CONFIG),
+
+        CONTROLLER_GET_CONTROLLER_CONFIG(RequestCode.GET_CONTROLLER_CONFIG),
+
+        CONTROLLER_CLEAN_BROKER_DATA(RequestCode.CLEAN_BROKER_DATA),
+
+        CONTROLLER_GET_NEXT_BROKER_ID(RequestCode.CONTROLLER_GET_NEXT_BROKER_ID),
+
+        CONTROLLER_APPLY_BROKER_ID(RequestCode.CONTROLLER_APPLY_BROKER_ID);
+
+        private final int code;
+
+        RequestType(int code) {
+            this.code = code;
+        }
+
+        public static String getLowerCaseNameByCode(int code) {
+            for (RequestType requestType : RequestType.values()) {
+                if (requestType.code == code) {
+                    return requestType.name();
+                }
+            }
+            return null;
+        }
+    }
+
+    public enum RequestHandleStatus {
+        SUCCESS,
+        FAILED,
+        TIMEOUT;
+        public String getLowerCaseName() {
+            return this.name().toLowerCase();
+        }
+    }
+
+    public enum DLedgerOperation {
+        APPEND;
+
+        public String getLowerCaseName() {
+            return this.name().toLowerCase();
+        }
+    }
+
+    public enum DLedgerOperationStatus {
+        SUCCESS,
+        FAILED,
+        TIMEOUT;
+
+        public String getLowerCaseName() {
+            return this.name().toLowerCase();
+        }
+    }
+
+    public enum ElectionResult {
+        NEW_MASTER_ELECTED,
+        KEEP_CURRENT_MASTER,
+        NO_MASTER_ELECTED;
+
+        public String getLowerCaseName() {
+            return this.name().toLowerCase();
+        }
+    }
+
+}
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/metrics/ControllerMetricsManager.java b/controller/src/main/java/org/apache/rocketmq/controller/metrics/ControllerMetricsManager.java
new file mode 100644
index 0000000..9b30a3b
--- /dev/null
+++ b/controller/src/main/java/org/apache/rocketmq/controller/metrics/ControllerMetricsManager.java
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.controller.metrics;
+
+import com.google.common.base.Splitter;
+import io.openmessaging.storage.dledger.MemberState;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.api.metrics.LongCounter;
+import io.opentelemetry.api.metrics.LongHistogram;
+import io.opentelemetry.api.metrics.LongUpDownCounter;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.api.metrics.ObservableLongGauge;
+import io.opentelemetry.exporter.logging.LoggingMetricExporter;
+import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter;
+import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder;
+import io.opentelemetry.exporter.prometheus.PrometheusHttpServer;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.metrics.Aggregation;
+import io.opentelemetry.sdk.metrics.InstrumentSelector;
+import io.opentelemetry.sdk.metrics.InstrumentType;
+import io.opentelemetry.sdk.metrics.SdkMeterProvider;
+import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder;
+import io.opentelemetry.sdk.metrics.View;
+import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
+import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
+import io.opentelemetry.sdk.resources.Resource;
+import java.io.File;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.ControllerConfig;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.metrics.MetricsExporterType;
+import org.apache.rocketmq.common.metrics.NopLongCounter;
+import org.apache.rocketmq.common.metrics.NopLongHistogram;
+import org.apache.rocketmq.common.metrics.NopLongUpDownCounter;
+import org.apache.rocketmq.common.metrics.NopObservableLongGauge;
+import org.apache.rocketmq.controller.ControllerManager;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.slf4j.bridge.SLF4JBridgeHandler;
+
+import static org.apache.rocketmq.controller.metrics.ControllerMetricsConstant.AGGREGATION_DELTA;
+import static org.apache.rocketmq.controller.metrics.ControllerMetricsConstant.COUNTER_DLEDGER_OP_TOTAL;
+import static org.apache.rocketmq.controller.metrics.ControllerMetricsConstant.COUNTER_ELECTION_TOTAL;
+import static org.apache.rocketmq.controller.metrics.ControllerMetricsConstant.COUNTER_REQUEST_TOTAL;
+import static org.apache.rocketmq.controller.metrics.ControllerMetricsConstant.GAUGE_ACTIVE_BROKER_NUM;
+import static org.apache.rocketmq.controller.metrics.ControllerMetricsConstant.GAUGE_DLEDGER_DISK_USAGE;
+import static org.apache.rocketmq.controller.metrics.ControllerMetricsConstant.GAUGE_ROLE;
+import static org.apache.rocketmq.controller.metrics.ControllerMetricsConstant.HISTOGRAM_DLEDGER_OP_LATENCY;
+import static org.apache.rocketmq.controller.metrics.ControllerMetricsConstant.HISTOGRAM_REQUEST_LATENCY;
+import static org.apache.rocketmq.controller.metrics.ControllerMetricsConstant.LABEL_ADDRESS;
+import static org.apache.rocketmq.controller.metrics.ControllerMetricsConstant.LABEL_AGGREGATION;
+import static org.apache.rocketmq.controller.metrics.ControllerMetricsConstant.LABEL_BROKER_SET;
+import static org.apache.rocketmq.controller.metrics.ControllerMetricsConstant.LABEL_CLUSTER_NAME;
+import static org.apache.rocketmq.controller.metrics.ControllerMetricsConstant.LABEL_GROUP;
+import static org.apache.rocketmq.controller.metrics.ControllerMetricsConstant.LABEL_PEER_ID;
+import static org.apache.rocketmq.controller.metrics.ControllerMetricsConstant.OPEN_TELEMETRY_METER_NAME;
+
+public class ControllerMetricsManager {
+
+    private static final Logger logger = LoggerFactory.getLogger(LoggerName.CONTROLLER_LOGGER_NAME);
+
+    private static volatile ControllerMetricsManager instance;
+
+    private static final Map<String, String> LABEL_MAP = new HashMap<>();
+
+    // metrics about node status
+    public static LongUpDownCounter role = new NopLongUpDownCounter();
+
+    public static ObservableLongGauge dLedgerDiskUsage = new NopObservableLongGauge();
+
+    public static ObservableLongGauge activeBrokerNum = new NopObservableLongGauge();
+
+    public static LongCounter requestTotal = new NopLongCounter();
+
+    public static LongCounter dLedgerOpTotal = new NopLongCounter();
+
+    public static LongCounter electionTotal = new NopLongCounter();
+
+    // metrics about latency
+    public static LongHistogram requestLatency = new NopLongHistogram();
+
+    public static LongHistogram dLedgerOpLatency = new NopLongHistogram();
+
+    private static double us = 1d;
+
+    private static double ms = 1000 * us;
+
+    private static double s = 1000 * ms;
+
+    private final ControllerManager controllerManager;
+
+    private final ControllerConfig config;
+
+    private Meter controllerMeter;
+
+    private OtlpGrpcMetricExporter metricExporter;
+
+    private PeriodicMetricReader periodicMetricReader;
+
+    private PrometheusHttpServer prometheusHttpServer;
+
+    private LoggingMetricExporter loggingMetricExporter;
+
+    public static ControllerMetricsManager getInstance(ControllerManager controllerManager) {
+        if (instance == null) {
+            synchronized (ControllerMetricsManager.class) {
+                if (instance == null) {
+                    instance = new ControllerMetricsManager(controllerManager);
+                }
+            }
+        }
+        return instance;
+    }
+
+    public static AttributesBuilder newAttributesBuilder() {
+        AttributesBuilder builder = Attributes.builder();
+        LABEL_MAP.forEach(builder::put);
+        return builder;
+    }
+
+    public static void recordRole(MemberState.Role newRole, MemberState.Role oldRole) {
+        role.add(getRoleValue(newRole) - getRoleValue(oldRole),
+            newAttributesBuilder().build());
+    }
+
+    private static int getRoleValue(MemberState.Role role) {
+        switch (role) {
+            case UNKNOWN:
+                return 0;
+            case CANDIDATE:
+                return 1;
+            case FOLLOWER:
+                return 2;
+            case LEADER:
+                return 3;
+            default:
+                logger.error("Unknown role {}", role);
+                return 0;
+        }
+    }
+
+    private ControllerMetricsManager(ControllerManager controllerManager) {
+        this.controllerManager = controllerManager;
+        this.config = this.controllerManager.getControllerConfig();
+        this.LABEL_MAP.put(LABEL_ADDRESS, this.config.getDLedgerAddress());
+        this.LABEL_MAP.put(LABEL_GROUP, this.config.getControllerDLegerGroup());
+        this.LABEL_MAP.put(LABEL_PEER_ID, this.config.getControllerDLegerSelfId());
+        this.init();
+    }
+
+    private boolean checkConfig() {
+        if (config == null) {
+            return false;
+        }
+        MetricsExporterType exporterType = config.getMetricsExporterType();
+        if (!exporterType.isEnable()) {
+            return false;
+        }
+
+        switch (exporterType) {
+            case OTLP_GRPC:
+                return StringUtils.isNotBlank(config.getMetricsGrpcExporterTarget());
+            case PROM:
+                return true;
+            case LOG:
+                return true;
+        }
+        return false;
+    }
+
+    private void registerMetricsView(SdkMeterProviderBuilder providerBuilder) {
+        // define latency bucket
+        List<Double> latencyBuckets = Arrays.asList(
+            1 * us, 3 * us, 5 * us,
+            10 * us, 30 * us, 50 * us,
+            100 * us, 300 * us, 500 * us,
+            1 * ms, 3 * ms, 5 * ms,
+            10 * ms, 30 * ms, 50 * ms,
+            100 * ms, 300 * ms, 500 * ms,
+            1 * s, 3 * s, 5 * s,
+            10 * s
+        );
+
+        View latecyView = View.builder()
+            .setAggregation(Aggregation.explicitBucketHistogram(latencyBuckets))
+            .build();
+
+        InstrumentSelector requestLatencySelector = InstrumentSelector.builder()
+            .setType(InstrumentType.HISTOGRAM)
+            .setName(HISTOGRAM_REQUEST_LATENCY)
+            .build();
+
+        InstrumentSelector dLedgerOpLatencySelector = InstrumentSelector.builder()
+            .setType(InstrumentType.HISTOGRAM)
+            .setName(HISTOGRAM_DLEDGER_OP_LATENCY)
+            .build();
+
+        providerBuilder.registerView(requestLatencySelector, latecyView);
+        providerBuilder.registerView(dLedgerOpLatencySelector, latecyView);
+    }
+
+    private void initMetric(Meter meter) {
+        role = meter.upDownCounterBuilder(GAUGE_ROLE)
+            .setDescription("role of current node")
+            .build();
+
+        dLedgerDiskUsage = meter.gaugeBuilder(GAUGE_DLEDGER_DISK_USAGE)
+            .setDescription("disk usage of dledger")
+            .setUnit("bytes")
+            .ofLongs()
+            .buildWithCallback(measurement -> {
+                String path = config.getControllerStorePath();
+                if (!UtilAll.isPathExists(path)) {
+                    return;
+                }
+                File file = new File(path);
+                Long diskUsage = UtilAll.calculateFileSizeInPath(file);
+                if (diskUsage == -1) {
+                    logger.error("calculateFileSizeInPath error, path: {}", path);
+                    return;
+                }
+                measurement.record(diskUsage, newAttributesBuilder().build());
+            });
+
+        activeBrokerNum = meter.gaugeBuilder(GAUGE_ACTIVE_BROKER_NUM)
+            .setDescription("now active brokers num")
+            .ofLongs()
+            .buildWithCallback(measurement -> {
+                Map<String, Map<String, Integer>> activeBrokersNum = controllerManager.getHeartbeatManager().getActiveBrokersNum();
+                activeBrokersNum.forEach((cluster, brokerSetAndNum) -> {
+                    brokerSetAndNum.forEach((brokerSet, num) -> measurement.record(num,
+                        newAttributesBuilder().put(LABEL_CLUSTER_NAME, cluster).put(LABEL_BROKER_SET, brokerSet).build()));
+                });
+            });
+
+        requestTotal = meter.counterBuilder(COUNTER_REQUEST_TOTAL)
+            .setDescription("total request num")
+            .build();
+
+        dLedgerOpTotal = meter.counterBuilder(COUNTER_DLEDGER_OP_TOTAL)
+            .setDescription("total dledger operation num")
+            .build();
+
+        electionTotal = meter.counterBuilder(COUNTER_ELECTION_TOTAL)
+            .setDescription("total elect num")
+            .build();
+
+        requestLatency = meter.histogramBuilder(HISTOGRAM_REQUEST_LATENCY)
+            .setDescription("request latency")
+            .setUnit("us")
+            .ofLongs()
+            .build();
+
+        dLedgerOpLatency = meter.histogramBuilder(HISTOGRAM_DLEDGER_OP_LATENCY)
+            .setDescription("dledger operation latency")
+            .setUnit("us")
+            .ofLongs()
+            .build();
+
+    }
+
+    public void init() {
+        MetricsExporterType type = this.config.getMetricsExporterType();
+        if (type == MetricsExporterType.DISABLE) {
+            return;
+        }
+        if (!checkConfig()) {
+            logger.error("check metric config failed, will not export metrics");
+            return;
+        }
+
+        String labels = config.getMetricsLabel();
+        if (StringUtils.isNotBlank(labels)) {
+            List<String> labelList = Splitter.on(',').omitEmptyStrings().splitToList(labels);
+            for (String label : labelList) {
+                String[] pair = label.split(":");
+                if (pair.length != 2) {
+                    logger.warn("metrics label is not valid: {}", label);
+                    continue;
+                }
+                LABEL_MAP.put(pair[0], pair[1]);
+            }
+        }
+        if (config.isMetricsInDelta()) {
+            LABEL_MAP.put(LABEL_AGGREGATION, AGGREGATION_DELTA);
+        }
+
+        SdkMeterProviderBuilder providerBuilder = SdkMeterProvider.builder().setResource(Resource.empty());
+
+        if (type == MetricsExporterType.OTLP_GRPC) {
+            String endpoint = config.getMetricsGrpcExporterTarget();
+            if (!endpoint.startsWith("http")) {
+                endpoint = "https://" + endpoint;
+            }
+            OtlpGrpcMetricExporterBuilder metricExporterBuilder = OtlpGrpcMetricExporter.builder()
+                .setEndpoint(endpoint)
+                .setTimeout(config.getMetricGrpcExporterTimeOutInMills(), TimeUnit.MILLISECONDS)
+                .setAggregationTemporalitySelector(x -> {
+                    if (config.isMetricsInDelta() &&
+                        (x == InstrumentType.COUNTER || x == InstrumentType.OBSERVABLE_COUNTER || x == InstrumentType.HISTOGRAM)) {
+                        return AggregationTemporality.DELTA;
+                    }
+                    return AggregationTemporality.CUMULATIVE;
+                });
+
+            String headers = config.getMetricsGrpcExporterHeader();
+            if (StringUtils.isNotBlank(headers)) {
+                Map<String, String> headerMap = new HashMap<>();
+                List<String> headerList = Splitter.on(',').omitEmptyStrings().splitToList(headers);
+                for (String header : headerList) {
+                    String[] pair = header.split(":");
+                    if (pair.length != 2) {
+                        logger.warn("metricsGrpcExporterHeader is not valid: {}", headers);
+                        continue;
+                    }
+                    headerMap.put(pair[0], pair[1]);
+                }
+                headerMap.forEach(metricExporterBuilder::addHeader);
+            }
+
+            metricExporter = metricExporterBuilder.build();
+
+            periodicMetricReader = PeriodicMetricReader.builder(metricExporter)
+                .setInterval(config.getMetricGrpcExporterIntervalInMills(), TimeUnit.MILLISECONDS)
+                .build();
+
+            providerBuilder.registerMetricReader(periodicMetricReader);
+        }
+
+        if (type == MetricsExporterType.PROM) {
+            String promExporterHost = config.getMetricsPromExporterHost();
+            if (StringUtils.isBlank(promExporterHost)) {
+                promExporterHost = "0.0.0.0";
+            }
+            prometheusHttpServer = PrometheusHttpServer.builder()
+                .setHost(promExporterHost)
+                .setPort(config.getMetricsPromExporterPort())
+                .build();
+            providerBuilder.registerMetricReader(prometheusHttpServer);
+        }
+
+        if (type == MetricsExporterType.LOG) {
+            SLF4JBridgeHandler.removeHandlersForRootLogger();
+            SLF4JBridgeHandler.install();
+            loggingMetricExporter = LoggingMetricExporter.create(config.isMetricsInDelta() ? AggregationTemporality.DELTA : AggregationTemporality.CUMULATIVE);
+            java.util.logging.Logger.getLogger(LoggingMetricExporter.class.getName()).setLevel(java.util.logging.Level.FINEST);
+            periodicMetricReader = PeriodicMetricReader.builder(loggingMetricExporter)
+                .setInterval(config.getMetricLoggingExporterIntervalInMills(), TimeUnit.MILLISECONDS)
+                .build();
+            providerBuilder.registerMetricReader(periodicMetricReader);
+        }
+
+        registerMetricsView(providerBuilder);
+
+        controllerMeter = OpenTelemetrySdk.builder().setMeterProvider(providerBuilder.build())
+            .build().getMeter(OPEN_TELEMETRY_METER_NAME);
+
+        initMetric(controllerMeter);
+    }
+
+}
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
index 6010f81..93ecbbd 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
@@ -16,17 +16,22 @@
  */
 package org.apache.rocketmq.controller.processor;
 
+import com.google.common.base.Stopwatch;
 import io.netty.channel.ChannelHandlerContext;
+import io.opentelemetry.api.common.Attributes;
 import java.io.UnsupportedEncodingException;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
+import java.util.concurrent.TimeoutException;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.constant.LoggerName;
 import org.apache.rocketmq.controller.BrokerHeartbeatManager;
 import org.apache.rocketmq.controller.ControllerManager;
+import org.apache.rocketmq.controller.metrics.ControllerMetricsConstant;
+import org.apache.rocketmq.controller.metrics.ControllerMetricsManager;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
 import org.apache.rocketmq.remoting.common.RemotingHelper;
@@ -45,6 +50,8 @@
 import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
 import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoRequestHeader;
 
+import static org.apache.rocketmq.controller.metrics.ControllerMetricsConstant.LABEL_REQUEST_HANDLE_STATUS;
+import static org.apache.rocketmq.controller.metrics.ControllerMetricsConstant.LABEL_REQUEST_TYPE;
 import static org.apache.rocketmq.remoting.protocol.RequestCode.CONTROLLER_APPLY_BROKER_ID;
 import static org.apache.rocketmq.remoting.protocol.RequestCode.BROKER_HEARTBEAT;
 import static org.apache.rocketmq.remoting.protocol.RequestCode.CLEAN_BROKER_DATA;
@@ -80,6 +87,39 @@
                 RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
                 request);
         }
+        Stopwatch stopwatch = Stopwatch.createStarted();
+        try {
+            RemotingCommand resp = handleRequest(ctx, request);
+            Attributes attributes = ControllerMetricsManager.newAttributesBuilder()
+                .put(LABEL_REQUEST_TYPE, ControllerMetricsConstant.RequestType.getLowerCaseNameByCode(request.getCode()))
+                .put(LABEL_REQUEST_HANDLE_STATUS, ControllerMetricsConstant.RequestHandleStatus.SUCCESS.getLowerCaseName())
+                .build();
+            ControllerMetricsManager.requestTotal.add(1, attributes);
+            attributes = ControllerMetricsManager.newAttributesBuilder()
+                .put(LABEL_REQUEST_TYPE, ControllerMetricsConstant.RequestType.getLowerCaseNameByCode(request.getCode()))
+                .build();
+            ControllerMetricsManager.requestLatency.record(stopwatch.elapsed(TimeUnit.MICROSECONDS), attributes);
+            return resp;
+        } catch (Exception e) {
+            log.error("process request: {} error, ", request, e);
+            Attributes attributes;
+            if (e instanceof TimeoutException) {
+                attributes = ControllerMetricsManager.newAttributesBuilder()
+                    .put(LABEL_REQUEST_TYPE, ControllerMetricsConstant.RequestType.getLowerCaseNameByCode(request.getCode()))
+                    .put(LABEL_REQUEST_HANDLE_STATUS, ControllerMetricsConstant.RequestHandleStatus.TIMEOUT.getLowerCaseName())
+                    .build();
+            } else {
+                attributes = ControllerMetricsManager.newAttributesBuilder()
+                    .put(LABEL_REQUEST_TYPE, ControllerMetricsConstant.RequestType.getLowerCaseNameByCode(request.getCode()))
+                    .put(LABEL_REQUEST_HANDLE_STATUS, ControllerMetricsConstant.RequestHandleStatus.FAILED.getLowerCaseName())
+                    .build();
+            }
+            ControllerMetricsManager.requestTotal.add(1, attributes);
+            throw e;
+        }
+    }
+
+    private RemotingCommand handleRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
         switch (request.getCode()) {
             case CONTROLLER_ALTER_SYNC_STATE_SET:
                 return this.handleAlterSyncStateSet(ctx, request);
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
index 560cd89..4f57a70 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
@@ -28,9 +28,9 @@
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.MixAll;
 import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.metrics.MetricsExporterType;
 import org.apache.rocketmq.common.utils.NetworkUtil;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
 import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -205,7 +205,7 @@
 
     private boolean traceOn = false;
 
-    private BrokerConfig.MetricsExporterType metricsExporterType = BrokerConfig.MetricsExporterType.DISABLE;
+    private MetricsExporterType metricsExporterType = MetricsExporterType.DISABLE;
 
     private String metricsGrpcExporterTarget = "";
     private String metricsGrpcExporterHeader = "";
@@ -1116,20 +1116,20 @@
         this.remotingAccessAddr = remotingAccessAddr;
     }
 
-    public BrokerConfig.MetricsExporterType getMetricsExporterType() {
+    public MetricsExporterType getMetricsExporterType() {
         return metricsExporterType;
     }
 
-    public void setMetricsExporterType(BrokerConfig.MetricsExporterType metricsExporterType) {
+    public void setMetricsExporterType(MetricsExporterType metricsExporterType) {
         this.metricsExporterType = metricsExporterType;
     }
 
     public void setMetricsExporterType(int metricsExporterType) {
-        this.metricsExporterType = BrokerConfig.MetricsExporterType.valueOf(metricsExporterType);
+        this.metricsExporterType = MetricsExporterType.valueOf(metricsExporterType);
     }
 
     public void setMetricsExporterType(String metricsExporterType) {
-        this.metricsExporterType = BrokerConfig.MetricsExporterType.valueOf(metricsExporterType);
+        this.metricsExporterType = MetricsExporterType.valueOf(metricsExporterType);
     }
 
     public String getMetricsGrpcExporterTarget() {
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/metrics/ProxyMetricsManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/metrics/ProxyMetricsManager.java
index 8474a68..f505085 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/metrics/ProxyMetricsManager.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/metrics/ProxyMetricsManager.java
@@ -39,8 +39,8 @@
 import java.util.function.Supplier;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
-import org.apache.rocketmq.common.BrokerConfig;
 import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.metrics.MetricsExporterType;
 import org.apache.rocketmq.common.utils.StartAndShutdown;
 import org.apache.rocketmq.proxy.config.ProxyConfig;
 import org.apache.rocketmq.logging.org.slf4j.Logger;
@@ -72,7 +72,7 @@
     public static ObservableLongGauge proxyUp = null;
 
     public static void initLocalMode(BrokerMetricsManager brokerMetricsManager, ProxyConfig proxyConfig) {
-        if (proxyConfig.getMetricsExporterType() == BrokerConfig.MetricsExporterType.DISABLE) {
+        if (proxyConfig.getMetricsExporterType() == MetricsExporterType.DISABLE) {
             return;
         }
         ProxyMetricsManager.proxyConfig = proxyConfig;
@@ -116,7 +116,7 @@
         if (proxyConfig == null) {
             return false;
         }
-        BrokerConfig.MetricsExporterType exporterType = proxyConfig.getMetricsExporterType();
+        MetricsExporterType exporterType = proxyConfig.getMetricsExporterType();
         if (!exporterType.isEnable()) {
             return false;
         }
@@ -134,8 +134,8 @@
 
     @Override
     public void start() throws Exception {
-        BrokerConfig.MetricsExporterType metricsExporterType = proxyConfig.getMetricsExporterType();
-        if (metricsExporterType == BrokerConfig.MetricsExporterType.DISABLE) {
+        MetricsExporterType metricsExporterType = proxyConfig.getMetricsExporterType();
+        if (metricsExporterType == MetricsExporterType.DISABLE) {
             return;
         }
         if (!checkConfig()) {
@@ -166,7 +166,7 @@
         SdkMeterProviderBuilder providerBuilder = SdkMeterProvider.builder()
             .setResource(Resource.empty());
 
-        if (metricsExporterType == BrokerConfig.MetricsExporterType.OTLP_GRPC) {
+        if (metricsExporterType == MetricsExporterType.OTLP_GRPC) {
             String endpoint = proxyConfig.getMetricsGrpcExporterTarget();
             if (!endpoint.startsWith("http")) {
                 endpoint = "https://" + endpoint;
@@ -206,7 +206,7 @@
             providerBuilder.registerMetricReader(periodicMetricReader);
         }
 
-        if (metricsExporterType == BrokerConfig.MetricsExporterType.PROM) {
+        if (metricsExporterType == MetricsExporterType.PROM) {
             String promExporterHost = proxyConfig.getMetricsPromExporterHost();
             if (StringUtils.isBlank(promExporterHost)) {
                 promExporterHost = "0.0.0.0";
@@ -218,7 +218,7 @@
             providerBuilder.registerMetricReader(prometheusHttpServer);
         }
 
-        if (metricsExporterType == BrokerConfig.MetricsExporterType.LOG) {
+        if (metricsExporterType == MetricsExporterType.LOG) {
             SLF4JBridgeHandler.removeHandlersForRootLogger();
             SLF4JBridgeHandler.install();
             loggingMetricExporter = LoggingMetricExporter.create(proxyConfig.isMetricsInDelta() ? AggregationTemporality.DELTA : AggregationTemporality.CUMULATIVE);
@@ -239,16 +239,16 @@
 
     @Override
     public void shutdown() throws Exception {
-        if (proxyConfig.getMetricsExporterType() == BrokerConfig.MetricsExporterType.OTLP_GRPC) {
+        if (proxyConfig.getMetricsExporterType() == MetricsExporterType.OTLP_GRPC) {
             periodicMetricReader.forceFlush();
             periodicMetricReader.shutdown();
             metricExporter.shutdown();
         }
-        if (proxyConfig.getMetricsExporterType() == BrokerConfig.MetricsExporterType.PROM) {
+        if (proxyConfig.getMetricsExporterType() == MetricsExporterType.PROM) {
             prometheusHttpServer.forceFlush();
             prometheusHttpServer.shutdown();
         }
-        if (proxyConfig.getMetricsExporterType() == BrokerConfig.MetricsExporterType.LOG) {
+        if (proxyConfig.getMetricsExporterType() == MetricsExporterType.LOG) {
             periodicMetricReader.forceFlush();
             periodicMetricReader.shutdown();
             loggingMetricExporter.shutdown();