[feat][broker] PIP-264: Add managed ledger cache metrics (#22898)

diff --git a/managed-ledger/pom.xml b/managed-ledger/pom.xml
index d8b3122..60a4eda 100644
--- a/managed-ledger/pom.xml
+++ b/managed-ledger/pom.xml
@@ -72,6 +72,12 @@
     </dependency>
 
     <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-opentelemetry</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
+    <dependency>
       <groupId>com.google.guava</groupId>
       <artifactId>guava</artifactId>
     </dependency>
@@ -121,6 +127,12 @@
     </dependency>
 
     <dependency>
+      <groupId>io.opentelemetry</groupId>
+      <artifactId>opentelemetry-sdk-testing</artifactId>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-api</artifactId>
     </dependency>
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryMXBean.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryMXBean.java
index 35c26c5..43e8196 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryMXBean.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryMXBean.java
@@ -48,26 +48,51 @@
     double getCacheHitsRate();
 
     /**
+     * Cumulative number of cache hits.
+     */
+    long getCacheHitsTotal();
+
+    /**
      * Get the number of cache misses per second.
      */
     double getCacheMissesRate();
 
     /**
+     * Cumulative number of cache misses.
+     */
+    long getCacheMissesTotal();
+
+    /**
      * Get the amount of data is retrieved from the cache in byte/s.
      */
     double getCacheHitsThroughput();
 
     /**
+     * Cumulative amount of data retrieved from the cache in bytes.
+     */
+    long getCacheHitsBytesTotal();
+
+    /**
      * Get the amount of data is retrieved from the bookkeeper in byte/s.
      */
     double getCacheMissesThroughput();
 
     /**
+     * Cumulative amount of data retrieved from the bookkeeper in bytes.
+     */
+    long getCacheMissesBytesTotal();
+
+    /**
      * Get the number of cache evictions during the last minute.
      */
     long getNumberOfCacheEvictions();
 
     /**
+     * Cumulative number of cache evictions.
+     */
+    long getNumberOfCacheEvictionsTotal();
+
+    /**
      * Cumulative number of entries inserted into the cache.
      */
     long getCacheInsertedEntriesCount();
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OpenTelemetryManagedLedgerCacheStats.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OpenTelemetryManagedLedgerCacheStats.java
new file mode 100644
index 0000000..13e7ed6
--- /dev/null
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OpenTelemetryManagedLedgerCacheStats.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger;
+
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.metrics.BatchCallback;
+import io.opentelemetry.api.metrics.ObservableLongMeasurement;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
+import org.apache.bookkeeper.mledger.impl.cache.PooledByteBufAllocatorStats;
+import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl;
+import org.apache.pulsar.opentelemetry.Constants;
+import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.CacheEntryStatus;
+import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.CacheOperationStatus;
+import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.PoolArenaType;
+import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.PoolChunkAllocationType;
+
+public class OpenTelemetryManagedLedgerCacheStats implements AutoCloseable {
+
+    // Replaces pulsar_ml_count
+    public static final String MANAGED_LEDGER_COUNTER = "pulsar.broker.managed_ledger.count";
+    private final ObservableLongMeasurement managedLedgerCounter;
+
+    // Replaces pulsar_ml_cache_evictions
+    public static final String CACHE_EVICTION_OPERATION_COUNTER = "pulsar.broker.managed_ledger.cache.eviction.count";
+    private final ObservableLongMeasurement cacheEvictionOperationCounter;
+
+    // Replaces 'pulsar_ml_cache_entries',
+    //          'pulsar_ml_cache_inserted_entries_total',
+    //          'pulsar_ml_cache_evicted_entries_total'
+    public static final String CACHE_ENTRY_COUNTER = "pulsar.broker.managed_ledger.cache.entry.count";
+    private final ObservableLongMeasurement cacheEntryCounter;
+
+    // Replaces pulsar_ml_cache_used_size
+    public static final String CACHE_SIZE_COUNTER = "pulsar.broker.managed_ledger.cache.entry.size";
+    private final ObservableLongMeasurement cacheSizeCounter;
+
+    // Replaces pulsar_ml_cache_hits_rate, pulsar_ml_cache_misses_rate
+    public static final String CACHE_OPERATION_COUNTER = "pulsar.broker.managed_ledger.cache.operation.count";
+    private final ObservableLongMeasurement cacheOperationCounter;
+
+    // Replaces pulsar_ml_cache_hits_throughput, pulsar_ml_cache_misses_throughput
+    public static final String CACHE_OPERATION_BYTES_COUNTER = "pulsar.broker.managed_ledger.cache.operation.size";
+    private final ObservableLongMeasurement cacheOperationBytesCounter;
+
+    // Replaces 'pulsar_ml_cache_pool_active_allocations',
+    //          'pulsar_ml_cache_pool_active_allocations_huge',
+    //          'pulsar_ml_cache_pool_active_allocations_normal',
+    //          'pulsar_ml_cache_pool_active_allocations_small'
+    public static final String CACHE_POOL_ACTIVE_ALLOCATION_COUNTER =
+            "pulsar.broker.managed_ledger.cache.pool.allocation.active.count";
+    private final ObservableLongMeasurement cachePoolActiveAllocationCounter;
+
+    // Replaces ['pulsar_ml_cache_pool_allocated', 'pulsar_ml_cache_pool_used']
+    public static final String CACHE_POOL_ACTIVE_ALLOCATION_SIZE_COUNTER =
+            "pulsar.broker.managed_ledger.cache.pool.allocation.size";
+    private final ObservableLongMeasurement cachePoolActiveAllocationSizeCounter;
+
+    private final BatchCallback batchCallback;
+
+    public OpenTelemetryManagedLedgerCacheStats(OpenTelemetry openTelemetry, ManagedLedgerFactoryImpl factory) {
+        var meter = openTelemetry.getMeter(Constants.BROKER_INSTRUMENTATION_SCOPE_NAME);
+
+        managedLedgerCounter = meter
+                .upDownCounterBuilder(MANAGED_LEDGER_COUNTER)
+                .setUnit("{managed_ledger}")
+                .setDescription("The total number of managed ledgers.")
+                .buildObserver();
+
+        cacheEvictionOperationCounter = meter
+                .counterBuilder(CACHE_EVICTION_OPERATION_COUNTER)
+                .setUnit("{eviction}")
+                .setDescription("The total number of cache eviction operations.")
+                .buildObserver();
+
+        cacheEntryCounter = meter
+                .upDownCounterBuilder(CACHE_ENTRY_COUNTER)
+                .setUnit("{entry}")
+                .setDescription("The number of entries in the entry cache.")
+                .buildObserver();
+
+        cacheSizeCounter = meter
+                .upDownCounterBuilder(CACHE_SIZE_COUNTER)
+                .setUnit("{By}")
+                .setDescription("The byte amount of entries stored in the entry cache.")
+                .buildObserver();
+
+        cacheOperationCounter = meter
+                .counterBuilder(CACHE_OPERATION_COUNTER)
+                .setUnit("{entry}")
+                .setDescription("The number of cache operations.")
+                .buildObserver();
+
+        cacheOperationBytesCounter = meter
+                .counterBuilder(CACHE_OPERATION_BYTES_COUNTER)
+                .setUnit("{By}")
+                .setDescription("The byte amount of data retrieved from cache operations.")
+                .buildObserver();
+
+        cachePoolActiveAllocationCounter = meter
+                .upDownCounterBuilder(CACHE_POOL_ACTIVE_ALLOCATION_COUNTER)
+                .setUnit("{allocation}")
+                .setDescription("The number of currently active allocations in the direct arena.")
+                .buildObserver();
+
+        cachePoolActiveAllocationSizeCounter = meter
+                .upDownCounterBuilder(CACHE_POOL_ACTIVE_ALLOCATION_SIZE_COUNTER)
+                .setUnit("{By}")
+                .setDescription("The memory allocated in the direct arena.")
+                .buildObserver();
+
+
+        batchCallback = meter.batchCallback(() -> recordMetrics(factory),
+                managedLedgerCounter,
+                cacheEvictionOperationCounter,
+                cacheEntryCounter,
+                cacheSizeCounter,
+                cacheOperationCounter,
+                cacheOperationBytesCounter,
+                cachePoolActiveAllocationCounter,
+                cachePoolActiveAllocationSizeCounter);
+    }
+
+    @Override
+    public void close() {
+        batchCallback.close();
+    }
+
+    private void recordMetrics(ManagedLedgerFactoryImpl factory) {
+        var stats = factory.getCacheStats();
+
+        managedLedgerCounter.record(stats.getNumberOfManagedLedgers());
+        cacheEvictionOperationCounter.record(stats.getNumberOfCacheEvictionsTotal());
+
+        var entriesOut = stats.getCacheEvictedEntriesCount();
+        var entriesIn = stats.getCacheInsertedEntriesCount();
+        var entriesActive = entriesIn - entriesOut;
+        cacheEntryCounter.record(entriesActive, CacheEntryStatus.ACTIVE.attributes);
+        cacheEntryCounter.record(entriesIn, CacheEntryStatus.INSERTED.attributes);
+        cacheEntryCounter.record(entriesOut, CacheEntryStatus.EVICTED.attributes);
+        cacheSizeCounter.record(stats.getCacheUsedSize());
+
+        cacheOperationCounter.record(stats.getCacheHitsTotal(), CacheOperationStatus.HIT.attributes);
+        cacheOperationBytesCounter.record(stats.getCacheHitsBytesTotal(), CacheOperationStatus.HIT.attributes);
+        cacheOperationCounter.record(stats.getCacheMissesTotal(), CacheOperationStatus.MISS.attributes);
+        cacheOperationBytesCounter.record(stats.getCacheMissesBytesTotal(), CacheOperationStatus.MISS.attributes);
+
+        var allocatorStats = new PooledByteBufAllocatorStats(RangeEntryCacheImpl.ALLOCATOR);
+        cachePoolActiveAllocationCounter.record(allocatorStats.activeAllocationsSmall, PoolArenaType.SMALL.attributes);
+        cachePoolActiveAllocationCounter.record(allocatorStats.activeAllocationsNormal,
+                PoolArenaType.NORMAL.attributes);
+        cachePoolActiveAllocationCounter.record(allocatorStats.activeAllocationsHuge, PoolArenaType.HUGE.attributes);
+        cachePoolActiveAllocationSizeCounter.record(allocatorStats.totalAllocated,
+                PoolChunkAllocationType.ALLOCATED.attributes);
+        cachePoolActiveAllocationSizeCounter.record(allocatorStats.totalUsed, PoolChunkAllocationType.USED.attributes);
+    }
+}
\ No newline at end of file
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 0b0f66d..fc291b8 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -24,6 +24,7 @@
 import com.google.common.base.Predicates;
 import com.google.common.collect.Maps;
 import io.netty.util.concurrent.DefaultThreadFactory;
+import io.opentelemetry.api.OpenTelemetry;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -67,6 +68,7 @@
 import org.apache.bookkeeper.mledger.ManagedLedgerInfo.MessageRangeInfo;
 import org.apache.bookkeeper.mledger.ManagedLedgerInfo.PositionInfo;
 import org.apache.bookkeeper.mledger.MetadataCompressionConfig;
+import org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.ReadOnlyCursor;
 import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.ManagedLedgerInitializeLedgerCallback;
@@ -118,6 +120,8 @@
     private volatile long cacheEvictionTimeThresholdNanos;
     private final MetadataStore metadataStore;
 
+    private final OpenTelemetryManagedLedgerCacheStats openTelemetryCacheStats;
+
     //indicate whether shutdown() is called.
     private volatile boolean closed;
 
@@ -149,7 +153,7 @@
                                     ManagedLedgerFactoryConfig config)
             throws Exception {
         this(metadataStore, new DefaultBkFactory(bkClientConfiguration),
-                true /* isBookkeeperManaged */, config, NullStatsLogger.INSTANCE);
+                true /* isBookkeeperManaged */, config, NullStatsLogger.INSTANCE, OpenTelemetry.noop());
     }
 
     public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, BookKeeper bookKeeper)
