blob: 0a885ef7356a6f9ff058ef43562368a546ab1ca7 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.stats.prometheus;
import io.netty.util.concurrent.FastThreadLocal;
import java.util.Optional;
import java.util.concurrent.atomic.LongAdder;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
import org.apache.pulsar.common.policies.data.stats.NonPersistentSubscriptionStatsImpl;
import org.apache.pulsar.common.policies.data.stats.NonPersistentTopicStatsImpl;
import org.apache.pulsar.common.policies.data.stats.ReplicatorStatsImpl;
import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl;
import org.apache.pulsar.common.util.SimpleTextOutputStream;
import org.apache.pulsar.compaction.CompactedTopicContext;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.compaction.CompactorMXBean;
@Slf4j
public class NamespaceStatsAggregator {
private static FastThreadLocal<AggregatedNamespaceStats> localNamespaceStats =
new FastThreadLocal<AggregatedNamespaceStats>() {
@Override
protected AggregatedNamespaceStats initialValue() throws Exception {
return new AggregatedNamespaceStats();
}
};
private static FastThreadLocal<TopicStats> localTopicStats = new FastThreadLocal<TopicStats>() {
@Override
protected TopicStats initialValue() throws Exception {
return new TopicStats();
}
};
public static void generate(PulsarService pulsar, boolean includeTopicMetrics, boolean includeConsumerMetrics,
boolean includeProducerMetrics, boolean splitTopicAndPartitionIndexLabel, SimpleTextOutputStream stream) {
String cluster = pulsar.getConfiguration().getClusterName();
AggregatedNamespaceStats namespaceStats = localNamespaceStats.get();
TopicStats.resetTypes();
TopicStats topicStats = localTopicStats.get();
printDefaultBrokerStats(stream, cluster);
Optional<CompactorMXBean> compactorMXBean = getCompactorMXBean(pulsar);
LongAdder topicsCount = new LongAdder();
pulsar.getBrokerService().getMultiLayerTopicMap().forEach((namespace, bundlesMap) -> {
namespaceStats.reset();
topicsCount.reset();
bundlesMap.forEach((bundle, topicsMap) -> {
topicsMap.forEach((name, topic) -> {
getTopicStats(topic, topicStats, includeConsumerMetrics, includeProducerMetrics,
pulsar.getConfiguration().isExposePreciseBacklogInPrometheus(),
pulsar.getConfiguration().isExposeSubscriptionBacklogSizeInPrometheus(),
compactorMXBean
);
if (includeTopicMetrics) {
topicsCount.add(1);
TopicStats.printTopicStats(stream, cluster, namespace, name, topicStats, compactorMXBean,
splitTopicAndPartitionIndexLabel);
} else {
namespaceStats.updateStats(topicStats);
}
});
});
if (!includeTopicMetrics) {
// Only include namespace level stats if we don't have the per-topic, otherwise we're going to report
// the same data twice, and it will make the aggregation difficult
printNamespaceStats(stream, cluster, namespace, namespaceStats);
} else {
printTopicsCountStats(stream, cluster, namespace, topicsCount);
}
});
}
private static Optional<CompactorMXBean> getCompactorMXBean(PulsarService pulsar) {
Compactor compactor = pulsar.getNullableCompactor();
return Optional.ofNullable(compactor).map(c -> c.getStats());
}
private static void aggregateTopicStats(TopicStats stats, SubscriptionStatsImpl subscriptionStats,
AggregatedSubscriptionStats subsStats) {
stats.subscriptionsCount++;
stats.msgBacklog += subscriptionStats.msgBacklog;
subsStats.msgBacklog = subscriptionStats.msgBacklog;
subsStats.msgDelayed = subscriptionStats.msgDelayed;
subsStats.msgRateExpired = subscriptionStats.msgRateExpired;
subsStats.totalMsgExpired = subscriptionStats.totalMsgExpired;
subsStats.msgBacklogNoDelayed = subsStats.msgBacklog - subsStats.msgDelayed;
subsStats.lastExpireTimestamp = subscriptionStats.lastExpireTimestamp;
subsStats.lastAckedTimestamp = subscriptionStats.lastAckedTimestamp;
subsStats.lastConsumedFlowTimestamp = subscriptionStats.lastConsumedFlowTimestamp;
subsStats.lastConsumedTimestamp = subscriptionStats.lastConsumedTimestamp;
subsStats.lastMarkDeleteAdvancedTimestamp = subscriptionStats.lastMarkDeleteAdvancedTimestamp;
subsStats.consumersCount = subscriptionStats.consumers.size();
subscriptionStats.consumers.forEach(cStats -> {
stats.consumersCount++;
subsStats.unackedMessages += cStats.unackedMessages;
subsStats.msgRateRedeliver += cStats.msgRateRedeliver;
subsStats.msgRateOut += cStats.msgRateOut;
subsStats.messageAckRate += cStats.messageAckRate;
subsStats.msgThroughputOut += cStats.msgThroughputOut;
subsStats.bytesOutCounter += cStats.bytesOutCounter;
subsStats.msgOutCounter += cStats.msgOutCounter;
if (!subsStats.blockedSubscriptionOnUnackedMsgs && cStats.blockedConsumerOnUnackedMsgs) {
subsStats.blockedSubscriptionOnUnackedMsgs = true;
}
});
stats.rateOut += subsStats.msgRateOut;
stats.throughputOut += subsStats.msgThroughputOut;
}
private static void getTopicStats(Topic topic, TopicStats stats, boolean includeConsumerMetrics,
boolean includeProducerMetrics, boolean getPreciseBacklog, boolean subscriptionBacklogSize,
Optional<CompactorMXBean> compactorMXBean) {
stats.reset();
if (topic instanceof PersistentTopic) {
// Managed Ledger stats
ManagedLedger ml = ((PersistentTopic) topic).getManagedLedger();
ManagedLedgerMBeanImpl mlStats = (ManagedLedgerMBeanImpl) ml.getStats();
stats.managedLedgerStats.storageSize = mlStats.getStoredMessagesSize();
stats.managedLedgerStats.storageLogicalSize = mlStats.getStoredMessagesLogicalSize();
stats.managedLedgerStats.backlogSize = ml.getEstimatedBacklogSize();
stats.managedLedgerStats.offloadedStorageUsed = ml.getOffloadedSize();
stats.backlogQuotaLimit = topic
.getBacklogQuota(BacklogQuota.BacklogQuotaType.destination_storage).getLimitSize();
stats.backlogQuotaLimitTime = topic
.getBacklogQuota(BacklogQuota.BacklogQuotaType.message_age).getLimitTime();
stats.managedLedgerStats.storageWriteLatencyBuckets
.addAll(mlStats.getInternalAddEntryLatencyBuckets());
stats.managedLedgerStats.storageWriteLatencyBuckets.refresh();
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets
.addAll(mlStats.getInternalLedgerAddEntryLatencyBuckets());
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.refresh();
stats.managedLedgerStats.entrySizeBuckets.addAll(mlStats.getInternalEntrySizeBuckets());
stats.managedLedgerStats.entrySizeBuckets.refresh();
stats.managedLedgerStats.storageWriteRate = mlStats.getAddEntryMessagesRate();
stats.managedLedgerStats.storageReadRate = mlStats.getReadEntriesRate();
}
TopicStatsImpl tStatus = topic.getStats(getPreciseBacklog, subscriptionBacklogSize, false);
stats.msgInCounter = tStatus.msgInCounter;
stats.bytesInCounter = tStatus.bytesInCounter;
stats.msgOutCounter = tStatus.msgOutCounter;
stats.bytesOutCounter = tStatus.bytesOutCounter;
stats.averageMsgSize = tStatus.averageMsgSize;
stats.publishRateLimitedTimes = tStatus.publishRateLimitedTimes;
stats.delayedTrackerMemoryUsage = tStatus.delayedMessageIndexSizeInBytes;
stats.abortedTxnCount = tStatus.abortedTxnCount;
stats.ongoingTxnCount = tStatus.ongoingTxnCount;
stats.committedTxnCount = tStatus.committedTxnCount;
stats.producersCount = 0;
topic.getProducers().values().forEach(producer -> {
if (producer.isRemote()) {
AggregatedReplicationStats replStats = stats.replicationStats
.computeIfAbsent(producer.getRemoteCluster(), k -> new AggregatedReplicationStats());
replStats.msgRateIn += producer.getStats().msgRateIn;
replStats.msgThroughputIn += producer.getStats().msgThroughputIn;
} else {
// Local producer
stats.producersCount++;
stats.rateIn += producer.getStats().msgRateIn;
stats.throughputIn += producer.getStats().msgThroughputIn;
if (includeProducerMetrics) {
AggregatedProducerStats producerStats = stats.producerStats.computeIfAbsent(
producer.getProducerName(), k -> new AggregatedProducerStats());
producerStats.producerId = producer.getStats().producerId;
producerStats.msgRateIn = producer.getStats().msgRateIn;
producerStats.msgThroughputIn = producer.getStats().msgThroughputIn;
producerStats.averageMsgSize = producer.getStats().averageMsgSize;
}
}
});
if (topic instanceof PersistentTopic) {
tStatus.subscriptions.forEach((subName, subscriptionStats) -> {
AggregatedSubscriptionStats subsStats = stats.subscriptionStats
.computeIfAbsent(subName, k -> new AggregatedSubscriptionStats());
aggregateTopicStats(stats, subscriptionStats, subsStats);
});
} else {
((NonPersistentTopicStatsImpl) tStatus).getNonPersistentSubscriptions()
.forEach((subName, nonPersistentSubscriptionStats) -> {
NonPersistentSubscriptionStatsImpl subscriptionStats =
(NonPersistentSubscriptionStatsImpl) nonPersistentSubscriptionStats;
AggregatedSubscriptionStats subsStats = stats.subscriptionStats
.computeIfAbsent(subName, k -> new AggregatedSubscriptionStats());
aggregateTopicStats(stats, subscriptionStats, subsStats);
subsStats.msgDropRate += subscriptionStats.getMsgDropRate();
});
}
// Consumer stats can be a lot if a subscription has many consumers
if (includeConsumerMetrics) {
topic.getSubscriptions().forEach((name, subscription) -> {
AggregatedSubscriptionStats subsStats = stats.subscriptionStats
.computeIfAbsent(name, k -> new AggregatedSubscriptionStats());
subscription.getConsumers().forEach(consumer -> {
ConsumerStatsImpl conStats = consumer.getStats();
AggregatedConsumerStats consumerStats = subsStats.consumerStat
.computeIfAbsent(consumer, k -> new AggregatedConsumerStats());
consumerStats.unackedMessages = conStats.unackedMessages;
consumerStats.msgRateRedeliver = conStats.msgRateRedeliver;
consumerStats.msgRateOut = conStats.msgRateOut;
consumerStats.msgAckRate = conStats.messageAckRate;
consumerStats.msgThroughputOut = conStats.msgThroughputOut;
consumerStats.bytesOutCounter = conStats.bytesOutCounter;
consumerStats.msgOutCounter = conStats.msgOutCounter;
consumerStats.availablePermits = conStats.availablePermits;
consumerStats.blockedSubscriptionOnUnackedMsgs = conStats.blockedConsumerOnUnackedMsgs;
});
});
}
topic.getReplicators().forEach((cluster, replicator) -> {
ReplicatorStatsImpl replStats = replicator.getStats();
AggregatedReplicationStats aggReplStats = stats.replicationStats.get(replicator.getRemoteCluster());
if (aggReplStats == null) {
aggReplStats = new AggregatedReplicationStats();
stats.replicationStats.put(replicator.getRemoteCluster(), aggReplStats);
aggReplStats.msgRateIn = replStats.msgRateIn;
aggReplStats.msgThroughputIn = replStats.msgThroughputIn;
}
aggReplStats.msgRateOut += replStats.msgRateOut;
aggReplStats.msgThroughputOut += replStats.msgThroughputOut;
aggReplStats.replicationBacklog += replStats.replicationBacklog;
aggReplStats.msgRateExpired += replStats.msgRateExpired;
aggReplStats.connectedCount += replStats.connected ? 1 : 0;
aggReplStats.replicationDelayInSeconds += replStats.replicationDelayInSeconds;
});
compactorMXBean
.flatMap(mxBean -> mxBean.getCompactionRecordForTopic(topic.getName()))
.map(compactionRecord -> {
stats.compactionRemovedEventCount = compactionRecord.getCompactionRemovedEventCount();
stats.compactionSucceedCount = compactionRecord.getCompactionSucceedCount();
stats.compactionFailedCount = compactionRecord.getCompactionFailedCount();
stats.compactionDurationTimeInMills = compactionRecord.getCompactionDurationTimeInMills();
stats.compactionReadThroughput = compactionRecord.getCompactionReadThroughput();
stats.compactionWriteThroughput = compactionRecord.getCompactionWriteThroughput();
stats.compactionLatencyBuckets.addAll(compactionRecord.getCompactionLatencyStats());
stats.compactionLatencyBuckets.refresh();
PersistentTopic persistentTopic = (PersistentTopic) topic;
Optional<CompactedTopicContext> compactedTopicContext = persistentTopic
.getCompactedTopicContext();
if (compactedTopicContext.isPresent()) {
LedgerHandle ledger = compactedTopicContext.get().getLedger();
long entries = ledger.getLastAddConfirmed() + 1;
long size = ledger.getLength();
stats.compactionCompactedEntriesCount = entries;
stats.compactionCompactedEntriesSize = size;
}
return compactionRecord;
});
}
private static void printDefaultBrokerStats(SimpleTextOutputStream stream, String cluster) {
// Print metrics with 0 values. This is necessary to have the available brokers being
// reported in the brokers dashboard even if they don't have any topic or traffic
metric(stream, cluster, "pulsar_topics_count", 0);
metric(stream, cluster, "pulsar_subscriptions_count", 0);
metric(stream, cluster, "pulsar_producers_count", 0);
metric(stream, cluster, "pulsar_consumers_count", 0);
metric(stream, cluster, "pulsar_rate_in", 0);
metric(stream, cluster, "pulsar_rate_out", 0);
metric(stream, cluster, "pulsar_throughput_in", 0);
metric(stream, cluster, "pulsar_throughput_out", 0);
metric(stream, cluster, "pulsar_storage_size", 0);
metric(stream, cluster, "pulsar_storage_logical_size", 0);
metric(stream, cluster, "pulsar_storage_write_rate", 0);
metric(stream, cluster, "pulsar_storage_read_rate", 0);
metric(stream, cluster, "pulsar_msg_backlog", 0);
}
private static void printTopicsCountStats(SimpleTextOutputStream stream, String cluster, String namespace,
LongAdder topicsCount) {
metric(stream, cluster, namespace, "pulsar_topics_count", topicsCount.sum());
}
private static void printNamespaceStats(SimpleTextOutputStream stream, String cluster, String namespace,
AggregatedNamespaceStats stats) {
metric(stream, cluster, namespace, "pulsar_topics_count", stats.topicsCount);
metric(stream, cluster, namespace, "pulsar_subscriptions_count", stats.subscriptionsCount);
metric(stream, cluster, namespace, "pulsar_producers_count", stats.producersCount);
metric(stream, cluster, namespace, "pulsar_consumers_count", stats.consumersCount);
metric(stream, cluster, namespace, "pulsar_rate_in", stats.rateIn);
metric(stream, cluster, namespace, "pulsar_rate_out", stats.rateOut);
metric(stream, cluster, namespace, "pulsar_throughput_in", stats.throughputIn);
metric(stream, cluster, namespace, "pulsar_throughput_out", stats.throughputOut);
metric(stream, cluster, namespace, "pulsar_txn_tb_active_total", stats.ongoingTxnCount);
metric(stream, cluster, namespace, "pulsar_txn_tb_aborted_total", stats.abortedTxnCount);
metric(stream, cluster, namespace, "pulsar_txn_tb_committed_total", stats.committedTxnCount);
metric(stream, cluster, namespace, "pulsar_consumer_msg_ack_rate", stats.messageAckRate);
metric(stream, cluster, namespace, "pulsar_in_bytes_total", stats.bytesInCounter);
metric(stream, cluster, namespace, "pulsar_in_messages_total", stats.msgInCounter);
metric(stream, cluster, namespace, "pulsar_out_bytes_total", stats.bytesOutCounter);
metric(stream, cluster, namespace, "pulsar_out_messages_total", stats.msgOutCounter);
metric(stream, cluster, namespace, "pulsar_storage_size", stats.managedLedgerStats.storageSize);
metric(stream, cluster, namespace, "pulsar_storage_logical_size", stats.managedLedgerStats.storageLogicalSize);
metric(stream, cluster, namespace, "pulsar_storage_backlog_size", stats.managedLedgerStats.backlogSize);
metric(stream, cluster, namespace, "pulsar_storage_offloaded_size",
stats.managedLedgerStats.offloadedStorageUsed);
metric(stream, cluster, namespace, "pulsar_storage_write_rate", stats.managedLedgerStats.storageWriteRate);
metric(stream, cluster, namespace, "pulsar_storage_read_rate", stats.managedLedgerStats.storageReadRate);
metric(stream, cluster, namespace, "pulsar_subscription_delayed", stats.msgDelayed);
metric(stream, cluster, namespace, "pulsar_delayed_message_index_size_bytes",
stats.delayedTrackerMemoryUsage);
metricWithRemoteCluster(stream, cluster, namespace, "pulsar_msg_backlog", "local", stats.msgBacklog);
stats.managedLedgerStats.storageWriteLatencyBuckets.refresh();
long[] latencyBuckets = stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets();
metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_0_5", latencyBuckets[0]);
metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_1", latencyBuckets[1]);
metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_5", latencyBuckets[2]);
metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_10", latencyBuckets[3]);
metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_20", latencyBuckets[4]);
metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_50", latencyBuckets[5]);
metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_100", latencyBuckets[6]);
metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_200", latencyBuckets[7]);
metric(stream, cluster, namespace, "pulsar_storage_write_latency_le_1000", latencyBuckets[8]);
metric(stream, cluster, namespace, "pulsar_storage_write_latency_overflow", latencyBuckets[9]);
metric(stream, cluster, namespace, "pulsar_storage_write_latency_count",
stats.managedLedgerStats.storageWriteLatencyBuckets.getCount());
metric(stream, cluster, namespace, "pulsar_storage_write_latency_sum",
stats.managedLedgerStats.storageWriteLatencyBuckets.getSum());
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.refresh();
long[] ledgerWritelatencyBuckets = stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets();
metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_0_5", ledgerWritelatencyBuckets[0]);
metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_1", ledgerWritelatencyBuckets[1]);
metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_5", ledgerWritelatencyBuckets[2]);
metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_10", ledgerWritelatencyBuckets[3]);
metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_20", ledgerWritelatencyBuckets[4]);
metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_50", ledgerWritelatencyBuckets[5]);
metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_100", ledgerWritelatencyBuckets[6]);
metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_200", ledgerWritelatencyBuckets[7]);
metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_le_1000", ledgerWritelatencyBuckets[8]);
metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_overflow",
ledgerWritelatencyBuckets[9]);
metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_count",
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getCount());
metric(stream, cluster, namespace, "pulsar_storage_ledger_write_latency_sum",
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getSum());
stats.managedLedgerStats.entrySizeBuckets.refresh();
long[] entrySizeBuckets = stats.managedLedgerStats.entrySizeBuckets.getBuckets();
metric(stream, cluster, namespace, "pulsar_entry_size_le_128", entrySizeBuckets[0]);
metric(stream, cluster, namespace, "pulsar_entry_size_le_512", entrySizeBuckets[1]);
metric(stream, cluster, namespace, "pulsar_entry_size_le_1_kb", entrySizeBuckets[2]);
metric(stream, cluster, namespace, "pulsar_entry_size_le_2_kb", entrySizeBuckets[3]);
metric(stream, cluster, namespace, "pulsar_entry_size_le_4_kb", entrySizeBuckets[4]);
metric(stream, cluster, namespace, "pulsar_entry_size_le_16_kb", entrySizeBuckets[5]);
metric(stream, cluster, namespace, "pulsar_entry_size_le_100_kb", entrySizeBuckets[6]);
metric(stream, cluster, namespace, "pulsar_entry_size_le_1_mb", entrySizeBuckets[7]);
metric(stream, cluster, namespace, "pulsar_entry_size_le_overflow", entrySizeBuckets[8]);
metric(stream, cluster, namespace, "pulsar_entry_size_count",
stats.managedLedgerStats.entrySizeBuckets.getCount());
metric(stream, cluster, namespace, "pulsar_entry_size_sum",
stats.managedLedgerStats.entrySizeBuckets.getSum());
if (!stats.replicationStats.isEmpty()) {
stats.replicationStats.forEach((remoteCluster, replStats) -> {
metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_rate_in", remoteCluster,
replStats.msgRateIn);
metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_rate_out", remoteCluster,
replStats.msgRateOut);
metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_throughput_in", remoteCluster,
replStats.msgThroughputIn);
metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_throughput_out", remoteCluster,
replStats.msgThroughputOut);
metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_backlog", remoteCluster,
replStats.replicationBacklog);
metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_connected_count", remoteCluster,
replStats.connectedCount);
metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_rate_expired", remoteCluster,
replStats.msgRateExpired);
metricWithRemoteCluster(stream, cluster, namespace, "pulsar_replication_delay_in_seconds",
remoteCluster, replStats.replicationDelayInSeconds);
});
}
}
private static void metric(SimpleTextOutputStream stream, String cluster, String name,
long value) {
TopicStats.metricType(stream, name);
stream.write(name)
.write("{cluster=\"").write(cluster).write("\"} ")
.write(value).write(' ').write(System.currentTimeMillis())
.write('\n');
}
private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String name,
long value) {
TopicStats.metricType(stream, name);
stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace).write("\"} ");
stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
}
private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String name,
double value) {
TopicStats.metricType(stream, name);
stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace).write("\"} ");
stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
}
private static void metricWithRemoteCluster(SimpleTextOutputStream stream, String cluster, String namespace,
String name, String remoteCluster, double value) {
TopicStats.metricType(stream, name);
stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace);
stream.write("\",remote_cluster=\"").write(remoteCluster).write("\"} ");
stream.write(value).write(' ').write(System.currentTimeMillis()).write('\n');
}
}