SLING-8934 Create new metric for the number of subscribers of distribution publisher (#18)
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
index f5ac277..52b6a7a 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
@@ -43,6 +43,7 @@
import javax.annotation.ParametersAreNonnullByDefault;
import javax.management.NotCompliantMBeanException;
+import org.apache.commons.io.IOUtils;
import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider;
import org.apache.sling.distribution.journal.impl.shared.AgentState;
@@ -136,6 +137,8 @@
private JMXRegistration reg;
+ private DistributionMetricsService.GaugeService<Integer> subscriberCountGauge;
+
public DistributionPublisher() {
log = new DefaultDistributionLog(pubAgentName, this.getClass(), DefaultDistributionLog.LogLevel.INFO);
REQ_TYPES.put(ADD, this::sendAndWait);
@@ -146,6 +149,7 @@
@Activate
public void activate(PublisherConfiguration config, BundleContext context) {
requireNonNull(factory);
+ requireNonNull(distributionMetricsService);
pubAgentName = requireNonNull(config.name());
queuedTimeout = config.queuedTimeout();
@@ -167,6 +171,11 @@
String msg = String.format("Started Publisher agent %s with packageBuilder %s, queuedTimeout %s",
pubAgentName, pkgType, queuedTimeout);
+ subscriberCountGauge = distributionMetricsService.createGauge(
+ DistributionMetricsService.PUB_COMPONENT + ".subscriber_count;pub_name=" + pubAgentName,
+ "Current number of publish subscribers",
+ () -> discoveryService.getTopologyView().getSubscribedAgentIds().size()
+ );
log.info(msg);
}
@@ -176,6 +185,7 @@
componentReg.unregister();
String msg = String.format("Stopped Publisher agent %s with packageBuilder %s, queuedTimeout %s",
pubAgentName, pkgType, queuedTimeout);
+ IOUtils.closeQuietly(subscriberCountGauge);
log.info(msg);
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java
index 23c09ff..16504e9 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java
@@ -46,7 +46,7 @@
public static final String BASE_COMPONENT = "distribution.journal";
- private static final String PUB_COMPONENT = BASE_COMPONENT + ".publisher";
+ public static final String PUB_COMPONENT = BASE_COMPONENT + ".publisher";
public static final String SUB_COMPONENT = BASE_COMPONENT + ".subscriber";