[ISSUE #9701] Synchronize metrics shutdown to prevent JVM crashes during broker shutdown (#9702)
* fix: synchronize metrics shutdown to prevent JVM crash
- Change async shutdown to sync blocking wait in BrokerMetricsManager
- Ensure proper shutdown order to avoid race conditions
- Prevent accessing dependencies after they have been shut down
- Use join() with a 60-second timeout, retried in a loop, to wait for each OpenTelemetry CompletableResultCode (forceFlush/shutdown) to complete
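- Apply fix to all metrics exporter types (OTLP_GRPC, PROM, LOG) when the broker runs inside a broker container (isInBrokerContainer); otherwise the previous non-blocking shutdown calls are kept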
* fix code style
---------
Co-authored-by: guyinyou <guyinyou.gyy@alibaba-inc.com>
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
index 5a32cf3..1b0f244 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
@@ -434,20 +434,20 @@
);
List<Double> commitLatencyBuckets = Arrays.asList(
- 1d * 1 * 1 * 5, //5s
- 1d * 1 * 1 * 60, //1min
- 1d * 1 * 10 * 60, //10min
- 1d * 1 * 60 * 60, //1h
- 1d * 12 * 60 * 60, //12h
- 1d * 24 * 60 * 60 //24h
+ 1d * 1 * 1 * 5, //5s
+ 1d * 1 * 1 * 60, //1min
+ 1d * 1 * 10 * 60, //10min
+ 1d * 1 * 60 * 60, //1h
+ 1d * 12 * 60 * 60, //12h
+ 1d * 24 * 60 * 60 //24h
);
List<Double> createTimeBuckets = Arrays.asList(
- (double) Duration.ofMillis(10).toMillis(), //10ms
- (double) Duration.ofMillis(100).toMillis(), //100ms
- (double) Duration.ofSeconds(1).toMillis(), //1s
- (double) Duration.ofSeconds(3).toMillis(), //3s
- (double) Duration.ofSeconds(5).toMillis() //5s
+ (double) Duration.ofMillis(10).toMillis(), //10ms
+ (double) Duration.ofMillis(100).toMillis(), //100ms
+ (double) Duration.ofSeconds(1).toMillis(), //1s
+ (double) Duration.ofSeconds(3).toMillis(), //3s
+ (double) Duration.ofSeconds(5).toMillis() //5s
);
InstrumentSelector messageSizeSelector = InstrumentSelector.builder()
.setType(InstrumentType.HISTOGRAM)
@@ -470,17 +470,17 @@
providerBuilder.registerView(commitLatencySelector, commitLatencyViewBuilder.build());
InstrumentSelector createTopicTimeSelector = InstrumentSelector.builder()
- .setType(InstrumentType.HISTOGRAM)
- .setName(HISTOGRAM_TOPIC_CREATE_EXECUTE_TIME)
- .build();
+ .setType(InstrumentType.HISTOGRAM)
+ .setName(HISTOGRAM_TOPIC_CREATE_EXECUTE_TIME)
+ .build();
InstrumentSelector createSubGroupTimeSelector = InstrumentSelector.builder()
- .setType(InstrumentType.HISTOGRAM)
- .setName(HISTOGRAM_CONSUMER_GROUP_CREATE_EXECUTE_TIME)
- .build();
+ .setType(InstrumentType.HISTOGRAM)
+ .setName(HISTOGRAM_CONSUMER_GROUP_CREATE_EXECUTE_TIME)
+ .build();
ViewBuilder createTopicTimeViewBuilder = View.builder()
- .setAggregation(Aggregation.explicitBucketHistogram(createTimeBuckets));
+ .setAggregation(Aggregation.explicitBucketHistogram(createTimeBuckets));
ViewBuilder createSubGroupTimeViewBuilder = View.builder()
- .setAggregation(Aggregation.explicitBucketHistogram(createTimeBuckets));
+ .setAggregation(Aggregation.explicitBucketHistogram(createTimeBuckets));
// To config the cardinalityLimit for openTelemetry metrics exporting.
SdkMeterProviderUtil.setCardinalityLimit(createTopicTimeViewBuilder, brokerConfig.getMetricsOtelCardinalityLimit());
providerBuilder.registerView(createTopicTimeSelector, createTopicTimeViewBuilder.build());
@@ -588,16 +588,16 @@
.build();
topicCreateExecuteTime = brokerMeter.histogramBuilder(HISTOGRAM_TOPIC_CREATE_EXECUTE_TIME)
- .setDescription("The distribution of create topic time")
- .ofLongs()
- .setUnit("milliseconds")
- .build();
+ .setDescription("The distribution of create topic time")
+ .ofLongs()
+ .setUnit("milliseconds")
+ .build();
consumerGroupCreateExecuteTime = brokerMeter.histogramBuilder(HISTOGRAM_CONSUMER_GROUP_CREATE_EXECUTE_TIME)
- .setDescription("The distribution of create subscription time")
- .ofLongs()
- .setUnit("milliseconds")
- .build();
+ .setDescription("The distribution of create subscription time")
+ .ofLongs()
+ .setUnit("milliseconds")
+ .build();
}
private void initConnectionMetrics() {
@@ -720,32 +720,33 @@
}
commitMessagesTotal = brokerMeter.counterBuilder(COUNTER_COMMIT_MESSAGES_TOTAL)
- .setDescription("Total number of commit messages")
- .build();
+ .setDescription("Total number of commit messages")
+ .build();
rollBackMessagesTotal = brokerMeter.counterBuilder(COUNTER_ROLLBACK_MESSAGES_TOTAL)
- .setDescription("Total number of rollback messages")
- .build();
+ .setDescription("Total number of rollback messages")
+ .build();
transactionFinishLatency = brokerMeter.histogramBuilder(HISTOGRAM_FINISH_MSG_LATENCY)
- .setDescription("Transaction finish latency")
- .ofLongs()
- .setUnit("ms")
- .build();
+ .setDescription("Transaction finish latency")
+ .ofLongs()
+ .setUnit("ms")
+ .build();
halfMessages = brokerMeter.gaugeBuilder(GAUGE_HALF_MESSAGES)
- .setDescription("Half messages of all topics")
- .ofLongs()
- .buildWithCallback(measurement -> {
- brokerController.getTransactionalMessageService().getTransactionMetrics().getTransactionCounts()
- .forEach((topic, metric) -> {
- measurement.record(
- metric.getCount().get(),
- newAttributesBuilder().put(DefaultStoreMetricsConstant.LABEL_TOPIC, topic).build()
- );
- });
- });
+ .setDescription("Half messages of all topics")
+ .ofLongs()
+ .buildWithCallback(measurement -> {
+ brokerController.getTransactionalMessageService().getTransactionMetrics().getTransactionCounts()
+ .forEach((topic, metric) -> {
+ measurement.record(
+ metric.getCount().get(),
+ newAttributesBuilder().put(DefaultStoreMetricsConstant.LABEL_TOPIC, topic).build()
+ );
+ });
+ });
}
+
private void initOtherMetrics() {
if (brokerConfig.isEnableRemotingMetrics()) {
RemotingMetricsManager.initMetrics(brokerMeter, this::newAttributesBuilder);
@@ -759,19 +760,45 @@
}
public void shutdown() {
- if (brokerConfig.getMetricsExporterType() == MetricsExporterType.OTLP_GRPC) {
- periodicMetricReader.forceFlush();
- periodicMetricReader.shutdown();
- metricExporter.shutdown();
- }
- if (brokerConfig.getMetricsExporterType() == MetricsExporterType.PROM) {
- prometheusHttpServer.forceFlush();
- prometheusHttpServer.shutdown();
- }
- if (brokerConfig.getMetricsExporterType() == MetricsExporterType.LOG) {
- periodicMetricReader.forceFlush();
- periodicMetricReader.shutdown();
- loggingMetricExporter.shutdown();
+ if (brokerConfig.isInBrokerContainer()) {
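+ // only rto need: inside a broker container, block until each flush/shutdown result completes (NOTE(review): "rto" is not defined here - confirm meaning)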
+ if (brokerConfig.getMetricsExporterType() == MetricsExporterType.OTLP_GRPC) {
+ while (!periodicMetricReader.forceFlush().join(60, TimeUnit.SECONDS).isDone()) {
+ }
+ while (!periodicMetricReader.shutdown().join(60, TimeUnit.SECONDS).isSuccess()) {
+ }
+ while (!metricExporter.shutdown().join(60, TimeUnit.SECONDS).isSuccess()) {
+ }
+ }
+ if (brokerConfig.getMetricsExporterType() == MetricsExporterType.PROM) {
+ while (!prometheusHttpServer.forceFlush().join(60, TimeUnit.SECONDS).isDone()) {
+ }
+ while (!prometheusHttpServer.shutdown().join(60, TimeUnit.SECONDS).isSuccess()) {
+ }
+ }
+ if (brokerConfig.getMetricsExporterType() == MetricsExporterType.LOG) {
+ while (!periodicMetricReader.forceFlush().join(60, TimeUnit.SECONDS).isDone()) {
+ }
+ while (!periodicMetricReader.shutdown().join(60, TimeUnit.SECONDS).isSuccess()) {
+ }
+ while (!loggingMetricExporter.shutdown().join(60, TimeUnit.SECONDS).isSuccess()) {
+ }
+ }
+ } else {
+ if (brokerConfig.getMetricsExporterType() == MetricsExporterType.OTLP_GRPC) {
+ periodicMetricReader.forceFlush();
+ periodicMetricReader.shutdown();
+ metricExporter.shutdown();
+ }
+ if (brokerConfig.getMetricsExporterType() == MetricsExporterType.PROM) {
+ prometheusHttpServer.forceFlush();
+ prometheusHttpServer.shutdown();
+ }
+ if (brokerConfig.getMetricsExporterType() == MetricsExporterType.LOG) {
+ periodicMetricReader.forceFlush();
+ periodicMetricReader.shutdown();
+ loggingMetricExporter.shutdown();
+ }
}
}