SLING-8934 Create new metric for the number of subscribers of distribution publisher (#18)


diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
index f5ac277..52b6a7a 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
@@ -43,6 +43,7 @@
 import javax.annotation.ParametersAreNonnullByDefault;
 import javax.management.NotCompliantMBeanException;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
 import org.apache.sling.distribution.journal.impl.queue.PubQueueProvider;
 import org.apache.sling.distribution.journal.impl.shared.AgentState;
@@ -136,6 +137,8 @@
 
     private JMXRegistration reg;
 
+    private DistributionMetricsService.GaugeService<Integer> subscriberCountGauge;
+
     public DistributionPublisher() {
         log = new DefaultDistributionLog(pubAgentName, this.getClass(), DefaultDistributionLog.LogLevel.INFO);
         REQ_TYPES.put(ADD,    this::sendAndWait);
@@ -146,6 +149,7 @@
     @Activate
     public void activate(PublisherConfiguration config, BundleContext context) {
         requireNonNull(factory);
+        requireNonNull(distributionMetricsService);
         pubAgentName = requireNonNull(config.name());
 
         queuedTimeout = config.queuedTimeout();
@@ -167,6 +171,11 @@
         
         String msg = String.format("Started Publisher agent %s with packageBuilder %s, queuedTimeout %s",
                 pubAgentName, pkgType, queuedTimeout);
+        subscriberCountGauge = distributionMetricsService.createGauge(
+                DistributionMetricsService.PUB_COMPONENT + ".subscriber_count;pub_name=" + pubAgentName,
+                "Current number of publish subscribers",
+                () -> discoveryService.getTopologyView().getSubscribedAgentIds().size()
+        );
         log.info(msg);
     }
 
@@ -176,6 +185,7 @@
         componentReg.unregister();
         String msg = String.format("Stopped Publisher agent %s with packageBuilder %s, queuedTimeout %s",
                 pubAgentName, pkgType, queuedTimeout);
+        IOUtils.closeQuietly(subscriberCountGauge);
         log.info(msg);
     }
     
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java
index 23c09ff..16504e9 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/DistributionMetricsService.java
@@ -46,7 +46,7 @@
 
     public static final String BASE_COMPONENT = "distribution.journal";
 
-    private static final String PUB_COMPONENT = BASE_COMPONENT + ".publisher";
+    public static final String PUB_COMPONENT = BASE_COMPONENT + ".publisher";
 
     public static final String SUB_COMPONENT = BASE_COMPONENT + ".subscriber";