[ISSUE #9701] Synchronize metrics shutdown to prevent JVM crashes during broker shutdown (#9702)

* fix: synchronize metrics shutdown to prevent JVM crash

- Change async shutdown to sync blocking wait in BrokerMetricsManager
- Ensure proper shutdown order to avoid race conditions
- Prevent accessing dependencies after they are shutdown
- Use join() with timeout to wait for CompletableFuture completion
- Apply fix to all metrics exporter types (OTLP_GRPC, PROM, LOG)

* fix codestyle

---------

Co-authored-by: guyinyou <guyinyou.gyy@alibaba-inc.com>
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
index 5a32cf3..1b0f244 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/metrics/BrokerMetricsManager.java
@@ -434,20 +434,20 @@
         );
 
         List<Double> commitLatencyBuckets = Arrays.asList(
-                1d * 1 * 1 * 5, //5s
-                1d * 1 * 1 * 60, //1min
-                1d * 1 * 10 * 60, //10min
-                1d * 1 * 60 * 60, //1h
-                1d * 12 * 60 * 60, //12h
-                1d * 24 * 60 * 60 //24h
+            1d * 1 * 1 * 5, //5s
+            1d * 1 * 1 * 60, //1min
+            1d * 1 * 10 * 60, //10min
+            1d * 1 * 60 * 60, //1h
+            1d * 12 * 60 * 60, //12h
+            1d * 24 * 60 * 60 //24h
         );
 
         List<Double> createTimeBuckets = Arrays.asList(
-                (double) Duration.ofMillis(10).toMillis(), //10ms
-                (double) Duration.ofMillis(100).toMillis(), //100ms
-                (double) Duration.ofSeconds(1).toMillis(), //1s
-                (double) Duration.ofSeconds(3).toMillis(), //3s
-                (double) Duration.ofSeconds(5).toMillis() //5s
+            (double) Duration.ofMillis(10).toMillis(), //10ms
+            (double) Duration.ofMillis(100).toMillis(), //100ms
+            (double) Duration.ofSeconds(1).toMillis(), //1s
+            (double) Duration.ofSeconds(3).toMillis(), //3s
+            (double) Duration.ofSeconds(5).toMillis() //5s
         );
         InstrumentSelector messageSizeSelector = InstrumentSelector.builder()
             .setType(InstrumentType.HISTOGRAM)
@@ -470,17 +470,17 @@
         providerBuilder.registerView(commitLatencySelector, commitLatencyViewBuilder.build());
 
         InstrumentSelector createTopicTimeSelector = InstrumentSelector.builder()
-                .setType(InstrumentType.HISTOGRAM)
-                .setName(HISTOGRAM_TOPIC_CREATE_EXECUTE_TIME)
-                .build();
+            .setType(InstrumentType.HISTOGRAM)
+            .setName(HISTOGRAM_TOPIC_CREATE_EXECUTE_TIME)
+            .build();
         InstrumentSelector createSubGroupTimeSelector = InstrumentSelector.builder()
-                .setType(InstrumentType.HISTOGRAM)
-                .setName(HISTOGRAM_CONSUMER_GROUP_CREATE_EXECUTE_TIME)
-                .build();
+            .setType(InstrumentType.HISTOGRAM)
+            .setName(HISTOGRAM_CONSUMER_GROUP_CREATE_EXECUTE_TIME)
+            .build();
         ViewBuilder createTopicTimeViewBuilder = View.builder()
-                .setAggregation(Aggregation.explicitBucketHistogram(createTimeBuckets));
+            .setAggregation(Aggregation.explicitBucketHistogram(createTimeBuckets));
         ViewBuilder createSubGroupTimeViewBuilder = View.builder()
-                .setAggregation(Aggregation.explicitBucketHistogram(createTimeBuckets));
+            .setAggregation(Aggregation.explicitBucketHistogram(createTimeBuckets));
         // To config the cardinalityLimit for openTelemetry metrics exporting.
         SdkMeterProviderUtil.setCardinalityLimit(createTopicTimeViewBuilder, brokerConfig.getMetricsOtelCardinalityLimit());
         providerBuilder.registerView(createTopicTimeSelector, createTopicTimeViewBuilder.build());
@@ -588,16 +588,16 @@
             .build();
 
         topicCreateExecuteTime = brokerMeter.histogramBuilder(HISTOGRAM_TOPIC_CREATE_EXECUTE_TIME)
-                .setDescription("The distribution of create topic time")
-                .ofLongs()
-                .setUnit("milliseconds")
-                .build();
+            .setDescription("The distribution of create topic time")
+            .ofLongs()
+            .setUnit("milliseconds")
+            .build();
 
         consumerGroupCreateExecuteTime = brokerMeter.histogramBuilder(HISTOGRAM_CONSUMER_GROUP_CREATE_EXECUTE_TIME)
-                .setDescription("The distribution of create subscription time")
-                .ofLongs()
-                .setUnit("milliseconds")
-                .build();
+            .setDescription("The distribution of create subscription time")
+            .ofLongs()
+            .setUnit("milliseconds")
+            .build();
     }
 
     private void initConnectionMetrics() {
@@ -720,32 +720,33 @@
         }
 
         commitMessagesTotal = brokerMeter.counterBuilder(COUNTER_COMMIT_MESSAGES_TOTAL)
