blob: ed6e79fdbb7e0aeb86b634eb70fb60eacb6d133d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.stats.metrics;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.bookkeeper.mledger.ManagedLedgerFactoryMXBean;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.stats.Metrics;
abstract class AbstractMetrics {
protected static final String METRICS_VERSION_SUFFIX = "v2";
protected static final Pattern V2_LEDGER_NAME_PATTERN = Pattern.compile("^(([^/]+)/([^/]+)/([^/]+))/(.*)$");
protected static final double[] ENTRY_LATENCY_BUCKETS_MS =
new double[ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC.length];
static {
// Convert buckets boundaries from usec to millis
for (int i = 0; i < ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC.length; i++) {
ENTRY_LATENCY_BUCKETS_MS[i] = ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC[i] / 1000.0;
}
}
protected static final double[] ENTRY_SIZE_BUCKETS_BYTES =
new double[ManagedLedgerMBeanImpl.ENTRY_SIZE_BUCKETS_BYTES.length];
static {
// Convert buckets boundaries from usec to millis
for (int i = 0; i < ManagedLedgerMBeanImpl.ENTRY_SIZE_BUCKETS_BYTES.length; i++) {
ENTRY_SIZE_BUCKETS_BYTES[i] = ManagedLedgerMBeanImpl.ENTRY_SIZE_BUCKETS_BYTES[i];
}
}
// simple abstract for the buckets, their boundaries and pre-calculated keys
// pre-calculating the keys avoids a lot of object allocations during metric collection
static class Buckets {
private final double[] boundaries;
private final String[] bucketKeys;
Buckets(String metricKey, double[] boundaries) {
this.boundaries = boundaries;
this.bucketKeys = generateBucketKeys(metricKey, boundaries);
}
private static String[] generateBucketKeys(String mkey, double[] boundaries) {
String[] keys = new String[boundaries.length + 1];
for (int i = 0; i < boundaries.length + 1; i++) {
String bucketKey;
double value;
// example of key : "<metric_key>_0.0_0.5"
if (i == 0 && boundaries.length > 0) {
bucketKey = String.format("%s_0.0_%1.1f", mkey, boundaries[i]);
} else if (i < boundaries.length) {
bucketKey = String.format("%s_%1.1f_%1.1f", mkey, boundaries[i - 1], boundaries[i]);
} else {
bucketKey = String.format("%s_OVERFLOW", mkey);
}
keys[i] = bucketKey;
}
return keys;
}
public void populateBucketEntries(Map<String, Double> map, long[] bucketValues, int period) {
// bucket values should be one more that the boundaries to have the last element as OVERFLOW
if (bucketValues != null && bucketValues.length != boundaries.length + 1) {
throw new RuntimeException("Bucket boundary and value array length mismatch");
}
for (int i = 0; i < boundaries.length + 1; i++) {
double value = (bucketValues == null) ? 0.0D : ((double) bucketValues[i] / (period > 0 ? period : 1));
map.compute(bucketKeys[i], (key, currentValue) -> (currentValue == null ? 0.0d : currentValue) + value);
}
}
}
protected final PulsarService pulsar;
abstract List<Metrics> generate();
AbstractMetrics(PulsarService pulsar) {
this.pulsar = pulsar;
}
/**
* Creates a metrics with empty immutable dimension.
* <p>
* Use this for metrics that doesn't need any dimension - i.e global metrics
*
* @return
*/
protected Metrics createMetrics() {
return createMetrics(new HashMap<String, String>());
}
protected Metrics createMetrics(Map<String, String> dimensionMap) {
// create with current version
return Metrics.create(dimensionMap);
}
/**
* Returns the managed ledger cache statistics from ML factory.
*
* @return
*/
protected ManagedLedgerFactoryMXBean getManagedLedgerCacheStats() {
return ((ManagedLedgerFactoryImpl) pulsar.getManagedLedgerFactory()).getCacheStats();
}
/**
* Returns managed ledgers map from ML factory.
*
* @return
*/
protected Map<String, ManagedLedgerImpl> getManagedLedgers() {
return ((ManagedLedgerFactoryImpl) pulsar.getManagedLedgerFactory()).getManagedLedgers();
}
protected String getLocalClusterName() {
return pulsar.getConfiguration().getClusterName();
}
protected double average(List<Double> values) {
double average = 0;
if (values.size() > 0) {
double sum = 0;
for (Double value : values) {
sum += value;
}
average = sum / values.size();
}
return average;
}
protected double sum(List<Double> values) {
double sum = 0;
if (values.size() > 0) {
for (Double value : values) {
sum += value;
}
}
return sum;
}
protected String parseNamespaceFromLedgerName(String ledgerName) {
Matcher m = V2_LEDGER_NAME_PATTERN.matcher(ledgerName);
if (m.matches()) {
return m.group(1);
} else {
throw new RuntimeException("Failed to parse the namespace from ledger name : " + ledgerName);
}
}
/**
* Creates a dimension key for metrics.
*
* @param namespace Namespace of metric
* @return
*/
protected Metrics createMetricsByDimension(String namespace) {
Map<String, String> dimensionMap = Maps.newHashMap();
dimensionMap.put("namespace", namespace);
return createMetrics(dimensionMap);
}
/**
* Creates a dimension key for replication metrics.
*
* @param namespace
* @param fromClusterName
* @param toClusterName
* @return
*/
protected Metrics createMetricsByDimension(String namespace, String fromClusterName, String toClusterName) {
Map<String, String> dimensionMap = Maps.newHashMap();
dimensionMap.put("namespace", namespace);
dimensionMap.put("from_cluster", fromClusterName);
dimensionMap.put("to_cluster", toClusterName);
return createMetrics(dimensionMap);
}
protected void populateAggregationMap(Map<String, List<Double>> map, String mkey, double value) {
if (!map.containsKey(mkey)) {
map.put(mkey, Lists.newArrayList(value));
} else {
map.get(mkey).add(value);
}
}
protected void populateAggregationMapWithSum(Map<String, Double> map, String mkey, double value) {
Double val = map.getOrDefault(mkey, 0.0);
map.put(mkey, val + value);
}
protected void populateMaxMap(Map<String, Long> map, String mkey, long value) {
Long existingValue = map.get(mkey);
if (existingValue == null || value > existingValue) {
map.put(mkey, value);
}
}
/**
* Helper to manage populating topics map.
*
* @param ledgersByDimensionMap
* @param metrics
* @param ledger
*/
protected void populateDimensionMap(Map<Metrics, List<ManagedLedgerImpl>> ledgersByDimensionMap, Metrics metrics,
ManagedLedgerImpl ledger) {
if (!ledgersByDimensionMap.containsKey(metrics)) {
// create new list
ledgersByDimensionMap.put(metrics, Lists.newArrayList(ledger));
} else {
// add to collection
ledgersByDimensionMap.get(metrics).add(ledger);
}
}
protected void populateDimensionMap(Map<Metrics, List<TopicStats>> topicsStatsByDimensionMap,
Metrics metrics, TopicStats destStats) {
if (!topicsStatsByDimensionMap.containsKey(metrics)) {
// create new list
topicsStatsByDimensionMap.put(metrics, Lists.newArrayList(destStats));
} else {
// add to collection
topicsStatsByDimensionMap.get(metrics).add(destStats);
}
}
}