[ISSUE #6777] Support metric in controller (#6778)
* feat(controller): build metrics framework in controller module
1. build metrics framework in controller module
* feat(controller): record metrics about controller
1. record metrics about controller
* fix(controller): fix after review
1. fix after review
Closes https://github.com/apache/rocketmq/issues/6777
* fix(controller): fix after review
1. fix lack of dependency
Closes https://github.com/apache/rocketmq/issues/6777
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
index b9b3f76..04eb679 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
@@ -54,6 +54,7 @@
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
+import org.apache.rocketmq.common.metrics.MetricsExporterType;
import org.apache.rocketmq.common.metrics.NopLongCounter;
import org.apache.rocketmq.common.metrics.NopLongHistogram;
import org.apache.rocketmq.common.metrics.NopObservableLongGauge;
@@ -214,7 +215,7 @@
if (brokerConfig == null) {
return false;
}
- BrokerConfig.MetricsExporterType exporterType = brokerConfig.getMetricsExporterType();
+ MetricsExporterType exporterType = brokerConfig.getMetricsExporterType();
if (!exporterType.isEnable()) {
return false;
}
@@ -231,8 +232,8 @@
}
private void init() {
- BrokerConfig.MetricsExporterType metricsExporterType = brokerConfig.getMetricsExporterType();
- if (metricsExporterType == BrokerConfig.MetricsExporterType.DISABLE) {
+ MetricsExporterType metricsExporterType = brokerConfig.getMetricsExporterType();
+ if (metricsExporterType == MetricsExporterType.DISABLE) {
return;
}
@@ -263,7 +264,7 @@
SdkMeterProviderBuilder providerBuilder = SdkMeterProvider.builder()
.setResource(Resource.empty());
- if (metricsExporterType == BrokerConfig.MetricsExporterType.OTLP_GRPC) {
+ if (metricsExporterType == MetricsExporterType.OTLP_GRPC) {
String endpoint = brokerConfig.getMetricsGrpcExporterTarget();
if (!endpoint.startsWith("http")) {
endpoint = "https://" + endpoint;
@@ -303,7 +304,7 @@
providerBuilder.registerMetricReader(periodicMetricReader);
}
- if (metricsExporterType == BrokerConfig.MetricsExporterType.PROM) {
+ if (metricsExporterType == MetricsExporterType.PROM) {
String promExporterHost = brokerConfig.getMetricsPromExporterHost();
if (StringUtils.isBlank(promExporterHost)) {
promExporterHost = brokerConfig.getBrokerIP1();
@@ -315,7 +316,7 @@
providerBuilder.registerMetricReader(prometheusHttpServer);
}
- if (metricsExporterType == BrokerConfig.MetricsExporterType.LOG) {
+ if (metricsExporterType == MetricsExporterType.LOG) {
SLF4JBridgeHandler.removeHandlersForRootLogger();
SLF4JBridgeHandler.install();
loggingMetricExporter = LoggingMetricExporter.create(brokerConfig.isMetricsInDelta() ? AggregationTemporality.DELTA : AggregationTemporality.CUMULATIVE);
@@ -533,16 +534,16 @@
}
public void shutdown() {
- if (brokerConfig.getMetricsExporterType() == BrokerConfig.MetricsExporterType.OTLP_GRPC) {
+ if (brokerConfig.getMetricsExporterType() == MetricsExporterType.OTLP_GRPC) {
periodicMetricReader.forceFlush();
periodicMetricReader.shutdown();
metricExporter.shutdown();
}
- if (brokerConfig.getMetricsExporterType() == BrokerConfig.MetricsExporterType.PROM) {
+ if (brokerConfig.getMetricsExporterType() == MetricsExporterType.PROM) {
prometheusHttpServer.forceFlush();
prometheusHttpServer.shutdown();
}
- if (brokerConfig.getMetricsExporterType() == BrokerConfig.MetricsExporterType.LOG) {
+ if (brokerConfig.getMetricsExporterType() == MetricsExporterType.LOG) {
periodicMetricReader.forceFlush();
periodicMetricReader.shutdown();
loggingMetricExporter.shutdown();
diff --git a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
index 2ce63a1..e9fad05 100644
--- a/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/BrokerConfig.java
@@ -20,6 +20,7 @@
import org.apache.rocketmq.common.annotation.ImportantField;
import org.apache.rocketmq.common.constant.PermName;
import org.apache.rocketmq.common.message.MessageRequestMode;
+import org.apache.rocketmq.common.metrics.MetricsExporterType;
import org.apache.rocketmq.common.topic.TopicValidator;
import org.apache.rocketmq.common.utils.NetworkUtil;
@@ -345,40 +346,6 @@
private boolean useStaticSubscription = false;
- public enum MetricsExporterType {
- DISABLE(0),
- OTLP_GRPC(1),
- PROM(2),
- LOG(3);
-
- private final int value;
-
- MetricsExporterType(int value) {
- this.value = value;
- }
-
- public int getValue() {
- return value;
- }
-
- public static MetricsExporterType valueOf(int value) {
- switch (value) {
- case 1:
- return OTLP_GRPC;
- case 2:
- return PROM;
- case 3:
- return LOG;
- default:
- return DISABLE;
- }
- }
-
- public boolean isEnable() {
- return this.value > 0;
- }
- }
-
private MetricsExporterType metricsExporterType = MetricsExporterType.DISABLE;
private String metricsGrpcExporterTarget = "";
diff --git a/common/src/main/java/org/apache/rocketmq/common/ControllerConfig.java b/common/src/main/java/org/apache/rocketmq/common/ControllerConfig.java
index b35198f..1e9c80b 100644
--- a/common/src/main/java/org/apache/rocketmq/common/ControllerConfig.java
+++ b/common/src/main/java/org/apache/rocketmq/common/ControllerConfig.java
@@ -17,6 +17,8 @@
package org.apache.rocketmq.common;
import java.io.File;
+import java.util.Arrays;
+import org.apache.rocketmq.common.metrics.MetricsExporterType;
public class ControllerConfig {
@@ -65,6 +67,22 @@
*/
private long scanInactiveMasterInterval = 5 * 1000;
+ private MetricsExporterType metricsExporterType = MetricsExporterType.DISABLE;
+
+ private String metricsGrpcExporterTarget = "";
+ private String metricsGrpcExporterHeader = "";
+ private long metricGrpcExporterTimeOutInMills = 3 * 1000;
+ private long metricGrpcExporterIntervalInMills = 60 * 1000;
+ private long metricLoggingExporterIntervalInMills = 10 * 1000;
+
+ private int metricsPromExporterPort = 5557;
+ private String metricsPromExporterHost = "";
+
+ // Label pairs in CSV. Each label follows pattern of Key:Value. eg: instance_id:xxx,uid:xxx
+ private String metricsLabel = "";
+
+ private boolean metricsInDelta = false;
+
public String getRocketmqHome() {
return rocketmqHome;
}
@@ -176,4 +194,90 @@
public void setScanInactiveMasterInterval(long scanInactiveMasterInterval) {
this.scanInactiveMasterInterval = scanInactiveMasterInterval;
}
+
+ public String getDLedgerAddress() {
+ return Arrays.stream(this.controllerDLegerPeers.split(";"))
+ .filter(x -> this.controllerDLegerSelfId.equals(x.split("-")[0]))
+ .map(x -> x.split("-")[1]).findFirst().get();
+ }
+
+ public MetricsExporterType getMetricsExporterType() {
+ return metricsExporterType;
+ }
+
+ public void setMetricsExporterType(MetricsExporterType metricsExporterType) {
+ this.metricsExporterType = metricsExporterType;
+ }
+
+ public String getMetricsGrpcExporterTarget() {
+ return metricsGrpcExporterTarget;
+ }
+
+ public void setMetricsGrpcExporterTarget(String metricsGrpcExporterTarget) {
+ this.metricsGrpcExporterTarget = metricsGrpcExporterTarget;
+ }
+
+ public String getMetricsGrpcExporterHeader() {
+ return metricsGrpcExporterHeader;
+ }
+
+ public void setMetricsGrpcExporterHeader(String metricsGrpcExporterHeader) {
+ this.metricsGrpcExporterHeader = metricsGrpcExporterHeader;
+ }
+
+ public long getMetricGrpcExporterTimeOutInMills() {
+ return metricGrpcExporterTimeOutInMills;
+ }
+
+ public void setMetricGrpcExporterTimeOutInMills(long metricGrpcExporterTimeOutInMills) {
+ this.metricGrpcExporterTimeOutInMills = metricGrpcExporterTimeOutInMills;
+ }
+
+ public long getMetricGrpcExporterIntervalInMills() {
+ return metricGrpcExporterIntervalInMills;
+ }
+
+ public void setMetricGrpcExporterIntervalInMills(long metricGrpcExporterIntervalInMills) {
+ this.metricGrpcExporterIntervalInMills = metricGrpcExporterIntervalInMills;
+ }
+
+ public long getMetricLoggingExporterIntervalInMills() {
+ return metricLoggingExporterIntervalInMills;
+ }
+
+ public void setMetricLoggingExporterIntervalInMills(long metricLoggingExporterIntervalInMills) {
+ this.metricLoggingExporterIntervalInMills = metricLoggingExporterIntervalInMills;
+ }
+
+ public int getMetricsPromExporterPort() {
+ return metricsPromExporterPort;
+ }
+
+ public void setMetricsPromExporterPort(int metricsPromExporterPort) {
+ this.metricsPromExporterPort = metricsPromExporterPort;
+ }
+
+ public String getMetricsPromExporterHost() {
+ return metricsPromExporterHost;
+ }
+
+ public void setMetricsPromExporterHost(String metricsPromExporterHost) {
+ this.metricsPromExporterHost = metricsPromExporterHost;
+ }
+
+ public String getMetricsLabel() {
+ return metricsLabel;
+ }
+
+ public void setMetricsLabel(String metricsLabel) {
+ this.metricsLabel = metricsLabel;
+ }
+
+ public boolean isMetricsInDelta() {
+ return metricsInDelta;
+ }
+
+ public void setMetricsInDelta(boolean metricsInDelta) {
+ this.metricsInDelta = metricsInDelta;
+ }
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
index 03d1624..d2b7c37 100644
--- a/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
+++ b/common/src/main/java/org/apache/rocketmq/common/UtilAll.java
@@ -28,6 +28,7 @@
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.nio.ByteBuffer;
+import java.nio.file.Files;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.text.NumberFormat;
@@ -765,4 +766,30 @@
STORE_LOG.info(dirName + " mkdir " + (result ? "OK" : "Failed"));
}
}
+
+ public static long calculateFileSizeInPath(File path) {
+ long size = 0;
+ try {
+ if (!path.exists() || Files.isSymbolicLink(path.toPath())) {
+ return 0;
+ }
+ if (path.isFile()) {
+ return path.length();
+ }
+ if (path.isDirectory()) {
+ File[] files = path.listFiles();
+ if (files != null && files.length > 0) {
+ for (File file : files) {
+ long fileSize = calculateFileSizeInPath(file);
+ if (fileSize == -1) return -1;
+ size += fileSize;
+ }
+ }
+ }
+ } catch (Exception e) {
+ log.error("calculate all file size in: {} error", path.getAbsolutePath(), e);
+ return -1;
+ }
+ return size;
+ }
}
diff --git a/common/src/main/java/org/apache/rocketmq/common/metrics/MetricsExporterType.java b/common/src/main/java/org/apache/rocketmq/common/metrics/MetricsExporterType.java
new file mode 100644
index 0000000..5f065b4
--- /dev/null
+++ b/common/src/main/java/org/apache/rocketmq/common/metrics/MetricsExporterType.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.common.metrics;
+
+
+public enum MetricsExporterType {
+ DISABLE(0),
+ OTLP_GRPC(1),
+ PROM(2),
+ LOG(3);
+
+ private final int value;
+
+ MetricsExporterType(int value) {
+ this.value = value;
+ }
+
+ public int getValue() {
+ return value;
+ }
+
+ public static MetricsExporterType valueOf(int value) {
+ switch (value) {
+ case 1:
+ return OTLP_GRPC;
+ case 2:
+ return PROM;
+ case 3:
+ return LOG;
+ default:
+ return DISABLE;
+ }
+ }
+
+ public boolean isEnable() {
+ return this.value > 0;
+ }
+}
diff --git a/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java b/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java
index 6306d56..f568a65 100644
--- a/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java
+++ b/common/src/test/java/org/apache/rocketmq/common/UtilAllTest.java
@@ -17,6 +17,8 @@
package org.apache.rocketmq.common;
+import java.io.File;
+import java.io.FileOutputStream;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
@@ -29,6 +31,7 @@
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.within;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
public class UtilAllTest {
@@ -220,4 +223,62 @@
public void testInvoke() throws Exception {
UtilAll.invoke(new Object(), "noMethod");
}
+
+ @Test
+ public void testCalculateFileSizeInPath() throws Exception {
+ /**
+ * testCalculateFileSizeInPath
+ * - file_0
+ * - dir_1
+ * - file_1_0
+ * - file_1_1
+ * - dir_1_2
+ * - file_1_2_0
+ * - dir_2
+ */
+ String basePath = System.getProperty("java.io.tmpdir") + File.separator + "testCalculateFileSizeInPath";
+ File baseFile = new File(basePath);
+ // test empty path
+ assertEquals(0, UtilAll.calculateFileSizeInPath(baseFile));
+
+ // create baseDir
+ assertTrue(baseFile.mkdirs());
+
+ File file0 = new File(baseFile, "file_0");
+ assertTrue(file0.createNewFile());
+ writeFixedBytesToFile(file0, 1313);
+
+ assertEquals(1313, UtilAll.calculateFileSizeInPath(baseFile));
+
+ // build a file tree like above
+ File dir1 = new File(baseFile, "dir_1");
+ dir1.mkdirs();
+ File file10 = new File(dir1, "file_1_0");
+ File file11 = new File(dir1, "file_1_1");
+ File dir12 = new File(dir1, "dir_1_2");
+ dir12.mkdirs();
+ File file120 = new File(dir12, "file_1_2_0");
+ File dir2 = new File(baseFile, "dir_2");
+ dir2.mkdirs();
+
+ // write all file with 1313 bytes data
+ assertTrue(file10.createNewFile());
+ writeFixedBytesToFile(file10, 1313);
+ assertTrue(file11.createNewFile());
+ writeFixedBytesToFile(file11, 1313);
+ assertTrue(file120.createNewFile());
+ writeFixedBytesToFile(file120, 1313);
+
+ assertEquals(1313 * 4, UtilAll.calculateFileSizeInPath(baseFile));
+
+ // clear all file
+ baseFile.deleteOnExit();
+ }
+
+ private void writeFixedBytesToFile(File file, int size) throws Exception {
+ FileOutputStream outputStream = new FileOutputStream(file);
+ byte[] bytes = new byte[size];
+ outputStream.write(bytes, 0, size);
+ outputStream.close();
+ }
}
diff --git a/controller/pom.xml b/controller/pom.xml
index 9e9ad28..f82ad2a 100644
--- a/controller/pom.xml
+++ b/controller/pom.xml
@@ -58,5 +58,9 @@
<groupId>${project.groupId}</groupId>
<artifactId>rocketmq-srvutil</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>jul-to-slf4j</artifactId>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java b/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java
index 71b274c..f38a03a 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/BrokerHeartbeatManager.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.controller;
import io.netty.channel.Channel;
+import java.util.Map;
import org.apache.rocketmq.controller.helper.BrokerLifecycleListener;
import org.apache.rocketmq.controller.impl.heartbeat.BrokerLiveInfo;
@@ -63,4 +64,10 @@
* Check whether broker active
*/
boolean isBrokerActive(final String clusterName, final String brokerName, final Long brokerId);
+
+ /**
+ * Count the number of active brokers in each broker-set of each cluster
+ * @return active brokers count
+ */
+ Map<String/*cluster*/, Map<String/*broker-set*/, Integer/*active broker num*/>> getActiveBrokersNum();
}
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
index 4682651..7c91e70 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/ControllerManager.java
@@ -39,6 +39,7 @@
import org.apache.rocketmq.controller.elect.impl.DefaultElectPolicy;
import org.apache.rocketmq.controller.impl.DLedgerController;
import org.apache.rocketmq.controller.impl.heartbeat.DefaultBrokerHeartbeatManager;
+import org.apache.rocketmq.controller.metrics.ControllerMetricsManager;
import org.apache.rocketmq.controller.processor.ControllerRequestProcessor;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -75,6 +76,8 @@
private NotifyService notifyService;
+ private ControllerMetricsManager controllerMetricsManager;
+
public ControllerManager(ControllerConfig controllerConfig, NettyServerConfig nettyServerConfig,
NettyClientConfig nettyClientConfig) {
this.controllerConfig = controllerConfig;
@@ -120,6 +123,7 @@
this.heartbeatManager.registerBrokerLifecycleListener(this::onBrokerInactive);
this.controller.registerBrokerLifecycleListener(this::onBrokerInactive);
registerProcessor();
+ this.controllerMetricsManager = ControllerMetricsManager.getInstance(this);
return true;
}
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
index b6007fe..fa91f28 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/DLedgerController.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.controller.impl;
+import com.google.common.base.Stopwatch;
import io.openmessaging.storage.dledger.AppendFuture;
import io.openmessaging.storage.dledger.DLedgerConfig;
import io.openmessaging.storage.dledger.DLedgerLeaderElector;
@@ -24,6 +25,7 @@
import io.openmessaging.storage.dledger.protocol.AppendEntryRequest;
import io.openmessaging.storage.dledger.protocol.AppendEntryResponse;
import io.openmessaging.storage.dledger.protocol.BatchAppendEntryRequest;
+import io.opentelemetry.api.common.AttributesBuilder;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -35,6 +37,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import org.apache.rocketmq.common.ControllerConfig;
@@ -50,6 +53,8 @@
import org.apache.rocketmq.controller.impl.event.EventMessage;
import org.apache.rocketmq.controller.impl.event.EventSerializer;
import org.apache.rocketmq.controller.impl.manager.ReplicasInfoManager;
+import org.apache.rocketmq.controller.metrics.ControllerMetricsConstant;
+import org.apache.rocketmq.controller.metrics.ControllerMetricsManager;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.ChannelEventListener;
@@ -61,6 +66,7 @@
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.protocol.body.SyncStateSet;
import org.apache.rocketmq.remoting.protocol.header.controller.AlterSyncStateSetRequestHeader;
+import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.admin.CleanControllerBrokerDataRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.GetMetaDataResponseHeader;
@@ -69,6 +75,12 @@
import org.apache.rocketmq.remoting.protocol.header.controller.register.GetNextBrokerIdRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.register.RegisterBrokerToControllerRequestHeader;
+import static org.apache.rocketmq.controller.metrics.ControllerMetricsConstant.LABEL_BROKER_SET;
+import static org.apache.rocketmq.controller.metrics.ControllerMetricsConstant.LABEL_CLUSTER_NAME;
+import static org.apache.rocketmq.controller.metrics.ControllerMetricsConstant.LABEL_DLEDGER_OPERATION;
+import static org.apache.rocketmq.controller.metrics.ControllerMetricsConstant.LABEL_DLEDGER_OPERATION_STATUS;
+import static org.apache.rocketmq.controller.metrics.ControllerMetricsConstant.LABEL_ELECTION_RESULT;
+
/**
* The implementation of controller, based on DLedger (raft).
*/
@@ -174,7 +186,30 @@
@Override
public CompletableFuture<RemotingCommand> electMaster(final ElectMasterRequestHeader request) {
return this.scheduler.appendEvent("electMaster",
- () -> this.replicasInfoManager.electMaster(request, this.electPolicy), true);
+ () -> {
+ ControllerResult<ElectMasterResponseHeader> electResult = this.replicasInfoManager.electMaster(request, this.electPolicy);
+ AttributesBuilder attributesBuilder = ControllerMetricsManager.newAttributesBuilder()
+ .put(LABEL_CLUSTER_NAME, request.getClusterName())
+ .put(LABEL_BROKER_SET, request.getBrokerName());
+ switch (electResult.getResponseCode()) {
+ case ResponseCode.SUCCESS:
+ ControllerMetricsManager.electionTotal.add(1,
+ attributesBuilder.put(LABEL_ELECTION_RESULT, ControllerMetricsConstant.ElectionResult.NEW_MASTER_ELECTED.getLowerCaseName()).build());
+ break;
+ case ResponseCode.CONTROLLER_MASTER_STILL_EXIST:
+ ControllerMetricsManager.electionTotal.add(1,
+ attributesBuilder.put(LABEL_ELECTION_RESULT, ControllerMetricsConstant.ElectionResult.KEEP_CURRENT_MASTER.getLowerCaseName()).build());
+ break;
+ case ResponseCode.CONTROLLER_MASTER_NOT_AVAILABLE:
+ case ResponseCode.CONTROLLER_ELECT_MASTER_FAILED:
+ ControllerMetricsManager.electionTotal.add(1,
+ attributesBuilder.put(LABEL_ELECTION_RESULT, ControllerMetricsConstant.ElectionResult.NO_MASTER_ELECTED.getLowerCaseName()).build());
+ break;
+ default:
+ break;
+ }
+ return electResult;
+ }, true);
}
@Override
@@ -258,14 +293,30 @@
if (request != null) {
request.setGroup(this.dLedgerConfig.getGroup());
request.setRemoteId(this.dLedgerConfig.getSelfId());
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ AttributesBuilder attributesBuilder = ControllerMetricsManager.newAttributesBuilder()
+ .put(LABEL_DLEDGER_OPERATION, ControllerMetricsConstant.DLedgerOperation.APPEND.getLowerCaseName());
try {
final AppendFuture<AppendEntryResponse> dLedgerFuture = (AppendFuture<AppendEntryResponse>) dLedgerServer.handleAppend(request);
if (dLedgerFuture.getPos() == -1) {
+ ControllerMetricsManager.dLedgerOpTotal.add(1,
+ attributesBuilder.put(LABEL_DLEDGER_OPERATION_STATUS, ControllerMetricsConstant.DLedgerOperationStatus.FAILED.getLowerCaseName()).build());
return false;
}
dLedgerFuture.get(5, TimeUnit.SECONDS);
+ ControllerMetricsManager.dLedgerOpTotal.add(1,
+ attributesBuilder.put(LABEL_DLEDGER_OPERATION_STATUS, ControllerMetricsConstant.DLedgerOperationStatus.SUCCESS.getLowerCaseName()).build());
+ ControllerMetricsManager.dLedgerOpLatency.record(stopwatch.elapsed(TimeUnit.MICROSECONDS),
+ attributesBuilder.build());
} catch (Exception e) {
log.error("Failed to append entry to DLedger", e);
+ if (e instanceof TimeoutException) {
+ ControllerMetricsManager.dLedgerOpTotal.add(1,
+ attributesBuilder.put(LABEL_DLEDGER_OPERATION_STATUS, ControllerMetricsConstant.DLedgerOperationStatus.TIMEOUT.getLowerCaseName()).build());
+ } else {
+ ControllerMetricsManager.dLedgerOpTotal.add(1,
+ attributesBuilder.put(LABEL_DLEDGER_OPERATION_STATUS, ControllerMetricsConstant.DLedgerOperationStatus.FAILED.getLowerCaseName()).build());
+ }
return false;
}
return true;
@@ -474,12 +525,14 @@
Runnable runnable = () -> {
switch (role) {
case CANDIDATE:
+ ControllerMetricsManager.recordRole(role, this.currentRole);
this.currentRole = MemberState.Role.CANDIDATE;
log.info("Controller {} change role to candidate", this.selfId);
DLedgerController.this.stopScheduling();
DLedgerController.this.cancelScanInactiveFuture();
break;
case FOLLOWER:
+ ControllerMetricsManager.recordRole(role, this.currentRole);
this.currentRole = MemberState.Role.FOLLOWER;
log.info("Controller {} change role to Follower, leaderId:{}", this.selfId, getMemberState().getLeaderId());
DLedgerController.this.stopScheduling();
@@ -496,6 +549,7 @@
request.setBody(new byte[0]);
try {
if (appendToDLedgerAndWait(request)) {
+ ControllerMetricsManager.recordRole(role, this.currentRole);
this.currentRole = MemberState.Role.LEADER;
DLedgerController.this.startScheduling();
if (DLedgerController.this.scanInactiveMasterFuture == null) {
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/impl/heartbeat/DefaultBrokerHeartbeatManager.java b/controller/src/main/java/org/apache/rocketmq/controller/impl/heartbeat/DefaultBrokerHeartbeatManager.java
index dc82428..2fbddb9 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/impl/heartbeat/DefaultBrokerHeartbeatManager.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/impl/heartbeat/DefaultBrokerHeartbeatManager.java
@@ -18,6 +18,7 @@
import io.netty.channel.Channel;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -106,7 +107,8 @@
@Override
public void onBrokerHeartbeat(String clusterName, String brokerName, String brokerAddr, Long brokerId,
- Long timeoutMillis, Channel channel, Integer epoch, Long maxOffset, Long confirmOffset, Integer electionPriority) {
+ Long timeoutMillis, Channel channel, Integer epoch, Long maxOffset, Long confirmOffset,
+ Integer electionPriority) {
BrokerIdentityInfo brokerIdentityInfo = new BrokerIdentityInfo(clusterName, brokerName, brokerId);
BrokerLiveInfo prev = this.brokerLiveTable.get(brokerIdentityInfo);
int realEpoch = Optional.ofNullable(epoch).orElse(-1);
@@ -173,4 +175,17 @@
return false;
}
+ @Override
+ public Map<String, Map<String, Integer>> getActiveBrokersNum() {
+ Map<String, Map<String, Integer>> map = new HashMap<>();
+ this.brokerLiveTable.keySet().stream()
+ .filter(brokerIdentity -> this.isBrokerActive(brokerIdentity.getClusterName(), brokerIdentity.getBrokerName(), brokerIdentity.getBrokerId()))
+ .forEach(id -> {
+ map.computeIfAbsent(id.getClusterName(), k -> new HashMap<>());
+ map.get(id.getClusterName()).compute(id.getBrokerName(), (broker, num) ->
+ num == null ? 0 : num + 1
+ );
+ });
+ return map;
+ }
}
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/metrics/ControllerMetricsConstant.java b/controller/src/main/java/org/apache/rocketmq/controller/metrics/ControllerMetricsConstant.java
new file mode 100644
index 0000000..1efae43
--- /dev/null
+++ b/controller/src/main/java/org/apache/rocketmq/controller/metrics/ControllerMetricsConstant.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.controller.metrics;
+
+import org.apache.rocketmq.remoting.protocol.RequestCode;
+
+public class ControllerMetricsConstant {
+
+ public static final String LABEL_ADDRESS = "address";
+ public static final String LABEL_GROUP = "group";
+ public static final String LABEL_PEER_ID = "peer_id";
+ public static final String LABEL_AGGREGATION = "aggregation";
+ public static final String AGGREGATION_DELTA = "delta";
+
+ public static final String OPEN_TELEMETRY_METER_NAME = "controller";
+
+ public static final String GAUGE_ROLE = "role";
+
+ // unit: B
+ public static final String GAUGE_DLEDGER_DISK_USAGE = "dledger_disk_usage";
+
+ public static final String GAUGE_ACTIVE_BROKER_NUM = "active_broker_num";
+
+ public static final String COUNTER_REQUEST_TOTAL = "request_total";
+
+ public static final String COUNTER_DLEDGER_OP_TOTAL = "dledger_op_total";
+
+ public static final String COUNTER_ELECTION_TOTAL = "election_total";
+
+ // unit: us
+ public static final String HISTOGRAM_REQUEST_LATENCY = "request_latency";
+
+ // unit: us
+ public static final String HISTOGRAM_DLEDGER_OP_LATENCY = "dledger_op_latency";
+
+ public static final String LABEL_CLUSTER_NAME = "cluster";
+
+ public static final String LABEL_BROKER_SET = "broker_set";
+
+ public static final String LABEL_REQUEST_TYPE = "request_type";
+
+ public static final String LABEL_REQUEST_HANDLE_STATUS = "request_handle_status";
+
+ public static final String LABEL_DLEDGER_OPERATION = "dledger_operation";
+
+ public static final String LABEL_DLEDGER_OPERATION_STATUS = "dLedger_operation_status";
+
+ public static final String LABEL_ELECTION_RESULT = "election_result";
+
+
+ public enum RequestType {
+ CONTROLLER_ALTER_SYNC_STATE_SET(RequestCode.CONTROLLER_ALTER_SYNC_STATE_SET),
+
+ CONTROLLER_ELECT_MASTER(RequestCode.CONTROLLER_ELECT_MASTER),
+
+ CONTROLLER_REGISTER_BROKER(RequestCode.CONTROLLER_REGISTER_BROKER),
+
+ CONTROLLER_GET_REPLICA_INFO(RequestCode.CONTROLLER_GET_REPLICA_INFO),
+
+ CONTROLLER_GET_METADATA_INFO(RequestCode.CONTROLLER_GET_METADATA_INFO),
+
+ CONTROLLER_GET_SYNC_STATE_DATA(RequestCode.CONTROLLER_GET_SYNC_STATE_DATA),
+
+ CONTROLLER_GET_BROKER_EPOCH_CACHE(RequestCode.GET_BROKER_EPOCH_CACHE),
+
+ CONTROLLER_NOTIFY_BROKER_ROLE_CHANGED(RequestCode.NOTIFY_BROKER_ROLE_CHANGED),
+
+ CONTROLLER_BROKER_HEARTBEAT(RequestCode.BROKER_HEARTBEAT),
+
+ CONTROLLER_UPDATE_CONTROLLER_CONFIG(RequestCode.UPDATE_CONTROLLER_CONFIG),
+
+ CONTROLLER_GET_CONTROLLER_CONFIG(RequestCode.GET_CONTROLLER_CONFIG),
+
+ CONTROLLER_CLEAN_BROKER_DATA(RequestCode.CLEAN_BROKER_DATA),
+
+ CONTROLLER_GET_NEXT_BROKER_ID(RequestCode.CONTROLLER_GET_NEXT_BROKER_ID),
+
+ CONTROLLER_APPLY_BROKER_ID(RequestCode.CONTROLLER_APPLY_BROKER_ID);
+
+ private final int code;
+
+ RequestType(int code) {
+ this.code = code;
+ }
+
+ public static String getLowerCaseNameByCode(int code) {
+ for (RequestType requestType : RequestType.values()) {
+ if (requestType.code == code) {
+ return requestType.name();
+ }
+ }
+ return null;
+ }
+ }
+
+ public enum RequestHandleStatus {
+ SUCCESS,
+ FAILED,
+ TIMEOUT;
+ public String getLowerCaseName() {
+ return this.name().toLowerCase();
+ }
+ }
+
+ public enum DLedgerOperation {
+ APPEND;
+
+ public String getLowerCaseName() {
+ return this.name().toLowerCase();
+ }
+ }
+
+ public enum DLedgerOperationStatus {
+ SUCCESS,
+ FAILED,
+ TIMEOUT;
+
+ public String getLowerCaseName() {
+ return this.name().toLowerCase();
+ }
+ }
+
+ public enum ElectionResult {
+ NEW_MASTER_ELECTED,
+ KEEP_CURRENT_MASTER,
+ NO_MASTER_ELECTED;
+
+ public String getLowerCaseName() {
+ return this.name().toLowerCase();
+ }
+ }
+
+}
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/metrics/ControllerMetricsManager.java b/controller/src/main/java/org/apache/rocketmq/controller/metrics/ControllerMetricsManager.java
new file mode 100644
index 0000000..9b30a3b
--- /dev/null
+++ b/controller/src/main/java/org/apache/rocketmq/controller/metrics/ControllerMetricsManager.java
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.controller.metrics;
+
+import com.google.common.base.Splitter;
+import io.openmessaging.storage.dledger.MemberState;
+import io.opentelemetry.api.common.Attributes;
+import io.opentelemetry.api.common.AttributesBuilder;
+import io.opentelemetry.api.metrics.LongCounter;
+import io.opentelemetry.api.metrics.LongHistogram;
+import io.opentelemetry.api.metrics.LongUpDownCounter;
+import io.opentelemetry.api.metrics.Meter;
+import io.opentelemetry.api.metrics.ObservableLongGauge;
+import io.opentelemetry.exporter.logging.LoggingMetricExporter;
+import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporter;
+import io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder;
+import io.opentelemetry.exporter.prometheus.PrometheusHttpServer;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.metrics.Aggregation;
+import io.opentelemetry.sdk.metrics.InstrumentSelector;
+import io.opentelemetry.sdk.metrics.InstrumentType;
+import io.opentelemetry.sdk.metrics.SdkMeterProvider;
+import io.opentelemetry.sdk.metrics.SdkMeterProviderBuilder;
+import io.opentelemetry.sdk.metrics.View;
+import io.opentelemetry.sdk.metrics.data.AggregationTemporality;
+import io.opentelemetry.sdk.metrics.export.PeriodicMetricReader;
+import io.opentelemetry.sdk.resources.Resource;
+import java.io.File;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.rocketmq.common.ControllerConfig;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.metrics.MetricsExporterType;
+import org.apache.rocketmq.common.metrics.NopLongCounter;
+import org.apache.rocketmq.common.metrics.NopLongHistogram;
+import org.apache.rocketmq.common.metrics.NopLongUpDownCounter;
+import org.apache.rocketmq.common.metrics.NopObservableLongGauge;
+import org.apache.rocketmq.controller.ControllerManager;
+import org.apache.rocketmq.logging.org.slf4j.Logger;
+import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
+import org.slf4j.bridge.SLF4JBridgeHandler;
+
+import static org.apache.rocketmq.controller.metrics.ControllerMetricsConstant.AGGREGATION_DELTA;
+import static org.apache.rocketmq.controller.metrics.ControllerMetricsConstant.COUNTER_DLEDGER_OP_TOTAL;
+import static org.apache.rocketmq.controller.metrics.ControllerMetricsConstant.COUNTER_ELECTION_TOTAL;
+import static org.apache.rocketmq.controller.metrics.ControllerMetricsConstant.COUNTER_REQUEST_TOTAL;
+import static org.apache.rocketmq.controller.metrics.ControllerMetricsConstant.GAUGE_ACTIVE_BROKER_NUM;
+import static org.apache.rocketmq.controller.metrics.ControllerMetricsConstant.GAUGE_DLEDGER_DISK_USAGE;
+import static org.apache.rocketmq.controller.metrics.ControllerMetricsConstant.GAUGE_ROLE;
+import static org.apache.rocketmq.controller.metrics.ControllerMetricsConstant.HISTOGRAM_DLEDGER_OP_LATENCY;
+import static org.apache.rocketmq.controller.metrics.ControllerMetricsConstant.HISTOGRAM_REQUEST_LATENCY;
+import static org.apache.rocketmq.controller.metrics.ControllerMetricsConstant.LABEL_ADDRESS;
+import static org.apache.rocketmq.controller.metrics.ControllerMetricsConstant.LABEL_AGGREGATION;
+import static org.apache.rocketmq.controller.metrics.ControllerMetricsConstant.LABEL_BROKER_SET;
+import static org.apache.rocketmq.controller.metrics.ControllerMetricsConstant.LABEL_CLUSTER_NAME;
+import static org.apache.rocketmq.controller.metrics.ControllerMetricsConstant.LABEL_GROUP;
+import static org.apache.rocketmq.controller.metrics.ControllerMetricsConstant.LABEL_PEER_ID;
+import static org.apache.rocketmq.controller.metrics.ControllerMetricsConstant.OPEN_TELEMETRY_METER_NAME;
+
+public class ControllerMetricsManager {
+
+ private static final Logger logger = LoggerFactory.getLogger(LoggerName.CONTROLLER_LOGGER_NAME);
+
+ private static volatile ControllerMetricsManager instance;
+
+ private static final Map<String, String> LABEL_MAP = new HashMap<>();
+
+ // metrics about node status
+ public static LongUpDownCounter role = new NopLongUpDownCounter();
+
+ public static ObservableLongGauge dLedgerDiskUsage = new NopObservableLongGauge();
+
+ public static ObservableLongGauge activeBrokerNum = new NopObservableLongGauge();
+
+ public static LongCounter requestTotal = new NopLongCounter();
+
+ public static LongCounter dLedgerOpTotal = new NopLongCounter();
+
+ public static LongCounter electionTotal = new NopLongCounter();
+
+ // metrics about latency
+ public static LongHistogram requestLatency = new NopLongHistogram();
+
+ public static LongHistogram dLedgerOpLatency = new NopLongHistogram();
+
+ private static double us = 1d;
+
+ private static double ms = 1000 * us;
+
+ private static double s = 1000 * ms;
+
+ private final ControllerManager controllerManager;
+
+ private final ControllerConfig config;
+
+ private Meter controllerMeter;
+
+ private OtlpGrpcMetricExporter metricExporter;
+
+ private PeriodicMetricReader periodicMetricReader;
+
+ private PrometheusHttpServer prometheusHttpServer;
+
+ private LoggingMetricExporter loggingMetricExporter;
+
+ public static ControllerMetricsManager getInstance(ControllerManager controllerManager) {
+ if (instance == null) {
+ synchronized (ControllerMetricsManager.class) {
+ if (instance == null) {
+ instance = new ControllerMetricsManager(controllerManager);
+ }
+ }
+ }
+ return instance;
+ }
+
+ public static AttributesBuilder newAttributesBuilder() {
+ AttributesBuilder builder = Attributes.builder();
+ LABEL_MAP.forEach(builder::put);
+ return builder;
+ }
+
+ public static void recordRole(MemberState.Role newRole, MemberState.Role oldRole) {
+ role.add(getRoleValue(newRole) - getRoleValue(oldRole),
+ newAttributesBuilder().build());
+ }
+
+ private static int getRoleValue(MemberState.Role role) {
+ switch (role) {
+ case UNKNOWN:
+ return 0;
+ case CANDIDATE:
+ return 1;
+ case FOLLOWER:
+ return 2;
+ case LEADER:
+ return 3;
+ default:
+ logger.error("Unknown role {}", role);
+ return 0;
+ }
+ }
+
+ private ControllerMetricsManager(ControllerManager controllerManager) {
+ this.controllerManager = controllerManager;
+ this.config = this.controllerManager.getControllerConfig();
+ this.LABEL_MAP.put(LABEL_ADDRESS, this.config.getDLedgerAddress());
+ this.LABEL_MAP.put(LABEL_GROUP, this.config.getControllerDLegerGroup());
+ this.LABEL_MAP.put(LABEL_PEER_ID, this.config.getControllerDLegerSelfId());
+ this.init();
+ }
+
+ private boolean checkConfig() {
+ if (config == null) {
+ return false;
+ }
+ MetricsExporterType exporterType = config.getMetricsExporterType();
+ if (!exporterType.isEnable()) {
+ return false;
+ }
+
+ switch (exporterType) {
+ case OTLP_GRPC:
+ return StringUtils.isNotBlank(config.getMetricsGrpcExporterTarget());
+ case PROM:
+ return true;
+ case LOG:
+ return true;
+ }
+ return false;
+ }
+
+ private void registerMetricsView(SdkMeterProviderBuilder providerBuilder) {
+ // define latency bucket
+ List<Double> latencyBuckets = Arrays.asList(
+ 1 * us, 3 * us, 5 * us,
+ 10 * us, 30 * us, 50 * us,
+ 100 * us, 300 * us, 500 * us,
+ 1 * ms, 3 * ms, 5 * ms,
+ 10 * ms, 30 * ms, 50 * ms,
+ 100 * ms, 300 * ms, 500 * ms,
+ 1 * s, 3 * s, 5 * s,
+ 10 * s
+ );
+
+ View latecyView = View.builder()
+ .setAggregation(Aggregation.explicitBucketHistogram(latencyBuckets))
+ .build();
+
+ InstrumentSelector requestLatencySelector = InstrumentSelector.builder()
+ .setType(InstrumentType.HISTOGRAM)
+ .setName(HISTOGRAM_REQUEST_LATENCY)
+ .build();
+
+ InstrumentSelector dLedgerOpLatencySelector = InstrumentSelector.builder()
+ .setType(InstrumentType.HISTOGRAM)
+ .setName(HISTOGRAM_DLEDGER_OP_LATENCY)
+ .build();
+
+ providerBuilder.registerView(requestLatencySelector, latecyView);
+ providerBuilder.registerView(dLedgerOpLatencySelector, latecyView);
+ }
+
+ private void initMetric(Meter meter) {
+ role = meter.upDownCounterBuilder(GAUGE_ROLE)
+ .setDescription("role of current node")
+ .build();
+
+ dLedgerDiskUsage = meter.gaugeBuilder(GAUGE_DLEDGER_DISK_USAGE)
+ .setDescription("disk usage of dledger")
+ .setUnit("bytes")
+ .ofLongs()
+ .buildWithCallback(measurement -> {
+ String path = config.getControllerStorePath();
+ if (!UtilAll.isPathExists(path)) {
+ return;
+ }
+ File file = new File(path);
+ Long diskUsage = UtilAll.calculateFileSizeInPath(file);
+ if (diskUsage == -1) {
+ logger.error("calculateFileSizeInPath error, path: {}", path);
+ return;
+ }
+ measurement.record(diskUsage, newAttributesBuilder().build());
+ });
+
+ activeBrokerNum = meter.gaugeBuilder(GAUGE_ACTIVE_BROKER_NUM)
+ .setDescription("now active brokers num")
+ .ofLongs()
+ .buildWithCallback(measurement -> {
+ Map<String, Map<String, Integer>> activeBrokersNum = controllerManager.getHeartbeatManager().getActiveBrokersNum();
+ activeBrokersNum.forEach((cluster, brokerSetAndNum) -> {
+ brokerSetAndNum.forEach((brokerSet, num) -> measurement.record(num,
+ newAttributesBuilder().put(LABEL_CLUSTER_NAME, cluster).put(LABEL_BROKER_SET, brokerSet).build()));
+ });
+ });
+
+ requestTotal = meter.counterBuilder(COUNTER_REQUEST_TOTAL)
+ .setDescription("total request num")
+ .build();
+
+ dLedgerOpTotal = meter.counterBuilder(COUNTER_DLEDGER_OP_TOTAL)
+ .setDescription("total dledger operation num")
+ .build();
+
+ electionTotal = meter.counterBuilder(COUNTER_ELECTION_TOTAL)
+ .setDescription("total elect num")
+ .build();
+
+ requestLatency = meter.histogramBuilder(HISTOGRAM_REQUEST_LATENCY)
+ .setDescription("request latency")
+ .setUnit("us")
+ .ofLongs()
+ .build();
+
+ dLedgerOpLatency = meter.histogramBuilder(HISTOGRAM_DLEDGER_OP_LATENCY)
+ .setDescription("dledger operation latency")
+ .setUnit("us")
+ .ofLongs()
+ .build();
+
+ }
+
+ public void init() {
+ MetricsExporterType type = this.config.getMetricsExporterType();
+ if (type == MetricsExporterType.DISABLE) {
+ return;
+ }
+ if (!checkConfig()) {
+ logger.error("check metric config failed, will not export metrics");
+ return;
+ }
+
+ String labels = config.getMetricsLabel();
+ if (StringUtils.isNotBlank(labels)) {
+ List<String> labelList = Splitter.on(',').omitEmptyStrings().splitToList(labels);
+ for (String label : labelList) {
+ String[] pair = label.split(":");
+ if (pair.length != 2) {
+ logger.warn("metrics label is not valid: {}", label);
+ continue;
+ }
+ LABEL_MAP.put(pair[0], pair[1]);
+ }
+ }
+ if (config.isMetricsInDelta()) {
+ LABEL_MAP.put(LABEL_AGGREGATION, AGGREGATION_DELTA);
+ }
+
+ SdkMeterProviderBuilder providerBuilder = SdkMeterProvider.builder().setResource(Resource.empty());
+
+ if (type == MetricsExporterType.OTLP_GRPC) {
+ String endpoint = config.getMetricsGrpcExporterTarget();
+ if (!endpoint.startsWith("http")) {
+ endpoint = "https://" + endpoint;
+ }
+ OtlpGrpcMetricExporterBuilder metricExporterBuilder = OtlpGrpcMetricExporter.builder()
+ .setEndpoint(endpoint)
+ .setTimeout(config.getMetricGrpcExporterTimeOutInMills(), TimeUnit.MILLISECONDS)
+ .setAggregationTemporalitySelector(x -> {
+ if (config.isMetricsInDelta() &&
+ (x == InstrumentType.COUNTER || x == InstrumentType.OBSERVABLE_COUNTER || x == InstrumentType.HISTOGRAM)) {
+ return AggregationTemporality.DELTA;
+ }
+ return AggregationTemporality.CUMULATIVE;
+ });
+
+ String headers = config.getMetricsGrpcExporterHeader();
+ if (StringUtils.isNotBlank(headers)) {
+ Map<String, String> headerMap = new HashMap<>();
+ List<String> headerList = Splitter.on(',').omitEmptyStrings().splitToList(headers);
+ for (String header : headerList) {
+ String[] pair = header.split(":");
+ if (pair.length != 2) {
+ logger.warn("metricsGrpcExporterHeader is not valid: {}", headers);
+ continue;
+ }
+ headerMap.put(pair[0], pair[1]);
+ }
+ headerMap.forEach(metricExporterBuilder::addHeader);
+ }
+
+ metricExporter = metricExporterBuilder.build();
+
+ periodicMetricReader = PeriodicMetricReader.builder(metricExporter)
+ .setInterval(config.getMetricGrpcExporterIntervalInMills(), TimeUnit.MILLISECONDS)
+ .build();
+
+ providerBuilder.registerMetricReader(periodicMetricReader);
+ }
+
+ if (type == MetricsExporterType.PROM) {
+ String promExporterHost = config.getMetricsPromExporterHost();
+ if (StringUtils.isBlank(promExporterHost)) {
+ promExporterHost = "0.0.0.0";
+ }
+ prometheusHttpServer = PrometheusHttpServer.builder()
+ .setHost(promExporterHost)
+ .setPort(config.getMetricsPromExporterPort())
+ .build();
+ providerBuilder.registerMetricReader(prometheusHttpServer);
+ }
+
+ if (type == MetricsExporterType.LOG) {
+ SLF4JBridgeHandler.removeHandlersForRootLogger();
+ SLF4JBridgeHandler.install();
+ loggingMetricExporter = LoggingMetricExporter.create(config.isMetricsInDelta() ? AggregationTemporality.DELTA : AggregationTemporality.CUMULATIVE);
+ java.util.logging.Logger.getLogger(LoggingMetricExporter.class.getName()).setLevel(java.util.logging.Level.FINEST);
+ periodicMetricReader = PeriodicMetricReader.builder(loggingMetricExporter)
+ .setInterval(config.getMetricLoggingExporterIntervalInMills(), TimeUnit.MILLISECONDS)
+ .build();
+ providerBuilder.registerMetricReader(periodicMetricReader);
+ }
+
+ registerMetricsView(providerBuilder);
+
+ controllerMeter = OpenTelemetrySdk.builder().setMeterProvider(providerBuilder.build())
+ .build().getMeter(OPEN_TELEMETRY_METER_NAME);
+
+ initMetric(controllerMeter);
+ }
+
+}
diff --git a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
index 6010f81..93ecbbd 100644
--- a/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
+++ b/controller/src/main/java/org/apache/rocketmq/controller/processor/ControllerRequestProcessor.java
@@ -16,17 +16,22 @@
*/
package org.apache.rocketmq.controller.processor;
+import com.google.common.base.Stopwatch;
import io.netty.channel.ChannelHandlerContext;
+import io.opentelemetry.api.common.Attributes;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.controller.BrokerHeartbeatManager;
import org.apache.rocketmq.controller.ControllerManager;
+import org.apache.rocketmq.controller.metrics.ControllerMetricsConstant;
+import org.apache.rocketmq.controller.metrics.ControllerMetricsManager;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.remoting.common.RemotingHelper;
@@ -45,6 +50,8 @@
import org.apache.rocketmq.remoting.protocol.header.controller.ElectMasterRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.controller.GetReplicaInfoRequestHeader;
+import static org.apache.rocketmq.controller.metrics.ControllerMetricsConstant.LABEL_REQUEST_HANDLE_STATUS;
+import static org.apache.rocketmq.controller.metrics.ControllerMetricsConstant.LABEL_REQUEST_TYPE;
import static org.apache.rocketmq.remoting.protocol.RequestCode.CONTROLLER_APPLY_BROKER_ID;
import static org.apache.rocketmq.remoting.protocol.RequestCode.BROKER_HEARTBEAT;
import static org.apache.rocketmq.remoting.protocol.RequestCode.CLEAN_BROKER_DATA;
@@ -80,6 +87,39 @@
RemotingHelper.parseChannelRemoteAddr(ctx.channel()),
request);
}
+ Stopwatch stopwatch = Stopwatch.createStarted();
+ try {
+ RemotingCommand resp = handleRequest(ctx, request);
+ Attributes attributes = ControllerMetricsManager.newAttributesBuilder()
+ .put(LABEL_REQUEST_TYPE, ControllerMetricsConstant.RequestType.getLowerCaseNameByCode(request.getCode()))
+ .put(LABEL_REQUEST_HANDLE_STATUS, ControllerMetricsConstant.RequestHandleStatus.SUCCESS.getLowerCaseName())
+ .build();
+ ControllerMetricsManager.requestTotal.add(1, attributes);
+ attributes = ControllerMetricsManager.newAttributesBuilder()
+ .put(LABEL_REQUEST_TYPE, ControllerMetricsConstant.RequestType.getLowerCaseNameByCode(request.getCode()))
+ .build();
+ ControllerMetricsManager.requestLatency.record(stopwatch.elapsed(TimeUnit.MICROSECONDS), attributes);
+ return resp;
+ } catch (Exception e) {
+ log.error("process request: {} error, ", request, e);
+ Attributes attributes;
+ if (e instanceof TimeoutException) {
+ attributes = ControllerMetricsManager.newAttributesBuilder()
+ .put(LABEL_REQUEST_TYPE, ControllerMetricsConstant.RequestType.getLowerCaseNameByCode(request.getCode()))
+ .put(LABEL_REQUEST_HANDLE_STATUS, ControllerMetricsConstant.RequestHandleStatus.TIMEOUT.getLowerCaseName())
+ .build();
+ } else {
+ attributes = ControllerMetricsManager.newAttributesBuilder()
+ .put(LABEL_REQUEST_TYPE, ControllerMetricsConstant.RequestType.getLowerCaseNameByCode(request.getCode()))
+ .put(LABEL_REQUEST_HANDLE_STATUS, ControllerMetricsConstant.RequestHandleStatus.FAILED.getLowerCaseName())
+ .build();
+ }
+ ControllerMetricsManager.requestTotal.add(1, attributes);
+ throw e;
+ }
+ }
+
+ private RemotingCommand handleRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
switch (request.getCode()) {
case CONTROLLER_ALTER_SYNC_STATE_SET:
return this.handleAlterSyncStateSet(ctx, request);
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
index 560cd89..4f57a70 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/config/ProxyConfig.java
@@ -28,9 +28,9 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
-import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.MixAll;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.metrics.MetricsExporterType;
import org.apache.rocketmq.common.utils.NetworkUtil;
import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
@@ -205,7 +205,7 @@
private boolean traceOn = false;
- private BrokerConfig.MetricsExporterType metricsExporterType = BrokerConfig.MetricsExporterType.DISABLE;
+ private MetricsExporterType metricsExporterType = MetricsExporterType.DISABLE;
private String metricsGrpcExporterTarget = "";
private String metricsGrpcExporterHeader = "";
@@ -1116,20 +1116,20 @@
this.remotingAccessAddr = remotingAccessAddr;
}
- public BrokerConfig.MetricsExporterType getMetricsExporterType() {
+ public MetricsExporterType getMetricsExporterType() {
return metricsExporterType;
}
- public void setMetricsExporterType(BrokerConfig.MetricsExporterType metricsExporterType) {
+ public void setMetricsExporterType(MetricsExporterType metricsExporterType) {
this.metricsExporterType = metricsExporterType;
}
public void setMetricsExporterType(int metricsExporterType) {
- this.metricsExporterType = BrokerConfig.MetricsExporterType.valueOf(metricsExporterType);
+ this.metricsExporterType = MetricsExporterType.valueOf(metricsExporterType);
}
public void setMetricsExporterType(String metricsExporterType) {
- this.metricsExporterType = BrokerConfig.MetricsExporterType.valueOf(metricsExporterType);
+ this.metricsExporterType = MetricsExporterType.valueOf(metricsExporterType);
}
public String getMetricsGrpcExporterTarget() {
diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/metrics/ProxyMetricsManager.java b/proxy/src/main/java/org/apache/rocketmq/proxy/metrics/ProxyMetricsManager.java
index 8474a68..f505085 100644
--- a/proxy/src/main/java/org/apache/rocketmq/proxy/metrics/ProxyMetricsManager.java
+++ b/proxy/src/main/java/org/apache/rocketmq/proxy/metrics/ProxyMetricsManager.java
@@ -39,8 +39,8 @@
import java.util.function.Supplier;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.broker.metrics.BrokerMetricsManager;
-import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.constant.LoggerName;
+import org.apache.rocketmq.common.metrics.MetricsExporterType;
import org.apache.rocketmq.common.utils.StartAndShutdown;
import org.apache.rocketmq.proxy.config.ProxyConfig;
import org.apache.rocketmq.logging.org.slf4j.Logger;
@@ -72,7 +72,7 @@
public static ObservableLongGauge proxyUp = null;
public static void initLocalMode(BrokerMetricsManager brokerMetricsManager, ProxyConfig proxyConfig) {
- if (proxyConfig.getMetricsExporterType() == BrokerConfig.MetricsExporterType.DISABLE) {
+ if (proxyConfig.getMetricsExporterType() == MetricsExporterType.DISABLE) {
return;
}
ProxyMetricsManager.proxyConfig = proxyConfig;
@@ -116,7 +116,7 @@
if (proxyConfig == null) {
return false;
}
- BrokerConfig.MetricsExporterType exporterType = proxyConfig.getMetricsExporterType();
+ MetricsExporterType exporterType = proxyConfig.getMetricsExporterType();
if (!exporterType.isEnable()) {
return false;
}
@@ -134,8 +134,8 @@
@Override
public void start() throws Exception {
- BrokerConfig.MetricsExporterType metricsExporterType = proxyConfig.getMetricsExporterType();
- if (metricsExporterType == BrokerConfig.MetricsExporterType.DISABLE) {
+ MetricsExporterType metricsExporterType = proxyConfig.getMetricsExporterType();
+ if (metricsExporterType == MetricsExporterType.DISABLE) {
return;
}
if (!checkConfig()) {
@@ -166,7 +166,7 @@
SdkMeterProviderBuilder providerBuilder = SdkMeterProvider.builder()
.setResource(Resource.empty());
- if (metricsExporterType == BrokerConfig.MetricsExporterType.OTLP_GRPC) {
+ if (metricsExporterType == MetricsExporterType.OTLP_GRPC) {
String endpoint = proxyConfig.getMetricsGrpcExporterTarget();
if (!endpoint.startsWith("http")) {
endpoint = "https://" + endpoint;
@@ -206,7 +206,7 @@
providerBuilder.registerMetricReader(periodicMetricReader);
}
- if (metricsExporterType == BrokerConfig.MetricsExporterType.PROM) {
+ if (metricsExporterType == MetricsExporterType.PROM) {
String promExporterHost = proxyConfig.getMetricsPromExporterHost();
if (StringUtils.isBlank(promExporterHost)) {
promExporterHost = "0.0.0.0";
@@ -218,7 +218,7 @@
providerBuilder.registerMetricReader(prometheusHttpServer);
}
- if (metricsExporterType == BrokerConfig.MetricsExporterType.LOG) {
+ if (metricsExporterType == MetricsExporterType.LOG) {
SLF4JBridgeHandler.removeHandlersForRootLogger();
SLF4JBridgeHandler.install();
loggingMetricExporter = LoggingMetricExporter.create(proxyConfig.isMetricsInDelta() ? AggregationTemporality.DELTA : AggregationTemporality.CUMULATIVE);
@@ -239,16 +239,16 @@
@Override
public void shutdown() throws Exception {
- if (proxyConfig.getMetricsExporterType() == BrokerConfig.MetricsExporterType.OTLP_GRPC) {
+ if (proxyConfig.getMetricsExporterType() == MetricsExporterType.OTLP_GRPC) {
periodicMetricReader.forceFlush();
periodicMetricReader.shutdown();
metricExporter.shutdown();
}
- if (proxyConfig.getMetricsExporterType() == BrokerConfig.MetricsExporterType.PROM) {
+ if (proxyConfig.getMetricsExporterType() == MetricsExporterType.PROM) {
prometheusHttpServer.forceFlush();
prometheusHttpServer.shutdown();
}
- if (proxyConfig.getMetricsExporterType() == BrokerConfig.MetricsExporterType.LOG) {
+ if (proxyConfig.getMetricsExporterType() == MetricsExporterType.LOG) {
periodicMetricReader.forceFlush();
periodicMetricReader.shutdown();
loggingMetricExporter.shutdown();