blob: e9811b8079b794214565370fcdc01271cc557c7a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.stats.prometheus;
import java.util.HashMap;
import java.util.Map;
import org.apache.bookkeeper.mledger.util.StatsBuckets;
import org.apache.pulsar.compaction.CompactionRecord;
public class AggregatedNamespaceStats {
public int topicsCount;
public int subscriptionsCount;
public int producersCount;
public int consumersCount;
public double rateIn;
public double rateOut;
public double throughputIn;
public double throughputOut;
public long messageAckRate;
public long bytesInCounter;
public long msgInCounter;
public long bytesOutCounter;
public long msgOutCounter;
public ManagedLedgerStats managedLedgerStats = new ManagedLedgerStats();
public long msgBacklog;
public long msgDelayed;
public long ongoingTxnCount;
public long abortedTxnCount;
public long committedTxnCount;
long backlogQuotaLimit;
long backlogQuotaLimitTime;
public Map<String, AggregatedReplicationStats> replicationStats = new HashMap<>();
public Map<String, AggregatedSubscriptionStats> subscriptionStats = new HashMap<>();
long compactionRemovedEventCount;
long compactionSucceedCount;
long compactionFailedCount;
long compactionDurationTimeInMills;
double compactionReadThroughput;
double compactionWriteThroughput;
long compactionCompactedEntriesCount;
long compactionCompactedEntriesSize;
StatsBuckets compactionLatencyBuckets = new StatsBuckets(CompactionRecord.WRITE_LATENCY_BUCKETS_USEC);
int delayedTrackerMemoryUsage;
void updateStats(TopicStats stats) {
topicsCount++;
subscriptionsCount += stats.subscriptionsCount;
producersCount += stats.producersCount;
consumersCount += stats.consumersCount;
rateIn += stats.rateIn;
rateOut += stats.rateOut;
throughputIn += stats.throughputIn;
throughputOut += stats.throughputOut;
bytesInCounter += stats.bytesInCounter;
msgInCounter += stats.msgInCounter;
bytesOutCounter += stats.bytesOutCounter;
msgOutCounter += stats.msgOutCounter;
delayedTrackerMemoryUsage += stats.delayedTrackerMemoryUsage;
this.ongoingTxnCount += stats.ongoingTxnCount;
this.abortedTxnCount += stats.abortedTxnCount;
this.committedTxnCount += stats.committedTxnCount;
managedLedgerStats.storageSize += stats.managedLedgerStats.storageSize;
managedLedgerStats.storageLogicalSize += stats.managedLedgerStats.storageLogicalSize;
managedLedgerStats.backlogSize += stats.managedLedgerStats.backlogSize;
managedLedgerStats.offloadedStorageUsed += stats.managedLedgerStats.offloadedStorageUsed;
backlogQuotaLimit = Math.max(backlogQuotaLimit, stats.backlogQuotaLimit);
backlogQuotaLimitTime = Math.max(backlogQuotaLimitTime, stats.backlogQuotaLimitTime);
managedLedgerStats.storageWriteRate += stats.managedLedgerStats.storageWriteRate;
managedLedgerStats.storageReadRate += stats.managedLedgerStats.storageReadRate;
msgBacklog += stats.msgBacklog;
managedLedgerStats.storageWriteLatencyBuckets.addAll(stats.managedLedgerStats.storageWriteLatencyBuckets);
managedLedgerStats.storageLedgerWriteLatencyBuckets
.addAll(stats.managedLedgerStats.storageLedgerWriteLatencyBuckets);
managedLedgerStats.entrySizeBuckets.addAll(stats.managedLedgerStats.entrySizeBuckets);
stats.replicationStats.forEach((n, as) -> {
AggregatedReplicationStats replStats =
replicationStats.computeIfAbsent(n, k -> new AggregatedReplicationStats());
replStats.msgRateIn += as.msgRateIn;
replStats.msgRateOut += as.msgRateOut;
replStats.msgThroughputIn += as.msgThroughputIn;
replStats.msgThroughputOut += as.msgThroughputOut;
replStats.replicationBacklog += as.replicationBacklog;
replStats.msgRateExpired += as.msgRateExpired;
replStats.connectedCount += as.connectedCount;
replStats.replicationDelayInSeconds += as.replicationDelayInSeconds;
});
stats.subscriptionStats.forEach((n, as) -> {
AggregatedSubscriptionStats subsStats =
subscriptionStats.computeIfAbsent(n, k -> new AggregatedSubscriptionStats());
msgDelayed += as.msgDelayed;
subsStats.blockedSubscriptionOnUnackedMsgs = as.blockedSubscriptionOnUnackedMsgs;
subsStats.msgBacklog += as.msgBacklog;
subsStats.msgBacklogNoDelayed += as.msgBacklogNoDelayed;
subsStats.msgDelayed += as.msgDelayed;
subsStats.msgRateRedeliver += as.msgRateRedeliver;
subsStats.unackedMessages += as.unackedMessages;
as.consumerStat.forEach((c, v) -> {
AggregatedConsumerStats consumerStats =
subsStats.consumerStat.computeIfAbsent(c, k -> new AggregatedConsumerStats());
consumerStats.blockedSubscriptionOnUnackedMsgs = v.blockedSubscriptionOnUnackedMsgs;
consumerStats.msgRateRedeliver += v.msgRateRedeliver;
consumerStats.unackedMessages += v.unackedMessages;
messageAckRate += v.msgAckRate;
});
});
compactionRemovedEventCount += stats.compactionRemovedEventCount;
compactionSucceedCount += stats.compactionSucceedCount;
compactionFailedCount += stats.compactionFailedCount;
compactionDurationTimeInMills += stats.compactionDurationTimeInMills;
compactionReadThroughput += stats.compactionReadThroughput;
compactionWriteThroughput += stats.compactionWriteThroughput;
compactionCompactedEntriesCount += stats.compactionCompactedEntriesCount;
compactionCompactedEntriesSize += stats.compactionCompactedEntriesSize;
compactionLatencyBuckets.addAll(stats.compactionLatencyBuckets);
}
public void reset() {
managedLedgerStats.reset();
topicsCount = 0;
subscriptionsCount = 0;
producersCount = 0;
consumersCount = 0;
rateIn = 0;
rateOut = 0;
throughputIn = 0;
throughputOut = 0;
msgBacklog = 0;
msgDelayed = 0;
backlogQuotaLimit = 0;
backlogQuotaLimitTime = -1;
replicationStats.clear();
subscriptionStats.clear();
delayedTrackerMemoryUsage = 0;
}
}