@@ -168,21 +172,24 @@
                                     ManagedLedgerFactoryConfig config)
             throws Exception {
         this(metadataStore, bookKeeperGroupFactory, false /* isBookkeeperManaged */,
-                config, NullStatsLogger.INSTANCE);
+                config, NullStatsLogger.INSTANCE, OpenTelemetry.noop());
     }
 
     public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
                                     BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperGroupFactory,
-                                    ManagedLedgerFactoryConfig config, StatsLogger statsLogger)
+                                    ManagedLedgerFactoryConfig config, StatsLogger statsLogger,
+                                    OpenTelemetry openTelemetry)
             throws Exception {
         this(metadataStore, bookKeeperGroupFactory, false /* isBookkeeperManaged */,
-                config, statsLogger);
+                config, statsLogger, openTelemetry);
     }
 
     private ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
                                      BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperGroupFactory,
                                      boolean isBookkeeperManaged,
-                                     ManagedLedgerFactoryConfig config, StatsLogger statsLogger) throws Exception {
+                                     ManagedLedgerFactoryConfig config,
+                                     StatsLogger statsLogger,
+                                     OpenTelemetry openTelemetry) throws Exception {
         MetadataCompressionConfig compressionConfigForManagedLedgerInfo =
                 config.getCompressionConfigForManagedLedgerInfo();
         MetadataCompressionConfig compressionConfigForManagedCursorInfo =
@@ -220,6 +227,8 @@
         closed = false;
 
         metadataStore.registerSessionListener(this::handleMetadataStoreNotification);
+
+        openTelemetryCacheStats = new OpenTelemetryManagedLedgerCacheStats(openTelemetry, this);
     }
 
     static class DefaultBkFactory implements BookkeeperFactoryForCustomEnsemblePlacementPolicy {
@@ -611,6 +620,7 @@
                     }));
                 }).thenAcceptAsync(__ -> {
                     //wait for tasks in scheduledExecutor executed.
+                    openTelemetryCacheStats.close();
                     scheduledExecutor.shutdownNow();
                     entryCacheManager.clear();
                 });
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryMBeanImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryMBeanImpl.java
index cf3d714..a3038a0 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryMBeanImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryMBeanImpl.java
@@ -100,25 +100,50 @@
     }
 
     @Override
