blob: 99838ccfae920b51daabb57e89b425a09174cdbb [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.stats.prometheus;
import static org.apache.pulsar.common.naming.TopicName.PARTITIONED_TOPIC_SUFFIX;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.bookkeeper.mledger.util.StatsBuckets;
import org.apache.pulsar.common.util.SimpleTextOutputStream;
import org.apache.pulsar.compaction.CompactionRecord;
import org.apache.pulsar.compaction.CompactorMXBean;
class TopicStats {
int subscriptionsCount;
int producersCount;
int consumersCount;
double rateIn;
double rateOut;
double throughputIn;
double throughputOut;
long msgInCounter;
long bytesInCounter;
long msgOutCounter;
long bytesOutCounter;
double averageMsgSize;
public long msgBacklog;
long publishRateLimitedTimes;
long backlogQuotaLimit;
long backlogQuotaLimitTime;
ManagedLedgerStats managedLedgerStats = new ManagedLedgerStats();
Map<String, AggregatedReplicationStats> replicationStats = new HashMap<>();
Map<String, AggregatedSubscriptionStats> subscriptionStats = new HashMap<>();
Map<String, AggregatedProducerStats> producerStats = new HashMap<>();
// Used for tracking duplicate TYPE definitions
static Map<String, String> metricWithTypeDefinition = new HashMap<>();
// For compaction
long compactionRemovedEventCount;
long compactionSucceedCount;
long compactionFailedCount;
long compactionDurationTimeInMills;
double compactionReadThroughput;
double compactionWriteThroughput;
long compactionCompactedEntriesCount;
long compactionCompactedEntriesSize;
StatsBuckets compactionLatencyBuckets = new StatsBuckets(CompactionRecord.WRITE_LATENCY_BUCKETS_USEC);
public int delayedTrackerMemoryUsage;
public void reset() {
subscriptionsCount = 0;
producersCount = 0;
consumersCount = 0;
rateIn = 0;
rateOut = 0;
throughputIn = 0;
throughputOut = 0;
bytesInCounter = 0;
msgInCounter = 0;
bytesOutCounter = 0;
msgOutCounter = 0;
managedLedgerStats.reset();
msgBacklog = 0;
publishRateLimitedTimes = 0L;
backlogQuotaLimit = 0;
backlogQuotaLimitTime = -1;
replicationStats.clear();
subscriptionStats.clear();
producerStats.clear();
compactionRemovedEventCount = 0;
compactionSucceedCount = 0;
compactionFailedCount = 0;
compactionDurationTimeInMills = 0;
compactionReadThroughput = 0;
compactionWriteThroughput = 0;
compactionCompactedEntriesCount = 0;
compactionCompactedEntriesSize = 0;
compactionLatencyBuckets.reset();
delayedTrackerMemoryUsage = 0;
}
static void resetTypes() {
metricWithTypeDefinition.clear();
}
static void printTopicStats(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
TopicStats stats, Optional<CompactorMXBean> compactorMXBean,
boolean splitTopicAndPartitionIndexLabel) {
metric(stream, cluster, namespace, topic, "pulsar_subscriptions_count", stats.subscriptionsCount,
splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_producers_count", stats.producersCount,
splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_consumers_count", stats.consumersCount,
splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_rate_in", stats.rateIn,
splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_rate_out", stats.rateOut,
splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_throughput_in", stats.throughputIn,
splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_throughput_out", stats.throughputOut,
splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_average_msg_size", stats.averageMsgSize,
splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_storage_size", stats.managedLedgerStats.storageSize,
splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_storage_logical_size",
stats.managedLedgerStats.storageLogicalSize, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_msg_backlog", stats.msgBacklog,
splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_storage_write_rate",
stats.managedLedgerStats.storageWriteRate, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_storage_read_rate", stats.managedLedgerStats.storageReadRate,
splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_storage_backlog_size",
stats.managedLedgerStats.backlogSize, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_publish_rate_limit_times", stats.publishRateLimitedTimes,
splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_storage_offloaded_size", stats.managedLedgerStats
.offloadedStorageUsed, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_storage_backlog_quota_limit", stats.backlogQuotaLimit,
splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_storage_backlog_quota_limit_time",
stats.backlogQuotaLimitTime, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_delayed_message_index_size_bytes",
stats.delayedTrackerMemoryUsage, splitTopicAndPartitionIndexLabel);
long[] latencyBuckets = stats.managedLedgerStats.storageWriteLatencyBuckets.getBuckets();
metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_0_5", latencyBuckets[0],
splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_1", latencyBuckets[1],
splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_5", latencyBuckets[2],
splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_10", latencyBuckets[3],
splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_20", latencyBuckets[4],
splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_50", latencyBuckets[5],
splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_100", latencyBuckets[6],
splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_200", latencyBuckets[7],
splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_le_1000", latencyBuckets[8],
splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_overflow", latencyBuckets[9],
splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_count",
stats.managedLedgerStats.storageWriteLatencyBuckets.getCount(), splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_storage_write_latency_sum",
stats.managedLedgerStats.storageWriteLatencyBuckets.getSum(), splitTopicAndPartitionIndexLabel);
long[] ledgerWriteLatencyBuckets = stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getBuckets();
metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_0_5",
ledgerWriteLatencyBuckets[0], splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_1",
ledgerWriteLatencyBuckets[1], splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_5",
ledgerWriteLatencyBuckets[2], splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_10",
ledgerWriteLatencyBuckets[3], splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_20",
ledgerWriteLatencyBuckets[4], splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_50",
ledgerWriteLatencyBuckets[5], splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_100",
ledgerWriteLatencyBuckets[6], splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_200",
ledgerWriteLatencyBuckets[7], splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_le_1000",
ledgerWriteLatencyBuckets[8], splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_overflow",
ledgerWriteLatencyBuckets[9], splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_count",
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getCount(),
splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_storage_ledger_write_latency_sum",
stats.managedLedgerStats.storageLedgerWriteLatencyBuckets.getSum(),
splitTopicAndPartitionIndexLabel);
long[] entrySizeBuckets = stats.managedLedgerStats.entrySizeBuckets.getBuckets();
metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_128", entrySizeBuckets[0],
splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_512", entrySizeBuckets[1],
splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_1_kb", entrySizeBuckets[2],
splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_2_kb", entrySizeBuckets[3],
splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_4_kb", entrySizeBuckets[4],
splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_16_kb", entrySizeBuckets[5],
splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_100_kb", entrySizeBuckets[6],
splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_1_mb", entrySizeBuckets[7],
splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_entry_size_le_overflow", entrySizeBuckets[8],
splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_entry_size_count",
stats.managedLedgerStats.entrySizeBuckets.getCount(), splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_entry_size_sum",
stats.managedLedgerStats.entrySizeBuckets.getSum(), splitTopicAndPartitionIndexLabel);
stats.producerStats.forEach((p, producerStats) -> {
metric(stream, cluster, namespace, topic, p, producerStats.producerId, "pulsar_producer_msg_rate_in",
producerStats.msgRateIn, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, p, producerStats.producerId, "pulsar_producer_msg_throughput_in",
producerStats.msgThroughputIn, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, p, producerStats.producerId, "pulsar_producer_msg_average_Size",
producerStats.averageMsgSize, splitTopicAndPartitionIndexLabel);
});
stats.subscriptionStats.forEach((n, subsStats) -> {
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_back_log",
subsStats.msgBacklog, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_back_log_no_delayed",
subsStats.msgBacklogNoDelayed, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_delayed",
subsStats.msgDelayed, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_redeliver",
subsStats.msgRateRedeliver, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_unacked_messages",
subsStats.unackedMessages, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_blocked_on_unacked_messages",
subsStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_out",
subsStats.msgRateOut, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_ack_rate",
subsStats.messageAckRate, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_throughput_out",
subsStats.msgThroughputOut, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, "pulsar_out_bytes_total",
subsStats.bytesOutCounter, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, "pulsar_out_messages_total",
subsStats.msgOutCounter, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_expire_timestamp",
subsStats.lastExpireTimestamp, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_acked_timestamp",
subsStats.lastAckedTimestamp, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_consumed_flow_timestamp",
subsStats.lastConsumedFlowTimestamp, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_consumed_timestamp",
subsStats.lastConsumedTimestamp, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_last_mark_delete_advanced_timestamp",
subsStats.lastMarkDeleteAdvancedTimestamp, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_rate_expired",
subsStats.msgRateExpired, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_total_msg_expired",
subsStats.totalMsgExpired, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_msg_drop_rate",
subsStats.msgDropRate, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, "pulsar_subscription_consumers_count",
subsStats.consumersCount, splitTopicAndPartitionIndexLabel);
subsStats.consumerStat.forEach((c, consumerStats) -> {
metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
"pulsar_consumer_msg_rate_redeliver", consumerStats.msgRateRedeliver,
splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
"pulsar_consumer_unacked_messages", consumerStats.unackedMessages,
splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
"pulsar_consumer_blocked_on_unacked_messages",
consumerStats.blockedSubscriptionOnUnackedMsgs ? 1 : 0,
splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
"pulsar_consumer_msg_rate_out", consumerStats.msgRateOut,
splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
"pulsar_consumer_msg_ack_rate", consumerStats.msgAckRate,
splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
"pulsar_consumer_msg_throughput_out", consumerStats.msgThroughputOut,
splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
"pulsar_consumer_available_permits", consumerStats.availablePermits,
splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
"pulsar_out_bytes_total", consumerStats.bytesOutCounter,
splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, n, c.consumerName(), c.consumerId(),
"pulsar_out_messages_total", consumerStats.msgOutCounter,
splitTopicAndPartitionIndexLabel);
});
});
if (!stats.replicationStats.isEmpty()) {
stats.replicationStats.forEach((remoteCluster, replStats) -> {
metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_rate_in", remoteCluster,
replStats.msgRateIn, splitTopicAndPartitionIndexLabel);
metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_rate_out", remoteCluster,
replStats.msgRateOut, splitTopicAndPartitionIndexLabel);
metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_throughput_in",
remoteCluster, replStats.msgThroughputIn, splitTopicAndPartitionIndexLabel);
metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_throughput_out",
remoteCluster, replStats.msgThroughputOut, splitTopicAndPartitionIndexLabel);
metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_backlog", remoteCluster,
replStats.replicationBacklog, splitTopicAndPartitionIndexLabel);
metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_connected_count",
remoteCluster, replStats.connectedCount, splitTopicAndPartitionIndexLabel);
metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_rate_expired",
remoteCluster, replStats.msgRateExpired, splitTopicAndPartitionIndexLabel);
metricWithRemoteCluster(stream, cluster, namespace, topic, "pulsar_replication_delay_in_seconds",
remoteCluster, replStats.replicationDelayInSeconds, splitTopicAndPartitionIndexLabel);
});
}
metric(stream, cluster, namespace, topic, "pulsar_in_bytes_total", stats.bytesInCounter,
splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_in_messages_total", stats.msgInCounter,
splitTopicAndPartitionIndexLabel);
// Compaction
boolean hasCompaction = compactorMXBean.flatMap(mxBean -> mxBean.getCompactionRecordForTopic(topic))
.map(__ -> true).orElse(false);
if (hasCompaction) {
metric(stream, cluster, namespace, topic, "pulsar_compaction_removed_event_count",
stats.compactionRemovedEventCount, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_compaction_succeed_count",
stats.compactionSucceedCount, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_compaction_failed_count",
stats.compactionFailedCount, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_compaction_duration_time_in_mills",
stats.compactionDurationTimeInMills, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_compaction_read_throughput",
stats.compactionReadThroughput, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_compaction_write_throughput",
stats.compactionWriteThroughput, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_compaction_compacted_entries_count",
stats.compactionCompactedEntriesCount, splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_compaction_compacted_entries_size",
stats.compactionCompactedEntriesSize, splitTopicAndPartitionIndexLabel);
long[] compactionLatencyBuckets = stats.compactionLatencyBuckets.getBuckets();
metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_0_5",
compactionLatencyBuckets[0], splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_1",
compactionLatencyBuckets[1], splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_5",
compactionLatencyBuckets[2], splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_10",
compactionLatencyBuckets[3], splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_20",
compactionLatencyBuckets[4], splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_50",
compactionLatencyBuckets[5], splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_100",
compactionLatencyBuckets[6], splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_200",
compactionLatencyBuckets[7], splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_le_1000",
compactionLatencyBuckets[8], splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_overflow",
compactionLatencyBuckets[9], splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_sum",
stats.compactionLatencyBuckets.getSum(), splitTopicAndPartitionIndexLabel);
metric(stream, cluster, namespace, topic, "pulsar_compaction_latency_count",
stats.compactionLatencyBuckets.getCount(), splitTopicAndPartitionIndexLabel);
}
}
static void metricType(SimpleTextOutputStream stream, String name) {
if (!metricWithTypeDefinition.containsKey(name)) {
metricWithTypeDefinition.put(name, "gauge");
stream.write("# TYPE ").write(name).write(" gauge\n");
}
}
private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
String name, double value, boolean splitTopicAndPartitionIndexLabel) {
metricType(stream, name);
appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel).write("\"} ");
stream.write(value);
appendEndings(stream);
}
private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
String subscription, String name, long value, boolean splitTopicAndPartitionIndexLabel) {
metricType(stream, name);
appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel)
.write("\",subscription=\"").write(subscription).write("\"} ");
stream.write(value);
appendEndings(stream);
}
private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
String producerName, long produceId, String name, double value, boolean splitTopicAndPartitionIndexLabel) {
metricType(stream, name);
appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel)
.write("\",producer_name=\"").write(producerName)
.write("\",producer_id=\"").write(produceId).write("\"} ");
stream.write(value);
appendEndings(stream);
}
private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
String subscription, String name, double value, boolean splitTopicAndPartitionIndexLabel) {
metricType(stream, name);
appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel)
.write("\",subscription=\"").write(subscription).write("\"} ");
stream.write(value);
appendEndings(stream);
}
private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
String subscription, String consumerName, long consumerId, String name, long value,
boolean splitTopicAndPartitionIndexLabel) {
metricType(stream, name);
appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel)
.write("\",subscription=\"").write(subscription)
.write("\",consumer_name=\"").write(consumerName).write("\",consumer_id=\"").write(consumerId)
.write("\"} ");
stream.write(value);
appendEndings(stream);
}
private static void metric(SimpleTextOutputStream stream, String cluster, String namespace, String topic,
String subscription, String consumerName, long consumerId, String name, double value,
boolean splitTopicAndPartitionIndexLabel) {
metricType(stream, name);
appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel)
.write("\",subscription=\"").write(subscription)
.write("\",consumer_name=\"").write(consumerName).write("\",consumer_id=\"")
.write(consumerId).write("\"} ");
stream.write(value);
appendEndings(stream);
}
private static void metricWithRemoteCluster(SimpleTextOutputStream stream, String cluster, String namespace,
String topic, String name, String remoteCluster, double value, boolean splitTopicAndPartitionIndexLabel) {
metricType(stream, name);
appendRequiredLabels(stream, cluster, namespace, topic, name, splitTopicAndPartitionIndexLabel)
.write("\",remote_cluster=\"").write(remoteCluster).write("\"} ");
stream.write(value);
appendEndings(stream);
}
private static SimpleTextOutputStream appendRequiredLabels(SimpleTextOutputStream stream, String cluster,
String namespace, String topic, String name, boolean splitTopicAndPartitionIndexLabel) {
stream.write(name).write("{cluster=\"").write(cluster).write("\",namespace=\"").write(namespace);
if (splitTopicAndPartitionIndexLabel) {
int index = topic.indexOf(PARTITIONED_TOPIC_SUFFIX);
if (index > 0) {
stream.write("\",topic=\"").write(topic.substring(0, index)).write("\",partition=\"")
.write(topic.substring(index + PARTITIONED_TOPIC_SUFFIX.length()));
} else {
stream.write("\",topic=\"").write(topic).write("\",partition=\"").write("-1");
}
} else {
stream.write("\",topic=\"").write(topic);
}
return stream;
}
private static void appendEndings(SimpleTextOutputStream stream) {
stream.write(' ').write(System.currentTimeMillis()).write('\n');
}
}