[feat][broker] PIP-264: Add managed ledger cache metrics (#22898)
diff --git a/managed-ledger/pom.xml b/managed-ledger/pom.xml
index d8b3122..60a4eda 100644
--- a/managed-ledger/pom.xml
+++ b/managed-ledger/pom.xml
@@ -72,6 +72,12 @@
</dependency>
<dependency>
+ <groupId>${project.groupId}</groupId>
+ <artifactId>pulsar-opentelemetry</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
@@ -121,6 +127,12 @@
</dependency>
<dependency>
+ <groupId>io.opentelemetry</groupId>
+ <artifactId>opentelemetry-sdk-testing</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryMXBean.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryMXBean.java
index 35c26c5..43e8196 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryMXBean.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactoryMXBean.java
@@ -48,26 +48,51 @@
double getCacheHitsRate();
/**
+ * Cumulative number of cache hits.
+ */
+ long getCacheHitsTotal();
+
+ /**
* Get the number of cache misses per second.
*/
double getCacheMissesRate();
/**
+ * Cumulative number of cache misses.
+ */
+ long getCacheMissesTotal();
+
+ /**
* Get the amount of data is retrieved from the cache in byte/s.
*/
double getCacheHitsThroughput();
/**
+ * Cumulative amount of data retrieved from the cache in bytes.
+ */
+ long getCacheHitsBytesTotal();
+
+ /**
* Get the amount of data is retrieved from the bookkeeper in byte/s.
*/
double getCacheMissesThroughput();
/**
+ * Cumulative amount of data retrieved from the bookkeeper in bytes.
+ */
+ long getCacheMissesBytesTotal();
+
+ /**
* Get the number of cache evictions during the last minute.
*/
long getNumberOfCacheEvictions();
/**
+ * Cumulative number of cache evictions.
+ */
+ long getNumberOfCacheEvictionsTotal();
+
+ /**
* Cumulative number of entries inserted into the cache.
*/
long getCacheInsertedEntriesCount();
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OpenTelemetryManagedLedgerCacheStats.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OpenTelemetryManagedLedgerCacheStats.java
new file mode 100644
index 0000000..13e7ed6
--- /dev/null
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/OpenTelemetryManagedLedgerCacheStats.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger;
+
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.metrics.BatchCallback;
+import io.opentelemetry.api.metrics.ObservableLongMeasurement;
+import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
+import org.apache.bookkeeper.mledger.impl.cache.PooledByteBufAllocatorStats;
+import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl;
+import org.apache.pulsar.opentelemetry.Constants;
+import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.CacheEntryStatus;
+import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.CacheOperationStatus;
+import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.PoolArenaType;
+import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.PoolChunkAllocationType;
+
+public class OpenTelemetryManagedLedgerCacheStats implements AutoCloseable {
+
+ // Replaces pulsar_ml_count
+ public static final String MANAGED_LEDGER_COUNTER = "pulsar.broker.managed_ledger.count";
+ private final ObservableLongMeasurement managedLedgerCounter;
+
+ // Replaces pulsar_ml_cache_evictions
+ public static final String CACHE_EVICTION_OPERATION_COUNTER = "pulsar.broker.managed_ledger.cache.eviction.count";
+ private final ObservableLongMeasurement cacheEvictionOperationCounter;
+
+ // Replaces 'pulsar_ml_cache_entries',
+ // 'pulsar_ml_cache_inserted_entries_total',
+ // 'pulsar_ml_cache_evicted_entries_total'
+ public static final String CACHE_ENTRY_COUNTER = "pulsar.broker.managed_ledger.cache.entry.count";
+ private final ObservableLongMeasurement cacheEntryCounter;
+
+ // Replaces pulsar_ml_cache_used_size
+ public static final String CACHE_SIZE_COUNTER = "pulsar.broker.managed_ledger.cache.entry.size";
+ private final ObservableLongMeasurement cacheSizeCounter;
+
+ // Replaces pulsar_ml_cache_hits_rate, pulsar_ml_cache_misses_rate
+ public static final String CACHE_OPERATION_COUNTER = "pulsar.broker.managed_ledger.cache.operation.count";
+ private final ObservableLongMeasurement cacheOperationCounter;
+
+ // Replaces pulsar_ml_cache_hits_throughput, pulsar_ml_cache_misses_throughput
+ public static final String CACHE_OPERATION_BYTES_COUNTER = "pulsar.broker.managed_ledger.cache.operation.size";
+ private final ObservableLongMeasurement cacheOperationBytesCounter;
+
+ // Replaces 'pulsar_ml_cache_pool_active_allocations',
+ // 'pulsar_ml_cache_pool_active_allocations_huge',
+ // 'pulsar_ml_cache_pool_active_allocations_normal',
+ // 'pulsar_ml_cache_pool_active_allocations_small'
+ public static final String CACHE_POOL_ACTIVE_ALLOCATION_COUNTER =
+ "pulsar.broker.managed_ledger.cache.pool.allocation.active.count";
+ private final ObservableLongMeasurement cachePoolActiveAllocationCounter;
+
+ // Replaces ['pulsar_ml_cache_pool_allocated', 'pulsar_ml_cache_pool_used']
+ public static final String CACHE_POOL_ACTIVE_ALLOCATION_SIZE_COUNTER =
+ "pulsar.broker.managed_ledger.cache.pool.allocation.size";
+ private final ObservableLongMeasurement cachePoolActiveAllocationSizeCounter;
+
+ private final BatchCallback batchCallback;
+
+ public OpenTelemetryManagedLedgerCacheStats(OpenTelemetry openTelemetry, ManagedLedgerFactoryImpl factory) {
+ var meter = openTelemetry.getMeter(Constants.BROKER_INSTRUMENTATION_SCOPE_NAME);
+
+ managedLedgerCounter = meter
+ .upDownCounterBuilder(MANAGED_LEDGER_COUNTER)
+ .setUnit("{managed_ledger}")
+ .setDescription("The total number of managed ledgers.")
+ .buildObserver();
+
+ cacheEvictionOperationCounter = meter
+ .counterBuilder(CACHE_EVICTION_OPERATION_COUNTER)
+ .setUnit("{eviction}")
+ .setDescription("The total number of cache eviction operations.")
+ .buildObserver();
+
+ cacheEntryCounter = meter
+ .upDownCounterBuilder(CACHE_ENTRY_COUNTER)
+ .setUnit("{entry}")
+ .setDescription("The number of entries in the entry cache.")
+ .buildObserver();
+
+ cacheSizeCounter = meter
+ .upDownCounterBuilder(CACHE_SIZE_COUNTER)
+ .setUnit("{By}")
+ .setDescription("The byte amount of entries stored in the entry cache.")
+ .buildObserver();
+
+ cacheOperationCounter = meter
+ .counterBuilder(CACHE_OPERATION_COUNTER)
+ .setUnit("{entry}")
+ .setDescription("The number of cache operations.")
+ .buildObserver();
+
+ cacheOperationBytesCounter = meter
+ .counterBuilder(CACHE_OPERATION_BYTES_COUNTER)
+ .setUnit("{By}")
+ .setDescription("The byte amount of data retrieved from cache operations.")
+ .buildObserver();
+
+ cachePoolActiveAllocationCounter = meter
+ .upDownCounterBuilder(CACHE_POOL_ACTIVE_ALLOCATION_COUNTER)
+ .setUnit("{allocation}")
+ .setDescription("The number of currently active allocations in the direct arena.")
+ .buildObserver();
+
+ cachePoolActiveAllocationSizeCounter = meter
+ .upDownCounterBuilder(CACHE_POOL_ACTIVE_ALLOCATION_SIZE_COUNTER)
+ .setUnit("{By}")
+ .setDescription("The memory allocated in the direct arena.")
+ .buildObserver();
+
+
+ batchCallback = meter.batchCallback(() -> recordMetrics(factory),
+ managedLedgerCounter,
+ cacheEvictionOperationCounter,
+ cacheEntryCounter,
+ cacheSizeCounter,
+ cacheOperationCounter,
+ cacheOperationBytesCounter,
+ cachePoolActiveAllocationCounter,
+ cachePoolActiveAllocationSizeCounter);
+ }
+
+ @Override
+ public void close() {
+ batchCallback.close();
+ }
+
+ private void recordMetrics(ManagedLedgerFactoryImpl factory) {
+ var stats = factory.getCacheStats();
+
+ managedLedgerCounter.record(stats.getNumberOfManagedLedgers());
+ cacheEvictionOperationCounter.record(stats.getNumberOfCacheEvictionsTotal());
+
+ var entriesOut = stats.getCacheEvictedEntriesCount();
+ var entriesIn = stats.getCacheInsertedEntriesCount();
+ var entriesActive = entriesIn - entriesOut;
+ cacheEntryCounter.record(entriesActive, CacheEntryStatus.ACTIVE.attributes);
+ cacheEntryCounter.record(entriesIn, CacheEntryStatus.INSERTED.attributes);
+ cacheEntryCounter.record(entriesOut, CacheEntryStatus.EVICTED.attributes);
+ cacheSizeCounter.record(stats.getCacheUsedSize());
+
+ cacheOperationCounter.record(stats.getCacheHitsTotal(), CacheOperationStatus.HIT.attributes);
+ cacheOperationBytesCounter.record(stats.getCacheHitsBytesTotal(), CacheOperationStatus.HIT.attributes);
+ cacheOperationCounter.record(stats.getCacheMissesTotal(), CacheOperationStatus.MISS.attributes);
+ cacheOperationBytesCounter.record(stats.getCacheMissesBytesTotal(), CacheOperationStatus.MISS.attributes);
+
+ var allocatorStats = new PooledByteBufAllocatorStats(RangeEntryCacheImpl.ALLOCATOR);
+ cachePoolActiveAllocationCounter.record(allocatorStats.activeAllocationsSmall, PoolArenaType.SMALL.attributes);
+ cachePoolActiveAllocationCounter.record(allocatorStats.activeAllocationsNormal,
+ PoolArenaType.NORMAL.attributes);
+ cachePoolActiveAllocationCounter.record(allocatorStats.activeAllocationsHuge, PoolArenaType.HUGE.attributes);
+ cachePoolActiveAllocationSizeCounter.record(allocatorStats.totalAllocated,
+ PoolChunkAllocationType.ALLOCATED.attributes);
+ cachePoolActiveAllocationSizeCounter.record(allocatorStats.totalUsed, PoolChunkAllocationType.USED.attributes);
+ }
+}
\ No newline at end of file
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 0b0f66d..fc291b8 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -24,6 +24,7 @@
import com.google.common.base.Predicates;
import com.google.common.collect.Maps;
import io.netty.util.concurrent.DefaultThreadFactory;
+import io.opentelemetry.api.OpenTelemetry;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
@@ -67,6 +68,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerInfo.MessageRangeInfo;
import org.apache.bookkeeper.mledger.ManagedLedgerInfo.PositionInfo;
import org.apache.bookkeeper.mledger.MetadataCompressionConfig;
+import org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.ReadOnlyCursor;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.ManagedLedgerInitializeLedgerCallback;
@@ -118,6 +120,8 @@
private volatile long cacheEvictionTimeThresholdNanos;
private final MetadataStore metadataStore;
+ private final OpenTelemetryManagedLedgerCacheStats openTelemetryCacheStats;
+
//indicate whether shutdown() is called.
private volatile boolean closed;
@@ -149,7 +153,7 @@
ManagedLedgerFactoryConfig config)
throws Exception {
this(metadataStore, new DefaultBkFactory(bkClientConfiguration),
- true /* isBookkeeperManaged */, config, NullStatsLogger.INSTANCE);
+ true /* isBookkeeperManaged */, config, NullStatsLogger.INSTANCE, OpenTelemetry.noop());
}
public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore, BookKeeper bookKeeper)
@@ -168,21 +172,24 @@
ManagedLedgerFactoryConfig config)
throws Exception {
this(metadataStore, bookKeeperGroupFactory, false /* isBookkeeperManaged */,
- config, NullStatsLogger.INSTANCE);
+ config, NullStatsLogger.INSTANCE, OpenTelemetry.noop());
}
public ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperGroupFactory,
- ManagedLedgerFactoryConfig config, StatsLogger statsLogger)
+ ManagedLedgerFactoryConfig config, StatsLogger statsLogger,
+ OpenTelemetry openTelemetry)
throws Exception {
this(metadataStore, bookKeeperGroupFactory, false /* isBookkeeperManaged */,
- config, statsLogger);
+ config, statsLogger, openTelemetry);
}
private ManagedLedgerFactoryImpl(MetadataStoreExtended metadataStore,
BookkeeperFactoryForCustomEnsemblePlacementPolicy bookKeeperGroupFactory,
boolean isBookkeeperManaged,
- ManagedLedgerFactoryConfig config, StatsLogger statsLogger) throws Exception {
+ ManagedLedgerFactoryConfig config,
+ StatsLogger statsLogger,
+ OpenTelemetry openTelemetry) throws Exception {
MetadataCompressionConfig compressionConfigForManagedLedgerInfo =
config.getCompressionConfigForManagedLedgerInfo();
MetadataCompressionConfig compressionConfigForManagedCursorInfo =
@@ -220,6 +227,8 @@
closed = false;
metadataStore.registerSessionListener(this::handleMetadataStoreNotification);
+
+ openTelemetryCacheStats = new OpenTelemetryManagedLedgerCacheStats(openTelemetry, this);
}
static class DefaultBkFactory implements BookkeeperFactoryForCustomEnsemblePlacementPolicy {
@@ -611,6 +620,7 @@
}));
}).thenAcceptAsync(__ -> {
//wait for tasks in scheduledExecutor executed.
+ openTelemetryCacheStats.close();
scheduledExecutor.shutdownNow();
entryCacheManager.clear();
});
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryMBeanImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryMBeanImpl.java
index cf3d714..a3038a0 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryMBeanImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryMBeanImpl.java
@@ -100,25 +100,50 @@
}
@Override
+ public long getCacheHitsTotal() {
+ return cacheHits.getTotalCount();
+ }
+
+ @Override
public double getCacheMissesRate() {
return cacheMisses.getRate();
}
@Override
+ public long getCacheMissesTotal() {
+ return cacheMisses.getTotalCount();
+ }
+
+ @Override
public double getCacheHitsThroughput() {
return cacheHits.getValueRate();
}
@Override
+ public long getCacheHitsBytesTotal() {
+ return cacheHits.getTotalValue();
+ }
+
+ @Override
public double getCacheMissesThroughput() {
return cacheMisses.getValueRate();
}
@Override
+ public long getCacheMissesBytesTotal() {
+ return cacheMisses.getTotalValue();
+ }
+
+ @Override
public long getNumberOfCacheEvictions() {
return cacheEvictions.getCount();
}
+ @Override
+ public long getNumberOfCacheEvictionsTotal() {
+ return cacheEvictions.getTotalCount();
+ }
+
public long getCacheInsertedEntriesCount() {
return insertedEntryCount.sum();
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PooledByteBufAllocatorStats.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PooledByteBufAllocatorStats.java
new file mode 100644
index 0000000..4f6a18c
--- /dev/null
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PooledByteBufAllocatorStats.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.bookkeeper.mledger.impl.cache;
+
+import io.netty.buffer.PooledByteBufAllocator;
+import lombok.Value;
+
+@Value
+public class PooledByteBufAllocatorStats {
+
+ public long activeAllocations;
+ public long activeAllocationsSmall;
+ public long activeAllocationsNormal;
+ public long activeAllocationsHuge;
+
+ public long totalAllocated;
+ public long totalUsed;
+
+ public PooledByteBufAllocatorStats(PooledByteBufAllocator allocator) {
+ long activeAllocations = 0;
+ long activeAllocationsSmall = 0;
+ long activeAllocationsNormal = 0;
+ long activeAllocationsHuge = 0;
+ long totalAllocated = 0;
+ long totalUsed = 0;
+
+ for (var arena : allocator.metric().directArenas()) {
+ activeAllocations += arena.numActiveAllocations();
+ activeAllocationsSmall += arena.numActiveSmallAllocations();
+ activeAllocationsNormal += arena.numActiveNormalAllocations();
+ activeAllocationsHuge += arena.numActiveHugeAllocations();
+
+ for (var list : arena.chunkLists()) {
+ for (var chunk : list) {
+ int size = chunk.chunkSize();
+ int used = size - chunk.freeBytes();
+
+ totalAllocated += size;
+ totalUsed += used;
+ }
+ }
+ }
+
+ this.activeAllocations = activeAllocations;
+ this.activeAllocationsSmall = activeAllocationsSmall;
+ this.activeAllocationsNormal = activeAllocationsNormal;
+ this.activeAllocationsHuge = activeAllocationsHuge;
+
+ this.totalAllocated = totalAllocated;
+ this.totalUsed = totalUsed;
+ }
+}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
index 6ed95f1..9bbc285 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/ManagedLedgerClientFactory.java
@@ -22,6 +22,7 @@
import com.github.benmanes.caffeine.cache.Caffeine;
import com.google.common.annotations.VisibleForTesting;
import io.netty.channel.EventLoopGroup;
+import io.opentelemetry.api.OpenTelemetry;
import java.io.IOException;
import java.util.Map;
import java.util.Optional;
@@ -54,9 +55,11 @@
bkEnsemblePolicyToBkClientMap = Caffeine.newBuilder().buildAsync();
private StatsProvider statsProvider = new NullStatsProvider();
+ @Override
public void initialize(ServiceConfiguration conf, MetadataStoreExtended metadataStore,
BookKeeperClientFactory bookkeeperProvider,
- EventLoopGroup eventLoopGroup) throws Exception {
+ EventLoopGroup eventLoopGroup,
+ OpenTelemetry openTelemetry) throws Exception {
ManagedLedgerFactoryConfig managedLedgerFactoryConfig = new ManagedLedgerFactoryConfig();
managedLedgerFactoryConfig.setMaxCacheSize(conf.getManagedLedgerCacheSizeMB() * 1024L * 1024L);
managedLedgerFactoryConfig.setCacheEvictionWatermark(conf.getManagedLedgerCacheEvictionWatermark());
@@ -109,7 +112,8 @@
try {
this.managedLedgerFactory =
- new ManagedLedgerFactoryImpl(metadataStore, bkFactory, managedLedgerFactoryConfig, statsLogger);
+ new ManagedLedgerFactoryImpl(metadataStore, bkFactory, managedLedgerFactoryConfig, statsLogger,
+ openTelemetry);
} catch (Exception e) {
statsProvider.stop();
defaultBkClient.close();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index 617afc6..4fa773d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -1069,7 +1069,7 @@
protected ManagedLedgerStorage newManagedLedgerClientFactory() throws Exception {
return ManagedLedgerStorage.create(
config, localMetadataStore,
- bkClientFactory, ioEventLoopGroup
+ bkClientFactory, ioEventLoopGroup, openTelemetry.getOpenTelemetryService().getOpenTelemetry()
);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java
index c1bcfad..178da8b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/PulsarBrokerOpenTelemetry.java
@@ -26,6 +26,7 @@
import lombok.Getter;
import org.apache.pulsar.PulsarVersion;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.opentelemetry.Constants;
import org.apache.pulsar.opentelemetry.OpenTelemetryService;
public class PulsarBrokerOpenTelemetry implements Closeable {
@@ -46,7 +47,7 @@
.serviceVersion(PulsarVersion.getVersion())
.builderCustomizer(builderCustomizer)
.build();
- meter = openTelemetryService.getOpenTelemetry().getMeter("org.apache.pulsar.broker");
+ meter = openTelemetryService.getOpenTelemetry().getMeter(Constants.BROKER_INSTRUMENTATION_SCOPE_NAME);
}
@Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerCacheMetrics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerCacheMetrics.java
index 890a37a..9eb4beb 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerCacheMetrics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/stats/metrics/ManagedLedgerCacheMetrics.java
@@ -18,13 +18,10 @@
*/
package org.apache.pulsar.broker.stats.metrics;
-import io.netty.buffer.PoolArenaMetric;
-import io.netty.buffer.PoolChunkListMetric;
-import io.netty.buffer.PoolChunkMetric;
-import io.netty.buffer.PooledByteBufAllocator;
import java.util.ArrayList;
import java.util.List;
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryMXBean;
+import org.apache.bookkeeper.mledger.impl.cache.PooledByteBufAllocatorStats;
import org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.common.stats.Metrics;
@@ -57,37 +54,13 @@
m.put("brk_ml_cache_hits_throughput", mlCacheStats.getCacheHitsThroughput());
m.put("brk_ml_cache_misses_throughput", mlCacheStats.getCacheMissesThroughput());
- PooledByteBufAllocator allocator = RangeEntryCacheImpl.ALLOCATOR;
- long activeAllocations = 0;
- long activeAllocationsSmall = 0;
- long activeAllocationsNormal = 0;
- long activeAllocationsHuge = 0;
- long totalAllocated = 0;
- long totalUsed = 0;
-
- for (PoolArenaMetric arena : allocator.metric().directArenas()) {
- activeAllocations += arena.numActiveAllocations();
- activeAllocationsSmall += arena.numActiveSmallAllocations();
- activeAllocationsNormal += arena.numActiveNormalAllocations();
- activeAllocationsHuge += arena.numActiveHugeAllocations();
-
- for (PoolChunkListMetric list : arena.chunkLists()) {
- for (PoolChunkMetric chunk : list) {
- int size = chunk.chunkSize();
- int used = size - chunk.freeBytes();
-
- totalAllocated += size;
- totalUsed += used;
- }
- }
- }
-
- m.put("brk_ml_cache_pool_allocated", totalAllocated);
- m.put("brk_ml_cache_pool_used", totalUsed);
- m.put("brk_ml_cache_pool_active_allocations", activeAllocations);
- m.put("brk_ml_cache_pool_active_allocations_small", activeAllocationsSmall);
- m.put("brk_ml_cache_pool_active_allocations_normal", activeAllocationsNormal);
- m.put("brk_ml_cache_pool_active_allocations_huge", activeAllocationsHuge);
+ var allocatorStats = new PooledByteBufAllocatorStats(RangeEntryCacheImpl.ALLOCATOR);
+ m.put("brk_ml_cache_pool_allocated", allocatorStats.totalAllocated);
+ m.put("brk_ml_cache_pool_used", allocatorStats.totalUsed);
+ m.put("brk_ml_cache_pool_active_allocations", allocatorStats.activeAllocations);
+ m.put("brk_ml_cache_pool_active_allocations_small", allocatorStats.activeAllocationsSmall);
+ m.put("brk_ml_cache_pool_active_allocations_normal", allocatorStats.activeAllocationsNormal);
+ m.put("brk_ml_cache_pool_active_allocations_huge", allocatorStats.activeAllocationsHuge);
metrics.clear();
metrics.add(m);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/ManagedLedgerStorage.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/ManagedLedgerStorage.java
index 0b5a102..944d2ba 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/ManagedLedgerStorage.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/storage/ManagedLedgerStorage.java
@@ -19,6 +19,7 @@
package org.apache.pulsar.broker.storage;
import io.netty.channel.EventLoopGroup;
+import io.opentelemetry.api.OpenTelemetry;
import java.io.IOException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
@@ -47,7 +48,8 @@
void initialize(ServiceConfiguration conf,
MetadataStoreExtended metadataStore,
BookKeeperClientFactory bookkeeperProvider,
- EventLoopGroup eventLoopGroup) throws Exception;
+ EventLoopGroup eventLoopGroup,
+ OpenTelemetry openTelemetry) throws Exception;
/**
* Return the factory to create {@link ManagedLedgerFactory}.
@@ -87,11 +89,12 @@
static ManagedLedgerStorage create(ServiceConfiguration conf,
MetadataStoreExtended metadataStore,
BookKeeperClientFactory bkProvider,
- EventLoopGroup eventLoopGroup) throws Exception {
+ EventLoopGroup eventLoopGroup,
+ OpenTelemetry openTelemetry) throws Exception {
ManagedLedgerStorage storage =
Reflections.createInstance(conf.getManagedLedgerStorageClassName(), ManagedLedgerStorage.class,
Thread.currentThread().getContextClassLoader());
- storage.initialize(conf, metadataStore, bkProvider, eventLoopGroup);
+ storage.initialize(conf, metadataStore, bkProvider, eventLoopGroup, openTelemetry);
return storage;
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryManagedLedgerCacheStatsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryManagedLedgerCacheStatsTest.java
new file mode 100644
index 0000000..c3a4a2e
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/OpenTelemetryManagedLedgerCacheStatsTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.stats;
+
+import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_ENTRY_COUNTER;
+import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_EVICTION_OPERATION_COUNTER;
+import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_OPERATION_BYTES_COUNTER;
+import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_OPERATION_COUNTER;
+import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_POOL_ACTIVE_ALLOCATION_COUNTER;
+import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_POOL_ACTIVE_ALLOCATION_SIZE_COUNTER;
+import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.CACHE_SIZE_COUNTER;
+import static org.apache.bookkeeper.mledger.OpenTelemetryManagedLedgerCacheStats.MANAGED_LEDGER_COUNTER;
+import static org.apache.pulsar.broker.stats.BrokerOpenTelemetryTestUtil.assertMetricLongSumValue;
+import static org.assertj.core.api.Assertions.assertThat;
+import io.opentelemetry.api.common.Attributes;
+import lombok.Cleanup;
+import org.apache.pulsar.broker.BrokerTestUtil;
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.broker.testcontext.PulsarTestContext;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.CacheEntryStatus;
+import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.CacheOperationStatus;
+import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.PoolArenaType;
+import org.apache.pulsar.opentelemetry.OpenTelemetryAttributes.PoolChunkAllocationType;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+public class OpenTelemetryManagedLedgerCacheStatsTest extends BrokerTestBase {
+
+ @BeforeMethod(alwaysRun = true)
+ @Override
+ protected void setup() throws Exception {
+ super.baseSetup();
+ }
+
+ @AfterMethod(alwaysRun = true)
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup();
+ }
+
+ @Override
+ protected void customizeMainPulsarTestContextBuilder(PulsarTestContext.Builder builder) {
+ super.customizeMainPulsarTestContextBuilder(builder);
+ builder.enableOpenTelemetry(true);
+ }
+
+ @Test
+ public void testManagedLedgerCacheStats() throws Exception {
+ var topicName = BrokerTestUtil.newUniqueName("persistent://prop/ns-abc/testManagedLedgerCacheStats");
+
+ @Cleanup
+ var producer = pulsarClient.newProducer().topic(topicName).create();
+
+ @Cleanup
+ var consumer1 = pulsarClient.newConsumer()
+ .topic(topicName)
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscriptionName(BrokerTestUtil.newUniqueName("sub"))
+ .subscribe();
+
+ @Cleanup
+ var consumer2 = pulsarClient.newConsumer()
+ .topic(topicName)
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .subscriptionName(BrokerTestUtil.newUniqueName("sub"))
+ .subscribe();
+
+ producer.send("test".getBytes());
+ consumer1.receive();
+
+ Awaitility.await().untilAsserted(() -> {
+ var metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+ assertMetricLongSumValue(metrics, CACHE_ENTRY_COUNTER, CacheEntryStatus.ACTIVE.attributes,
+ value -> assertThat(value).isNotNegative());
+ assertMetricLongSumValue(metrics, CACHE_ENTRY_COUNTER, CacheEntryStatus.INSERTED.attributes,
+ value -> assertThat(value).isPositive());
+ assertMetricLongSumValue(metrics, CACHE_ENTRY_COUNTER, CacheEntryStatus.EVICTED.attributes,
+ value -> assertThat(value).isPositive());
+ assertMetricLongSumValue(metrics, CACHE_SIZE_COUNTER, Attributes.empty(),
+ value -> assertThat(value).isNotNegative());
+ });
+
+ var metrics = pulsarTestContext.getOpenTelemetryMetricReader().collectAllMetrics();
+
+ assertMetricLongSumValue(metrics, MANAGED_LEDGER_COUNTER, Attributes.empty(), 2);
+ assertMetricLongSumValue(metrics, CACHE_EVICTION_OPERATION_COUNTER, Attributes.empty(), 0);
+
+ assertMetricLongSumValue(metrics, CACHE_OPERATION_COUNTER, CacheOperationStatus.HIT.attributes,
+ value -> assertThat(value).isPositive());
+ assertMetricLongSumValue(metrics, CACHE_OPERATION_BYTES_COUNTER, CacheOperationStatus.HIT.attributes,
+ value -> assertThat(value).isPositive());
+ assertMetricLongSumValue(metrics, CACHE_OPERATION_COUNTER, CacheOperationStatus.MISS.attributes,
+ value -> assertThat(value).isNotNegative());
+ assertMetricLongSumValue(metrics, CACHE_OPERATION_BYTES_COUNTER, CacheOperationStatus.MISS.attributes,
+ value -> assertThat(value).isNotNegative());
+
+ assertMetricLongSumValue(metrics, CACHE_POOL_ACTIVE_ALLOCATION_COUNTER, PoolArenaType.SMALL.attributes,
+ value -> assertThat(value).isNotNegative());
+ assertMetricLongSumValue(metrics, CACHE_POOL_ACTIVE_ALLOCATION_COUNTER, PoolArenaType.NORMAL.attributes,
+ value -> assertThat(value).isNotNegative());
+ assertMetricLongSumValue(metrics, CACHE_POOL_ACTIVE_ALLOCATION_COUNTER, PoolArenaType.HUGE.attributes,
+ value -> assertThat(value).isNotNegative());
+ assertMetricLongSumValue(metrics, CACHE_POOL_ACTIVE_ALLOCATION_SIZE_COUNTER,
+ PoolChunkAllocationType.ALLOCATED.attributes, value -> assertThat(value).isNotNegative());
+ assertMetricLongSumValue(metrics, CACHE_POOL_ACTIVE_ALLOCATION_SIZE_COUNTER,
+ PoolChunkAllocationType.USED.attributes, value -> assertThat(value).isNotNegative());
+ }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
index 09cd4f7..3d79a17 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/testcontext/PulsarTestContext.java
@@ -21,6 +21,7 @@
import com.google.common.util.concurrent.MoreExecutors;
import io.netty.channel.EventLoopGroup;
+import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.sdk.autoconfigure.AutoConfiguredOpenTelemetrySdkBuilder;
import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
import java.io.IOException;
@@ -843,9 +844,8 @@
@Override
public void initialize(ServiceConfiguration conf, MetadataStoreExtended metadataStore,
- BookKeeperClientFactory bookkeeperProvider, EventLoopGroup eventLoopGroup)
- throws Exception {
-
+ BookKeeperClientFactory bookkeeperProvider, EventLoopGroup eventLoopGroup,
+ OpenTelemetry openTelemetry) {
}
@Override
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java
index 7d330bb..1395424 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/SequenceIdWithErrorTest.java
@@ -21,6 +21,7 @@
import static org.testng.Assert.assertEquals;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
+import io.opentelemetry.api.OpenTelemetry;
import java.util.Collections;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.ManagedLedger;
@@ -60,7 +61,7 @@
EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);
ManagedLedgerClientFactory clientFactory = new ManagedLedgerClientFactory();
clientFactory.initialize(pulsar.getConfiguration(), pulsar.getLocalMetadataStore(),
- pulsar.getBookKeeperClientFactory(), eventLoopGroup);
+ pulsar.getBookKeeperClientFactory(), eventLoopGroup, OpenTelemetry.noop());
ManagedLedgerFactory mlFactory = clientFactory.getManagedLedgerFactory();
ManagedLedger ml = mlFactory.open(TopicName.get(topicName).getPersistenceNamingEncoding());
ml.close();
diff --git a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/Constants.java b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/Constants.java
new file mode 100644
index 0000000..6d61caf
--- /dev/null
+++ b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/Constants.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.opentelemetry;
+
+/**
+ * Common OpenTelemetry constants to be used by Pulsar components.
+ */
+public interface Constants {
+
+ String BROKER_INSTRUMENTATION_SCOPE_NAME = "org.apache.pulsar.broker";
+
+}
diff --git a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
index 31e527f..b530b50 100644
--- a/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
+++ b/pulsar-opentelemetry/src/main/java/org/apache/pulsar/opentelemetry/OpenTelemetryAttributes.java
@@ -164,4 +164,48 @@
FAILURE;
public final Attributes attributes = Attributes.of(PULSAR_CONNECTION_CREATE_STATUS, name().toLowerCase());
}
+
+ /**
+ * The type of the pool arena.
+ */
+ AttributeKey<String> ML_POOL_ARENA_TYPE = AttributeKey.stringKey("pulsar.managed_ledger.pool.arena.type");
+ enum PoolArenaType {
+ SMALL,
+ NORMAL,
+ HUGE;
+ public final Attributes attributes = Attributes.of(ML_POOL_ARENA_TYPE, name().toLowerCase());
+ }
+
+ /**
+ * The type of the pool chunk allocation.
+ */
+ AttributeKey<String> ML_POOL_CHUNK_ALLOCATION_TYPE =
+ AttributeKey.stringKey("pulsar.managed_ledger.pool.chunk.allocation.type");
+ enum PoolChunkAllocationType {
+ ALLOCATED,
+ USED;
+ public final Attributes attributes = Attributes.of(ML_POOL_CHUNK_ALLOCATION_TYPE, name().toLowerCase());
+ }
+
+ /**
+ * The status of the cache entry.
+ */
+ AttributeKey<String> ML_CACHE_ENTRY_STATUS = AttributeKey.stringKey("pulsar.managed_ledger.cache.entry.status");
+ enum CacheEntryStatus {
+ ACTIVE,
+ EVICTED,
+ INSERTED;
+ public final Attributes attributes = Attributes.of(ML_CACHE_ENTRY_STATUS, name().toLowerCase());
+ }
+
+ /**
+ * The result of the cache operation.
+ */
+ AttributeKey<String> ML_CACHE_OPERATION_STATUS =
+ AttributeKey.stringKey("pulsar.managed_ledger.cache.operation.status");
+ enum CacheOperationStatus {
+ HIT,
+ MISS;
+ public final Attributes attributes = Attributes.of(ML_CACHE_OPERATION_STATUS, name().toLowerCase());
+ }
}