+    public long getCacheHitsTotal() {
+        return cacheHits.getTotalCount();
+    }
+
+    @Override
     public double getCacheMissesRate() {
         return cacheMisses.getRate();
     }
 
     @Override
+    public long getCacheMissesTotal() {
+        return cacheMisses.getTotalCount();
+    }
+
+    @Override
     public double getCacheHitsThroughput() {
         return cacheHits.getValueRate();
     }
 
     @Override
+    public long getCacheHitsBytesTotal() {
+        return cacheHits.getTotalValue();
+    }
+
+    @Override
     public double getCacheMissesThroughput() {
         return cacheMisses.getValueRate();
     }
 
     @Override
+    public long getCacheMissesBytesTotal() {
+        return cacheMisses.getTotalValue();
+    }
+
+    @Override
     public long getNumberOfCacheEvictions() {
         return cacheEvictions.getCount();
     }
 
+    @Override
+    public long getNumberOfCacheEvictionsTotal() {
+        return cacheEvictions.getTotalCount();
+    }
+
     public long getCacheInsertedEntriesCount() {
         return insertedEntryCount.sum();
     }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PooledByteBufAllocatorStats.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PooledByteBufAllocatorStats.java
new file mode 100644
index 0000000..4f6a18c
--- /dev/null
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PooledByteBufAllocatorStats.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl.cache;
+
+import io.netty.buffer.PooledByteBufAllocator;
+import lombok.Value;
+
+@Value
+public class PooledByteBufAllocatorStats {
+
+    public long activeAllocations;
+    public long activeAllocationsSmall;
+    public long activeAllocationsNormal;
+    public long activeAllocationsHuge;
+
+    public long totalAllocated;
+    public long totalUsed;
+
+    public PooledByteBufAllocatorStats(PooledByteBufAllocator allocator) {
+        long activeAllocations = 0;
+        long activeAllocationsSmall = 0;
+        long activeAllocationsNormal = 0;
+        long activeAllocationsHuge = 0;
+        long totalAllocated = 0;
+        long totalUsed = 0;
+
+        for (var arena : allocator.metric().directArenas()) {
+            activeAllocations += arena.numActiveAllocations();
+            activeAllocationsSmall += arena.numActiveSmallAllocations();
+            activeAllocationsNormal += arena.numActiveNormalAllocations();
+            activeAllocationsHuge += arena.numActiveHugeAllocations();
+
+            for (var list : arena.chunkLists()) {
+                for (var chunk : list) {
+                    int size = chunk.chunkSize();
+                    int used = size - chunk.freeBytes();
+
+                    totalAllocated += size;
+                    totalUsed += used;
+                }
+            }
+        }
+
+        this.activeAllocations = activeAllocations;
+        this.activeAllocationsSmall = activeAllocationsSmall;
+        this.activeAllocationsNormal = activeAllocationsNormal;
+        this.activeAllocationsHuge = activeAllocationsHuge;
+
+        this.totalAllocated = totalAllocated;
+        this.totalUsed = totalUsed;
+    }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
index 6ed95f1..9bbc285 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
@@ -22,6 +22,7 @@
 import com.github.benmanes.caffeine.cache.Caffeine;
 import com.google.common.annotations.VisibleForTesting;
 import io.netty.channel.EventLoopGroup;
+import io.opentelemetry.api.OpenTelemetry;
 import java.io.IOException;
 import java.util.Map;
 import java.util.Optional;
@@ -54,9 +55,11 @@
             bkEnsemblePolicyToBkClientMap = Caffeine.newBuilder().buildAsync();
     private StatsProvider statsProvider = new NullStatsProvider();
 
+    @Override
     public void initialize(ServiceConfiguration conf, MetadataStoreExtended metadataStore,
                            BookKeeperClientFactory bookkeeperProvider,
-                           EventLoopGroup eventLoopGroup) throws Exception {
+                           EventLoopGroup eventLoopGroup,
+                           OpenTelemetry openTelemetry) throws Exception {
         ManagedLedgerFactoryConfig managedLedgerFactoryConfig = new ManagedLedgerFactoryConfig();
         managedLedgerFactoryConfig.setMaxCacheSize(conf.getManagedLedgerCacheSizeMB() * 1024L * 1024L);
         managedLedgerFactoryConfig.setCacheEvictionWatermark(conf.getManagedLedgerCacheEvictionWatermark());
@@ -109,7 +112,8 @@
 
         try {
             this.managedLedgerFactory =
-                    new ManagedLedgerFactoryImpl(metadataStore, bkFactory, managedLedgerFactoryConfig, statsLogger);
+                    new ManagedLedgerFactoryImpl(metadataStore, bkFactory, managedLedgerFactoryConfig, statsLogger,
+                            openTelemetry);
         } catch (Exception e) {
             statsProvider.stop();
             defaultBkClient.close();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 617afc6..4fa773d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1069,7 +1069,7 @@
     protected ManagedLedgerStorage newManagedLedgerClientFactory() throws Exception {
         return ManagedLedgerStorage.create(
                 config, localMetadataStore,
-                bkClientFactory, ioEventLoopGroup
+                bkClientFactory, ioEventLoopGroup, openTelemetry.getOpenTelemetryService().getOpenTelemetry()
         );
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java
index c1bcfad..178da8b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java
@@ -26,6 +26,7 @@
 import lombok.Getter;
 import org.apache.pulsar.PulsarVersion;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.opentelemetry.Constants;
 import org.apache.pulsar.opentelemetry.OpenTelemetryService;
 
 public class PulsarBrokerOpenTelemetry implements Closeable {
@@ -46,7 +47,7 @@
                 .serviceVersion(PulsarVersion.getVersion())
                 .builderCustomizer(builderCustomizer)
                 .build();
-        meter = openTelemetryService.getOpenTelemetry().getMeter("org.apache.pulsar.broker");
+        meter = openTelemetryService.getOpenTelemetry().getMeter(Constants.BROKER_INSTRUMENTATION_SCOPE_NAME);
     }
 
     @Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerCacheMetrics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerCacheMetrics.java
index 890a37a..9eb4beb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerCacheMetrics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerCacheMetrics.java
@@ -18,13 +18,10 @@
  */
 package org.apache.pulsar.broker.stats.metrics;
 
-import io.netty.buffer.PoolArenaMetric;
-import io.netty.buffer.PoolChunkListMetric;
-import io.netty.buffer.PoolChunkMetric;
-import io.netty.buffer.PooledByteBufAllocator;
 import java.util.ArrayList;
 import java.util.List;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactoryMXBean;
+import org.apache.bookkeeper.mledger.impl.cache.PooledByteBufAllocatorStats;
 import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl;
 import org.apache.pulsar.broker.PulsarService;
 import org.apache.pulsar.common.stats.Metrics;
@@ -57,37 +54,13 @@
         m.put("brk_ml_cache_hits_throughput", mlCacheStats.getCacheHitsThroughput());
         m.put("brk_ml_cache_misses_throughput", mlCacheStats.getCacheMissesThroughput());
 
-        PooledByteBufAllocator allocator = RangeEntryCacheImpl.ALLOCATOR;
-        long activeAllocations = 0;
-        long activeAllocationsSmall = 0;
-        long activeAllocationsNormal = 0;
-        long activeAllocationsHuge = 0;
-        long totalAllocated = 0;
-        long totalUsed = 0;
-
-        for (PoolArenaMetric arena : allocator.metric().directArenas()) {
-            activeAllocations += arena.numActiveAllocations();
-            activeAllocationsSmall += arena.numActiveSmallAllocations();
-            activeAllocationsNormal += arena.numActiveNormalAllocations();
-            activeAllocationsHuge += arena.numActiveHugeAllocations();
-
-            for (PoolChunkListMetric list : arena.chunkLists()) {
-                for (PoolChunkMetric chunk : list) {
-                    int size = chunk.chunkSize();
-                    int used = size - chunk.freeBytes();
-
-                    totalAllocated += size;
-                    totalUsed += used;
-                }
-            }
-        }
-
-        m.put("brk_ml_cache_pool_allocated", totalAllocated);
-        m.put("brk_ml_cache_pool_used", totalUsed);
-        m.put("brk_ml_cache_pool_active_allocations", activeAllocations);
-        m.put("brk_ml_cache_pool_active_allocations_small", activeAllocationsSmall);
-        m.put("brk_ml_cache_pool_active_allocations_normal", activeAllocationsNormal);
-        m.put("brk_ml_cache_pool_active_allocations_huge", activeAllocationsHuge);
+        var allocatorStats = new PooledByteBufAllocatorStats(RangeEntryCacheImpl.ALLOCATOR);
+        m.put("brk_ml_cache_pool_allocated", allocatorStats.totalAllocated);
+        m.put("brk_ml_cache_pool_used", allocatorStats.totalUsed);
+        m.put("brk_ml_cache_pool_active_allocations", allocatorStats.activeAllocations);
+        m.put("brk_ml_cache_pool_active_allocations_small", allocatorStats.activeAllocationsSmall);
+        m.put("brk_ml_cache_pool_active_allocations_normal", allocatorStats.activeAllocationsNormal);
+        m.put("brk_ml_cache_pool_active_allocations_huge", allocatorStats.activeAllocationsHuge);
 
         metrics.clear();
         metrics.add(m);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/ManagedLedgerStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/ManagedLedgerStorage.java
index 0b5a102..944d2ba 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/ManagedLedgerStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/ManagedLedgerStorage.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.broker.storage;
 
 import io.netty.channel.EventLoopGroup;
+import io.opentelemetry.api.OpenTelemetry;
 import java.io.IOException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
@@ -47,7 +48,8 @@
     void initialize(ServiceConfiguration conf,
                     MetadataStoreExtended metadataStore,
                     BookKeeperClientFactory bookkeeperProvider,
-                    EventLoopGroup eventLoopGroup) throws Exception;
+                    EventLoopGroup eventLoopGroup,
+                    OpenTelemetry openTelemetry) throws Exception;
 
     /**
      * Return the factory to create {@link ManagedLedgerFactory}.
@@ -87,11 +89,12 @@
     static ManagedLedgerStorage create(ServiceConfiguration conf,
                                        MetadataStoreExtended metadataStore,
                                        BookKeeperClientFactory bkProvider,
-                                       EventLoopGroup eventLoopGroup) throws Exception {
+                                       EventLoopGroup eventLoopGroup,
+                                       OpenTelemetry openTelemetry) throws Exception {
         ManagedLedgerStorage storage =
                 Reflections.createInstance(conf.getManagedLedgerStorageClassName(), ManagedLedgerStorage.class,
                         Thread.currentThread().getContextClassLoader());
-        storage.initialize(conf, metadataStore, bkProvider, eventLoopGroup);
+        storage.initialize(conf, metadataStore, bkProvider, eventLoopGroup, openTelemetry);
         return storage;
     }
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryManagedLedgerCacheStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryManagedLedgerCacheStatsTest.java
new file mode 100644
index 0000000..c3a4a2e
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryManagedLedgerCacheStatsTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats;
+
+import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_ENTRY_COUNTER;
+import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_EVICTION_OPERATION_COUNTER;
+import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_OPERATION_BYTES_COUNTER;
+import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_OPERATION_COUNTER;
+import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_POOL_ACTIVE_ALLOCATION_COUNTER;
+import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_POOL_ACTIVE_ALLOCATION_SIZE_COUNTER;
+import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_SIZE_COUNTER;
+import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.MANAGED_LEDGER_COUNTER;
+import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
+import static org.assertj.core.api.Assertions.assertThat;
+import io.opentelemetry.api.common.Attributes;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.CacheEntryStatus;
+import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.CacheOperationStatus;
+import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.PoolArenaType;
+import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.PoolChunkAllocationType;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class OpenTelemetryManagedLedgerCacheStatsTest extends BrokerTestBase {
+
+    @BeforeMethod(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        super.baseSetup();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Override
+    protected void customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder builder) {
+        super.customizeMainPulsarTestContextBuilder(builder);
+        builder.enableOpenTelemetry(true);
+    }
+
+    @Test
+    public void testManagedLedgerCacheStats() throws Exception {
+        var topicName = BrokerTestUtil.newUniqueName("persistent://prop/ns-abc/testManagedLedgerCacheStats");
+
+        @Cleanup
+        var producer = pulsarClient.newProducer().topic(topicName).create();
+
+        @Cleanup
+        var consumer1 = pulsarClient.newConsumer()
+                .topic(topicName)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscriptionName(BrokerTestUtil.newUniqueName("sub"))
+                .subscribe();
+
+        @Cleanup
+        var consumer2 = pulsarClient.newConsumer()
+                .topic(topicName)
+                .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                .subscriptionName(BrokerTestUtil.newUniqueName("sub"))
+                .subscribe();
+
+        producer.send("test".getBytes());
+        consumer1.receive();
+
+        Awaitility.await().untilAsserted(() -> {
+            var metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+            assertMetricLongSumValue(metrics, CACHE_ENTRY_COUNTER, CacheEntryStatus.ACTIVE.attributes,
+                    value -> assertThat(value).isNotNegative());
+            assertMetricLongSumValue(metrics, CACHE_ENTRY_COUNTER, CacheEntryStatus.INSERTED.attributes,
+                    value -> assertThat(value).isPositive());
+            assertMetricLongSumValue(metrics, CACHE_ENTRY_COUNTER, CacheEntryStatus.EVICTED.attributes,
+                    value -> assertThat(value).isPositive());
+            assertMetricLongSumValue(metrics, CACHE_SIZE_COUNTER, Attributes.empty(),
+                    value -> assertThat(value).isNotNegative());
+        });
+
+        var metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+
+        assertMetricLongSumValue(metrics, MANAGED_LEDGER_COUNTER, Attributes.empty(), 2);
+        assertMetricLongSumValue(metrics, CACHE_EVICTION_OPERATION_COUNTER, Attributes.empty(), 0);
+
+        assertMetricLongSumValue(metrics, CACHE_OPERATION_COUNTER, CacheOperationStatus.HIT.attributes,
+                value -> assertThat(value).isPositive());
+        assertMetricLongSumValue(metrics, CACHE_OPERATION_BYTES_COUNTER, CacheOperationStatus.HIT.attributes,
+                value -> assertThat(value).isPositive());
+        assertMetricLongSumValue(metrics, CACHE_OPERATION_COUNTER, CacheOperationStatus.MISS.attributes,
+                value -> assertThat(value).isNotNegative());
+        assertMetricLongSumValue(metrics, CACHE_OPERATION_BYTES_COUNTER, CacheOperationStatus.MISS.attributes,
+                value -> assertThat(value).isNotNegative());
+
+        assertMetricLongSumValue(metrics, CACHE_POOL_ACTIVE_ALLOCATION_COUNTER, PoolArenaType.SMALL.attributes,
+                value -> assertThat(value).isNotNegative());
+        assertMetricLongSumValue(metrics, CACHE_POOL_ACTIVE_ALLOCATION_COUNTER, PoolArenaType.NORMAL.attributes,
+                value -> assertThat(value).isNotNegative());
+        assertMetricLongSumValue(metrics, CACHE_POOL_ACTIVE_ALLOCATION_COUNTER, PoolArenaType.HUGE.attributes,
+                value -> assertThat(value).isNotNegative());
+        assertMetricLongSumValue(metrics, CACHE_POOL_ACTIVE_ALLOCATION_SIZE_COUNTER,
+                PoolChunkAllocationType.ALLOCATED.attributes, value -> assertThat(value).isNotNegative());
+        assertMetricLongSumValue(metrics, CACHE_POOL_ACTIVE_ALLOCATION_SIZE_COUNTER,
+                PoolChunkAllocationType.USED.attributes, value -> assertThat(value).isNotNegative());
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
index 09cd4f7..3d79a17 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
@@ -21,6 +21,7 @@
 
 import com.google.common.util.concurrent.MoreExecutors;
 import io.netty.channel.EventLoopGroup;
+import io.opentelemetry.api.OpenTelemetry;
 import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder;
 import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
 import java.io.IOException;
@@ -843,9 +844,8 @@
 
             @Override
             public void initialize(ServiceConfiguration conf, MetadataStoreExtended metadataStore,
-                                   BookKeeperClientFactory bookkeeperProvider, EventLoopGroup eventLoopGroup)
-                    throws Exception {
-
+                                   BookKeeperClientFactory bookkeeperProvider, EventLoopGroup eventLoopGroup,
+                                   OpenTelemetry openTelemetry) {
             }
 
             @Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java
index 7d330bb..1395424 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java
@@ -21,6 +21,7 @@
 import static org.testng.Assert.assertEquals;
 import io.netty.channel.EventLoopGroup;
 import io.netty.channel.nio.NioEventLoopGroup;
+import io.opentelemetry.api.OpenTelemetry;
 import java.util.Collections;
 import lombok.Cleanup;
 import org.apache.bookkeeper.mledger.ManagedLedger;
@@ -60,7 +61,7 @@
         EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);
         ManagedLedgerClientFactory clientFactory = new ManagedLedgerClientFactory();
         clientFactory.initialize(pulsar.getConfiguration(), pulsar.getLocalMetadataStore(),
-                pulsar.getBookKeeperClientFactory(), eventLoopGroup);
+                pulsar.getBookKeeperClientFactory(), eventLoopGroup, OpenTelemetry.noop());
         ManagedLedgerFactory mlFactory = clientFactory.getManagedLedgerFactory();
         ManagedLedger ml = mlFactory.open(TopicName.get(topicName).getPersistenceNamingEncoding());
         ml.close();
diff --git a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/Constants.java b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/Constants.java
new file mode 100644
index 0000000..6d61caf
--- /dev/null
+++ b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/Constants.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.opentelemetry;
+
+/**
+ * Common OpenTelemetry constants to be used by Pulsar components.
+ */
+public interface Constants {
+
+    String BROKER_INSTRUMENTATION_SCOPE_NAME = "org.apache.pulsar.broker";
+
+}
diff --git a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
index 31e527f..b530b50 100644
--- a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
+++ b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
@@ -164,4 +164,48 @@
         FAILURE;
         public final Attributes attributes = Attributes.of(PULSAR_CONNECTION_CREATE_STATUS, name().toLowerCase());
     }
+
+    /**
+     * The type of the pool arena.
+     */
+    AttributeKey<String> ML_POOL_ARENA_TYPE = AttributeKey.stringKey("pulsar.managed_ledger.pool.arena.type");
+    enum PoolArenaType {
+        SMALL,
+        NORMAL,
+        HUGE;
+        public final Attributes attributes = Attributes.of(ML_POOL_ARENA_TYPE, name().toLowerCase());
+    }
+
+    /**
+     * The type of the pool chunk allocation.
+     */
+    AttributeKey<String> ML_POOL_CHUNK_ALLOCATION_TYPE =
+            AttributeKey.stringKey("pulsar.managed_ledger.pool.chunk.allocation.type");
+    enum PoolChunkAllocationType {
+        ALLOCATED,
+        USED;
+        public final Attributes attributes = Attributes.of(ML_POOL_CHUNK_ALLOCATION_TYPE, name().toLowerCase());
+    }
+
+    /**
+     * The status of the cache entry.
+     */
+    AttributeKey<String> ML_CACHE_ENTRY_STATUS = AttributeKey.stringKey("pulsar.managed_ledger.cache.entry.status");
+    enum CacheEntryStatus {
+        ACTIVE,
+        EVICTED,
+        INSERTED;
+        public final Attributes attributes = Attributes.of(ML_CACHE_ENTRY_STATUS, name().toLowerCase());
+    }
+
+    /**
+     * The result of the cache operation.
+     */
+    AttributeKey<String> ML_CACHE_OPERATION_STATUS =
+            AttributeKey.stringKey("pulsar.managed_ledger.cache.operation.status");
+    enum CacheOperationStatus {
+        HIT,
+        MISS;
+        public final Attributes attributes = Attributes.of(ML_CACHE_OPERATION_STATUS, name().toLowerCase());
+    }
 }