-                .setDescription("Total number of commit messages")
-                .build();
+            .setDescription("Total number of commit messages")
+            .build();
 
         rollBackMessagesTotal = brokerMeter.counterBuilder(COUNTER_ROLLBACK_MESSAGES_TOTAL)
-                .setDescription("Total number of rollback messages")
-                .build();
+            .setDescription("Total number of rollback messages")
+            .build();
 
         transactionFinishLatency = brokerMeter.histogramBuilder(HISTOGRAM_FINISH_MSG_LATENCY)
-                .setDescription("Transaction finish latency")
-                .ofLongs()
-                .setUnit("ms")
-                .build();
+            .setDescription("Transaction finish latency")
+            .ofLongs()
+            .setUnit("ms")
+            .build();
 
         halfMessages = brokerMeter.gaugeBuilder(GAUGE_HALF_MESSAGES)
-                .setDescription("Half messages of all topics")
-                .ofLongs()
-                .buildWithCallback(measurement -> {
-                    brokerController.getTransactionalMessageService().getTransactionMetrics().getTransactionCounts()
-                            .forEach((topic, metric) -> {
-                                measurement.record(
-                                        metric.getCount().get(),
-                                        newAttributesBuilder().put(DefaultStoreMetricsConstant.LABEL_TOPIC, topic).build()
-                                );
-                            });
-                });
+            .setDescription("Half messages of all topics")
+            .ofLongs()
+            .buildWithCallback(measurement -> {
+                brokerController.getTransactionalMessageService().getTransactionMetrics().getTransactionCounts()
+                    .forEach((topic, metric) -> {
+                        measurement.record(
+                            metric.getCount().get(),
+                            newAttributesBuilder().put(DefaultStoreMetricsConstant.LABEL_TOPIC, topic).build()
+                        );
+                    });
+            });
     }
+
     private void initOtherMetrics() {
         if (brokerConfig.isEnableRemotingMetrics()) {
             RemotingMetricsManager.initMetrics(brokerMeter, this::newAttributesBuilder);
@@ -759,19 +760,45 @@
     }
 
     public void shutdown() {
-        if (brokerConfig.getMetricsExporterType() == MetricsExporterType.OTLP_GRPC) {
-            periodicMetricReader.forceFlush();
-            periodicMetricReader.shutdown();
-            metricExporter.shutdown();
-        }
-        if (brokerConfig.getMetricsExporterType() == MetricsExporterType.PROM) {
-            prometheusHttpServer.forceFlush();
-            prometheusHttpServer.shutdown();
-        }
-        if (brokerConfig.getMetricsExporterType() == MetricsExporterType.LOG) {
-            periodicMetricReader.forceFlush();
-            periodicMetricReader.shutdown();
-            loggingMetricExporter.shutdown();
+        if (brokerConfig.isInBrokerContainer()) {
+            // only rto need
+            if (brokerConfig.getMetricsExporterType() == MetricsExporterType.OTLP_GRPC) {
+                while (!periodicMetricReader.forceFlush().join(60, TimeUnit.SECONDS).isDone()) {
+                }
+                while (!periodicMetricReader.shutdown().join(60, TimeUnit.SECONDS).isSuccess()) {
+                }
+                while (!metricExporter.shutdown().join(60, TimeUnit.SECONDS).isSuccess()) {
+                }
+            }
+            if (brokerConfig.getMetricsExporterType() == MetricsExporterType.PROM) {
+                while (!prometheusHttpServer.forceFlush().join(60, TimeUnit.SECONDS).isDone()) {
+                }
+                while (!prometheusHttpServer.shutdown().join(60, TimeUnit.SECONDS).isSuccess()) {
+                }
+            }
+            if (brokerConfig.getMetricsExporterType() == MetricsExporterType.LOG) {
+                while (!periodicMetricReader.forceFlush().join(60, TimeUnit.SECONDS).isDone()) {
+                }
+                while (!periodicMetricReader.shutdown().join(60, TimeUnit.SECONDS).isSuccess()) {
+                }
+                while (!loggingMetricExporter.shutdown().join(60, TimeUnit.SECONDS).isSuccess()) {
+                }
+            }
+        } else {
+            if (brokerConfig.getMetricsExporterType() == MetricsExporterType.OTLP_GRPC) {
+                periodicMetricReader.forceFlush();
+                periodicMetricReader.shutdown();
+                metricExporter.shutdown();
+            }
+            if (brokerConfig.getMetricsExporterType() == MetricsExporterType.PROM) {
+                prometheusHttpServer.forceFlush();
+                prometheusHttpServer.shutdown();
+            }
+            if (brokerConfig.getMetricsExporterType() == MetricsExporterType.LOG) {
+                periodicMetricReader.forceFlush();
+                periodicMetricReader.shutdown();
+                loggingMetricExporter.shutdown();
+            }
         }
     }