SLING-12292 - Add tags to metrics (#138)

* SLING-12292 - Add tags to metrics

* SLING-12292 - Add tags to publish metrics
diff --git a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
index 76697c1..98e8298 100644
--- a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
+++ b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
@@ -112,7 +112,7 @@
         this.logSender = logSender;
         this.config = config;
         
-        subscriberMetrics.currentRetries(config.getSubAgentName(), packageRetries::getSum);
+        subscriberMetrics.currentRetries(packageRetries::getSum);
         this.resolverFactory = resolverFactory;
         this.subscriberMetrics = subscriberMetrics;
         // Error queues are enabled when the number
diff --git a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperFactory.java b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperFactory.java
index 584d404..dd1b492 100644
--- a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperFactory.java
+++ b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperFactory.java
@@ -39,9 +39,6 @@
     private ResourceResolverFactory resolverFactory;
     
     @Reference
-    private SubscriberMetrics subscriberMetrics;
-    
-    @Reference
     private EventAdmin eventAdmin;
     
     @Reference
@@ -63,7 +60,8 @@
             DistributionPackageBuilder packageBuilder, 
             BookKeeperConfig config, 
             Consumer<PackageStatusMessage> statusSender,
-            Consumer<LogMessage> logSender
+            Consumer<LogMessage> logSender, 
+            SubscriberMetrics subscriberMetrics
             ) {
         ContentPackageExtractor extractor = new ContentPackageExtractor(
                 packaging,
diff --git a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/SubscriberMetrics.java b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/SubscriberMetrics.java
index 686dd0c..5846cfc 100644
--- a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/SubscriberMetrics.java
+++ b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/SubscriberMetrics.java
@@ -18,8 +18,10 @@
  */
 package org.apache.sling.distribution.journal.bookkeeper;
 
-import static java.lang.String.format;
+import static org.apache.sling.distribution.journal.metrics.TaggedMetrics.getMetricName;
 
+import java.util.Arrays;
+import java.util.List;
 import java.util.function.Supplier;
 
 import org.apache.sling.commons.metrics.Counter;
@@ -28,82 +30,73 @@
 import org.apache.sling.commons.metrics.MetricsService;
 import org.apache.sling.commons.metrics.Timer;
 import org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Reference;
+import org.apache.sling.distribution.journal.metrics.Tag;
 
-@Component(service = SubscriberMetrics.class)
+/**
+ * Metrics for DistributionSubscriber
+ * most metrics will have two parameters:
+ * TAG_SUB_NAME and TAG_EDITABLE
+ */
 public class SubscriberMetrics {
-    public static final String SUB_COMPONENT = "distribution.journal.subscriber";
+    // Name of the subscriber agent
+    private static final String TAG_SUB_NAME = "sub_name";
     
+    // Status of a package : 
+    private static final String TAG_STATUS = "status";
+    
+    // Is the queue editable (true, false)
+    private static final String TAG_EDITABLE = "editable";
+    
+    public static final String SUB_COMPONENT = "distribution.journal.subscriber.";
+    
+    private static final String PACKAGE_STATUS_COUNT = SUB_COMPONENT + "package_status_count";
+    
+    // Number of packages with at least one failure to apply 
+    private static final String CURRENT_RETRIES = SUB_COMPONENT + "current_retries";
+
+    // Cumulated size of all packages (parameters: TAG_SUB_NAME, editable (golden publish))
+    private static final String IMPORTED_PACKAGE_SIZE = SUB_COMPONENT + "imported_package_size";
+    private static final String ITEMS_BUFFER_SIZE = SUB_COMPONENT + "items_buffer_size";
+
+    // Increased on every failure to apply a package
+    private static final String FAILED_PACKAGE_IMPORTS = SUB_COMPONENT + "failed_package_imports";
+    
+    // Increased when a package failed before but then succeeded (parameters: agent, editable (golden publish))
+    private static final String TRANSIENT_IMPORT_ERRORS = SUB_COMPONENT + "transient_import_errors";
+
+    // Only counted in error queue setup
+    private static final String PERMANENT_IMPORT_ERRORS = SUB_COMPONENT + "permanent_import_errors";
+
+    private static final String IMPORT_PRE_PROCESS_REQUEST_COUNT = SUB_COMPONENT + "import_pre_process_request_count";
+    private static final String IMPORT_POST_PROCESS_SUCCESS_COUNT = SUB_COMPONENT + "import_post_process_success_count";
+    private static final String IMPORT_POST_PROCESS_REQUEST_COUNT = SUB_COMPONENT + "import_post_process_request_count";
+    private static final String INVALIDATION_PROCESS_SUCCESS_COUNT = SUB_COMPONENT + "invalidation_process_success_count";
+    private static final String INVALIDATION_PROCESS_REQUEST_COUNT = SUB_COMPONENT + "invalidation_process_request_count";
+    private static final String IMPORT_PRE_PROCESS_SUCCESS_COUNT = SUB_COMPONENT + "import_pre_process_success_count";
+
+    private static final String IMPORTED_PACKAGE_DURATION = SUB_COMPONENT + "imported_package_duration";
+    private static final String REMOVED_PACKAGE_DURATION = SUB_COMPONENT + "removed_package_duration";
+    private static final String REMOVED_FAILED_PACKAGE_DURATION = SUB_COMPONENT + "removed_failed_package_duration";
+    private static final String SEND_STORED_STATUS_DURATION = SUB_COMPONENT + "send_stored_status_duration";
+    private static final String PROCESS_QUEUE_ITEM_DURATION = SUB_COMPONENT + "process_queue_item_duration";
+    private static final String REQUEST_DISTRIBUTED_DURATION = SUB_COMPONENT + "request_distributed_duration";
+    private static final String PACKAGE_JOURNAL_DISTRIBUTION_DURATION = SUB_COMPONENT + "package_journal_distribution_duration";
+    private static final String IMPORT_PRE_PROCESS_DURATION = SUB_COMPONENT + "import_pre_process_duration";
+    private static final String IMPORT_POST_PROCESS_DURATION = SUB_COMPONENT + "import_post_process_duration";
+    private static final String INVALIDATION_PROCESS_DURATION = SUB_COMPONENT + "invalidation_process_duration";
+
     private final MetricsService metricsService;
+    private final Tag tagSubName;
+    private final Tag tagEditable;
+    private final List<Tag> tags;
 
-    private final Histogram importedPackageSize;
-
-    private final  Counter itemsBufferSize;
-
-    private final  Timer removedPackageDuration;
-
-    private final  Timer removedFailedPackageDuration;
-
-    private final  Timer importedPackageDuration;
-
-    private final  Meter failedPackageImports;
-
-    private final  Timer sendStoredStatusDuration;
-
-    private final  Timer processQueueItemDuration;
-
-    private final  Timer packageDistributedDuration;
-
-    private final  Timer packageJournalDistributionDuration;
-
-    private final  Timer importPreProcessDuration;
-
-    private final  Counter importPreProcessSuccess;
-
-    private final  Counter importPreProcessRequest;
-
-    private final  Timer importPostProcessDuration;
-    
-    private final  Counter importPostProcessSuccess;
-
-    private final  Counter importPostProcessRequest;
-
-    private final  Timer invalidationProcessDuration;
-
-    private final  Counter invalidationProcessSuccess;
-
-    private final  Counter invalidationProcessRequest;
-
-    private final  Counter transientImportErrors;
-
-    private final  Counter permanentImportErrors;
-
-    @Activate
-    public SubscriberMetrics(@Reference MetricsService metricsService) {
+    public SubscriberMetrics(MetricsService metricsService, String subAgentName, boolean editable) {
         this.metricsService = metricsService;
-        importedPackageSize = metricsService.histogram(getMetricName("imported_package_size"));
-        itemsBufferSize = metricsService.counter(getMetricName("items_buffer_size"));
-        importedPackageDuration = metricsService.timer(getMetricName("imported_package_duration"));
-        removedPackageDuration = metricsService.timer(getMetricName("removed_package_duration"));
-        removedFailedPackageDuration = metricsService.timer(getMetricName("removed_failed_package_duration"));
-        failedPackageImports = metricsService.meter(getMetricName("failed_package_imports"));
-        sendStoredStatusDuration = metricsService.timer(getMetricName("send_stored_status_duration"));
-        processQueueItemDuration = metricsService.timer(getMetricName("process_queue_item_duration"));
-        packageDistributedDuration = metricsService.timer(getMetricName("request_distributed_duration"));
-        packageJournalDistributionDuration = metricsService.timer(getMetricName("package_journal_distribution_duration"));
-        importPreProcessDuration = metricsService.timer(getMetricName("import_pre_process_duration"));
-        importPreProcessSuccess = metricsService.counter(getMetricName("import_pre_process_success_count"));
-        importPreProcessRequest = metricsService.counter(getMetricName("import_pre_process_request_count"));
-        importPostProcessDuration = metricsService.timer(getMetricName("import_post_process_duration"));
-        importPostProcessSuccess = metricsService.counter(getMetricName("import_post_process_success_count"));
-        importPostProcessRequest = metricsService.counter(getMetricName("import_post_process_request_count"));
-        invalidationProcessDuration = metricsService.timer(getMetricName("invalidation_process_duration"));
-        invalidationProcessSuccess = metricsService.counter(getMetricName("invalidation_process_success_count"));
-        invalidationProcessRequest = metricsService.counter(getMetricName("invalidation_process_request_count"));
-        transientImportErrors = metricsService.counter(getMetricName("transient_import_errors"));
-        permanentImportErrors = metricsService.counter(getMetricName("permanent_import_errors"));
+        tagSubName = Tag.of(TAG_SUB_NAME, subAgentName);
+        tagEditable = Tag.of(TAG_EDITABLE, Boolean.toString(editable));
+        tags = Arrays.asList(
+                tagSubName, 
+                tagEditable);
     }
 
     /**
@@ -112,7 +105,7 @@
      * @return a Sling Metrics histogram
      */
     public Histogram getImportedPackageSize() {
-        return importedPackageSize;
+        return metricsService.histogram(getMetricName(IMPORTED_PACKAGE_SIZE, tags));
     }
 
     /**
@@ -121,7 +114,7 @@
      * @return a Sling Metrics counter
      */
     public Counter getItemsBufferSize() {
-        return itemsBufferSize;
+        return metricsService.counter(getMetricName(ITEMS_BUFFER_SIZE, tags));
     } 
 
     /**
@@ -130,7 +123,7 @@
      * @return a Sling Metrics timer
      */
     public Timer getImportedPackageDuration() {
-        return importedPackageDuration;
+        return metricsService.timer(getMetricName(IMPORTED_PACKAGE_DURATION, tags));
     }
 
     /**
@@ -139,7 +132,7 @@
      * @return a Sling Metrics timer
      */
     public Timer getRemovedPackageDuration() {
-        return removedPackageDuration;
+        return metricsService.timer(getMetricName(REMOVED_PACKAGE_DURATION, tags));
     }
 
     /**
@@ -148,7 +141,7 @@
      * @return a Sling Metrics timer
      */
     public Timer getRemovedFailedPackageDuration() {
-        return removedFailedPackageDuration;
+        return metricsService.timer(getMetricName(REMOVED_FAILED_PACKAGE_DURATION, tags));
     }
 
     /**
@@ -157,7 +150,7 @@
      * @return a Sling Metrics meter
      */
     public Meter getFailedPackageImports() {
-        return failedPackageImports;
+        return metricsService.meter(getMetricName(FAILED_PACKAGE_IMPORTS, tags));
     }
 
     /**
@@ -166,7 +159,7 @@
      * @return a Sling Metric timer
      */
     public Timer getSendStoredStatusDuration() {
-        return sendStoredStatusDuration;
+        return metricsService.timer(getMetricName(SEND_STORED_STATUS_DURATION, tags));
     }
 
     /**
@@ -175,7 +168,7 @@
      * @return a Sling Metric timer
      */
     public Timer getProcessQueueItemDuration() {
-        return processQueueItemDuration;
+        return metricsService.timer(getMetricName(PROCESS_QUEUE_ITEM_DURATION, tags));
     }
 
     /**
@@ -185,7 +178,7 @@
      * @return a Sling Metric timer
      */
     public Timer getPackageDistributedDuration() {
-        return packageDistributedDuration;
+        return metricsService.timer(getMetricName(REQUEST_DISTRIBUTED_DURATION, tags));
     }
 
     /**
@@ -195,7 +188,7 @@
      * @return a Sling Metrics timer
      */
     public Timer getPackageJournalDistributionDuration() {
-        return packageJournalDistributionDuration;
+        return metricsService.timer(getMetricName(PACKAGE_JOURNAL_DISTRIBUTION_DURATION, tags));
     }
 
     /**
@@ -204,63 +197,58 @@
      * @return a Sling Metric counter
      */
     public Counter getPackageStatusCounter(Status status) {
-        return metricsService.counter(getNameWithLabel(getMetricName("package_status_count"), "status", status.name()));
+        Tag tagStatus = Tag.of(TAG_STATUS, status.name());
+        String name = getMetricName(PACKAGE_STATUS_COUNT, Arrays.asList(tagSubName, tagEditable, tagStatus));
+        return metricsService.counter(name);
     }
 
     public Timer getImportPreProcessDuration() {
-        return importPreProcessDuration;
+        return metricsService.timer(getMetricName(IMPORT_PRE_PROCESS_DURATION, tags));
     }
 
     public Counter getImportPreProcessSuccess() {
-        return importPreProcessSuccess;
+        return metricsService.counter(getMetricName(IMPORT_PRE_PROCESS_SUCCESS_COUNT, tags));
     }
 
     public Counter getImportPreProcessRequest() {
-        return importPreProcessRequest;
+        return metricsService.counter(getMetricName(IMPORT_PRE_PROCESS_REQUEST_COUNT, tags));
     }
 
     public Timer getImportPostProcessDuration() {
-        return importPostProcessDuration;
+        return metricsService.timer(getMetricName(IMPORT_POST_PROCESS_DURATION, tags));
     }
 
     public Counter getImportPostProcessSuccess() {
-        return importPostProcessSuccess;
+        return metricsService.counter(getMetricName(IMPORT_POST_PROCESS_SUCCESS_COUNT, tags));
     }
 
     public Counter getImportPostProcessRequest() {
-        return importPostProcessRequest;
+        return metricsService.counter(getMetricName(IMPORT_POST_PROCESS_REQUEST_COUNT, tags));
     }
 
     public Timer getInvalidationProcessDuration() {
-        return invalidationProcessDuration;
+        return metricsService.timer(getMetricName(INVALIDATION_PROCESS_DURATION, tags));
     }
 
     public Counter getInvalidationProcessSuccess() {
-        return invalidationProcessSuccess;
+        return metricsService.counter(getMetricName(INVALIDATION_PROCESS_SUCCESS_COUNT, tags));
     }
 
     public Counter getInvalidationProcessRequest() {
-        return invalidationProcessRequest;
+        return metricsService.counter(getMetricName(INVALIDATION_PROCESS_REQUEST_COUNT, tags));
     }
 
     public Counter getTransientImportErrors() {
-        return transientImportErrors;
+        return metricsService.counter(getMetricName(TRANSIENT_IMPORT_ERRORS, tags));
     }
 
     public Counter getPermanentImportErrors() { 
-        return permanentImportErrors;
+        return metricsService.counter(getMetricName(PERMANENT_IMPORT_ERRORS, tags));
     }
 
-    public void currentRetries(String subAgentName, Supplier<Integer> retriesCallback) {
-        String nameRetries = SubscriberMetrics.SUB_COMPONENT + ".current_retries;sub_name=" + subAgentName;
-        metricsService.gauge(nameRetries, retriesCallback);
+    public void currentRetries(Supplier<Integer> retriesCallback) {
+        metricsService.gauge(getMetricName(CURRENT_RETRIES, tags), retriesCallback);
     }
     
-    private String getMetricName(String name) {
-        return format("%s.%s", SUB_COMPONENT, name);
-    }
-
-    private String getNameWithLabel(String name, String label, String labelVal) {
-        return format("%s;%s=%s", name, label, labelVal);
-    }
 }
+ 
\ No newline at end of file
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/discovery/DiscoveryService.java b/src/main/java/org/apache/sling/distribution/journal/impl/discovery/DiscoveryService.java
index ef76bd1..b624d80 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/discovery/DiscoveryService.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/discovery/DiscoveryService.java
@@ -134,6 +134,10 @@
     public TopologyView getTopologyView() {
         return viewManager.getCurrentView();
     }
+    
+    public int getSubscriberCount(String pubAgentName) {
+        return getTopologyView().getSubscribedAgentIds(pubAgentName).size();
+    }
 
     @Override
     public void run() {
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
index 2bf24f5..2adec23 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
@@ -46,6 +46,7 @@
 import org.apache.sling.distribution.journal.shared.Timed;
 import org.apache.sling.distribution.journal.shared.Topics;
 import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.commons.metrics.MetricsService;
 import org.apache.sling.distribution.DistributionRequest;
 import org.apache.sling.distribution.DistributionRequestState;
 import org.apache.sling.distribution.DistributionResponse;
@@ -123,7 +124,7 @@
             @Reference
             Topics topics,
             @Reference
-            PublishMetrics publishMetrics,
+            MetricsService metricsService,
             @Reference
             PubQueueProvider pubQueueProvider,
             @Reference(target = "(osgi.condition.id=toggle.FT_SLING-12218)", cardinality = OPTIONAL, policyOption = GREEDY)
@@ -131,13 +132,15 @@
             PublisherConfiguration config,
             BundleContext context) {
 
+        pubAgentName = requireNotBlank(config.name());
+
         this.packageBuilder = packageBuilder;
         this.factory = requireNonNull(factory);
         this.eventAdmin = eventAdmin;
-        this.publishMetrics = requireNonNull(publishMetrics);
+        requireNonNull(metricsService);
+        this.publishMetrics = new PublishMetrics(metricsService, pubAgentName);
         this.pubQueueProvider = pubQueueProvider;
 
-        pubAgentName = requireNotBlank(config.name());
         distLog = new DefaultDistributionLog(pubAgentName, this.getClass(), DefaultDistributionLog.LogLevel.INFO);
         distributionLogEventListener = new DistributionLogEventListener(context, distLog, pubAgentName);
 
@@ -148,9 +151,7 @@
         pkgType = packageBuilder.getType();
 
         this.sender = messagingProvider.createSender(topics.getPackageTopic());
-        publishMetrics.subscriberCount(pubAgentName,
-                () -> discoveryService.getTopologyView().getSubscribedAgentIds(pubAgentName).size()
-        );
+        publishMetrics.subscriberCount(() -> discoveryService.getSubscriberCount(pubAgentName));
         
         distLog.info("Started Publisher agent={} with packageBuilder={}, limitEnabled={}, queuedTimeout={}, queueSizeLimit={}, maxQueueSizeDelay={}",
                 pubAgentName, pkgType, limitEnabled, queuedTimeout, queueSizeLimit, maxQueueSizeDelay);
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PublishMetrics.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PublishMetrics.java
index ce53518..ef64ad5 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PublishMetrics.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PublishMetrics.java
@@ -18,8 +18,10 @@
  */
 package org.apache.sling.distribution.journal.impl.publisher;
 
-import static java.lang.String.format;
+import static org.apache.sling.distribution.journal.metrics.TaggedMetrics.getMetricName;
 
+import java.util.Arrays;
+import java.util.List;
 import java.util.function.Supplier;
 
 import org.apache.sling.commons.metrics.Counter;
@@ -27,43 +29,27 @@
 import org.apache.sling.commons.metrics.Meter;
 import org.apache.sling.commons.metrics.MetricsService;
 import org.apache.sling.commons.metrics.Timer;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Reference;
+import org.apache.sling.distribution.journal.metrics.Tag;
 
-@Component(service = PublishMetrics.class)
 public class PublishMetrics {
+    private static final String TAG_AGENT_NAME = "pub_name";
 
-    public static final String BASE_COMPONENT = "distribution.journal";
+    public static final String PUB_COMPONENT = "distribution.journal.publisher.";
+    private static final String EXPORTED_PACKAGE_SIZE = PUB_COMPONENT + "exported_package_size";
+    private static final String ACCEPTED_REQUESTS = PUB_COMPONENT + "accepted_requests";
+    private static final String DROPPED_REQUESTS = PUB_COMPONENT + "dropped_requests";
+    private static final String BUILD_PACKAGE_DURATION = PUB_COMPONENT + "build_package_duration";
+    private static final String ENQUEUE_PACKAGE_DURATION = PUB_COMPONENT + "enqueue_package_duration";
+    private static final String QUEUE_CACHE_FETCH_COUNT = PUB_COMPONENT + "queue_cache_fetch_count";
+    private static final String QUEUE_ACCESS_ERROR_COUNT = PUB_COMPONENT + "queue_access_error_count";
+    private static final String SUBSCRIBER_COUNT = PUB_COMPONENT + "subscriber_count";
 
-    public static final String PUB_COMPONENT = BASE_COMPONENT + ".publisher";
-
+    private final List<Tag> tags;
     private final MetricsService metricsService;
 
-    private final  Histogram exportedPackageSize;
-
-    private final  Meter acceptedRequests;
-
-    private final  Meter droppedRequests;
-
-    private final  Timer buildPackageDuration;
-
-    private final  Timer enqueuePackageDuration;
-
-    private final  Counter queueCacheFetchCount;
-
-    private final  Counter queueAccessErrorCount;
-
-    @Activate
-    public PublishMetrics(@Reference MetricsService metricsService) {
+    public PublishMetrics(MetricsService metricsService, String pubAgentName) {
+        this.tags = Arrays.asList(Tag.of(TAG_AGENT_NAME, pubAgentName));
         this.metricsService = metricsService;
-        exportedPackageSize = metricsService.histogram(getMetricName("exported_package_size"));
-        acceptedRequests = metricsService.meter(getMetricName("accepted_requests"));
-        droppedRequests = metricsService.meter(getMetricName("dropped_requests"));
-        buildPackageDuration = metricsService.timer(getMetricName("build_package_duration"));
-        enqueuePackageDuration = metricsService.timer(getMetricName("enqueue_package_duration"));
-        queueCacheFetchCount = metricsService.counter(getMetricName("queue_cache_fetch_count"));
-        queueAccessErrorCount = metricsService.counter(getMetricName("queue_access_error_count"));
     }
 
     /**
@@ -72,7 +58,7 @@
      * @return a Sling Metrics histogram
      */
     public Histogram getExportedPackageSize() {
-        return exportedPackageSize;
+        return metricsService.histogram(getMetricName(EXPORTED_PACKAGE_SIZE, tags));
     }
 
     /**
@@ -81,7 +67,7 @@
      * @return a Sling Metrics meter
      */
     public Meter getAcceptedRequests() {
-        return acceptedRequests;
+        return metricsService.meter(getMetricName(ACCEPTED_REQUESTS, tags));
     }
 
     /**
@@ -90,7 +76,7 @@
      * @return a Sling Metrics meter
      */
     public Meter getDroppedRequests() {
-        return droppedRequests;
+        return metricsService.meter(getMetricName(DROPPED_REQUESTS, tags));
     }
 
     /**
@@ -99,7 +85,7 @@
      * @return a Sling Metric timer
      */
     public Timer getBuildPackageDuration() {
-        return buildPackageDuration;
+        return metricsService.timer(getMetricName(BUILD_PACKAGE_DURATION, tags));
     }
 
     /**
@@ -108,7 +94,7 @@
      * @return a Sling Metric timer
      */
     public Timer getEnqueuePackageDuration() {
-        return enqueuePackageDuration;
+        return metricsService.timer(getMetricName(ENQUEUE_PACKAGE_DURATION, tags));
     }
 
     /**
@@ -117,7 +103,7 @@
      * @return a Sling Metric counter
      */
     public Counter getQueueCacheFetchCount() {
-        return queueCacheFetchCount;
+        return metricsService.counter(getMetricName(QUEUE_CACHE_FETCH_COUNT, tags));
     }
 
     /**
@@ -126,17 +112,11 @@
      * @return a Sling Metric counter
      */
     public Counter getQueueAccessErrorCount() {
-        return queueAccessErrorCount;
+        return metricsService.counter(getMetricName(QUEUE_ACCESS_ERROR_COUNT, tags));
     }
 
-    public void subscriberCount(String pubAgentName, Supplier<Integer> subscriberCountCallback) {
-        metricsService.gauge(PublishMetrics.PUB_COMPONENT + ".subscriber_count;pub_name=" + pubAgentName,
-                subscriberCountCallback);
-        
-    }
-
-    private String getMetricName(String name) {
-        return format("%s.%s", PUB_COMPONENT, name);
+    public void subscriberCount(Supplier<Integer> subscriberCountCallback) {
+        metricsService.gauge(getMetricName(SUBSCRIBER_COUNT, tags), subscriberCountCallback);
     }
 
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index f9fc84f..ff33c6f 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -52,6 +52,7 @@
 import org.apache.jackrabbit.util.Text;
 import org.apache.sling.api.resource.LoginException;
 import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.commons.metrics.MetricsService;
 import org.apache.sling.commons.metrics.Timer;
 import org.apache.sling.distribution.ImportPostProcessException;
 import org.apache.sling.distribution.agent.DistributionAgentState;
@@ -119,7 +120,7 @@
     private Precondition precondition;
 
     @Reference
-    private SubscriberMetrics subscriberMetrics;
+    private MetricsService metricsService;
 
     @Reference
     BookKeeperFactory bookKeeperFactory;
@@ -127,6 +128,8 @@
     @Reference
     private SubscriberReadyStore subscriberReadyStore;
 
+    private SubscriberMetrics subscriberMetrics;
+
     private volatile Closeable idleReadyCheck; // NOSONAR
 
     private volatile IdleCheck idleCheck; // NOSONAR
@@ -162,12 +165,15 @@
         subAgentName = requireNotBlank(config.name());
         requireNonNull(config);
         requireNonNull(context);
+        requireNonNull(metricsService);
         requireNonNull(packageBuilder);
         requireNonNull(slingSettings);
         requireNonNull(messagingProvider);
         requireNonNull(topics);
         requireNonNull(precondition);
         requireNonNull(bookKeeperFactory);
+        
+        this.subscriberMetrics = new SubscriberMetrics(metricsService, subAgentName, config.editable());
 
         if (config.editable()) {
             commandPoller = new CommandPoller(messagingProvider, topics, subSlingId, subAgentName, delay::signal);
@@ -197,7 +203,7 @@
                 config.packageHandling(),
                 packageNodeName,
                 config.contentPackageExtractorOverwritePrimaryTypesOfFolders());
-        bookKeeper = bookKeeperFactory.create(packageBuilder, bkConfig, statusSender, logSender);
+        bookKeeper = bookKeeperFactory.create(packageBuilder, bkConfig, statusSender, logSender, this.subscriberMetrics);
 
         long startOffset = bookKeeper.loadOffset() + 1;
         String assign = startOffset > 0 ? messagingProvider.assignTo(startOffset) : null;
diff --git a/src/main/java/org/apache/sling/distribution/journal/metrics/TaggedMetrics.java b/src/main/java/org/apache/sling/distribution/journal/metrics/TaggedMetrics.java
index 0c52dd7..b34f294 100644
--- a/src/main/java/org/apache/sling/distribution/journal/metrics/TaggedMetrics.java
+++ b/src/main/java/org/apache/sling/distribution/journal/metrics/TaggedMetrics.java
@@ -18,6 +18,7 @@
  */
 package org.apache.sling.distribution.journal.metrics;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -37,4 +38,8 @@
         log.debug("metric={}", metric);
         return metric;
     }
+    
+    public static String getMetricName(String metricName, Tag tag) {
+        return getMetricName(metricName, Collections.singletonList(tag));
+    }
 }
diff --git a/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java b/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
index bf72a26..2663457 100644
--- a/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
@@ -88,9 +88,8 @@
 
     @Before
     public void before() {
-        subscriberMetrics = new SubscriberMetrics(MetricsService.NOOP);
-
         BookKeeperConfig bkConfig = new BookKeeperConfig("subAgentName", "subSlingId", true, 10, PackageHandling.Extract, "package", true);
+        subscriberMetrics = new SubscriberMetrics(MetricsService.NOOP, bkConfig.getSubAgentName(), bkConfig.isEditable());
         bookKeeper = new BookKeeper(resolverFactory, subscriberMetrics, packageHandler, eventAdmin, sender, logSender, bkConfig,
                 importPreProcessor, importPostProcessor, invalidationProcessor);
     }
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
index d7541e9..206cb29 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
@@ -47,7 +47,6 @@
 
 import org.apache.commons.lang3.time.StopWatch;
 import org.apache.sling.api.resource.ResourceResolver;
-import org.apache.sling.commons.metrics.Counter;
 import org.apache.sling.commons.metrics.MetricsService;
 import org.apache.sling.commons.metrics.internal.MetricsServiceImpl;
 import org.apache.sling.distribution.DistributionRequest;
@@ -108,8 +107,6 @@
     @Mock
     private DistributionPackageBuilder packageBuilder;
 
-    private PublishMetrics publishMetrics;
-
     private OsgiContext context = new OsgiContext();
 
     private DistributionPublisher publisher;
@@ -128,11 +125,12 @@
 
     @Spy
     private Topics topics = new Topics();
+    
+    private MetricsService metricsService;
 
     @Before
     public void before() throws Exception {
-        MetricsService metricsService = context.registerInjectActivateService(MetricsServiceImpl.class);
-        publishMetrics = new PublishMetrics(metricsService);
+        metricsService = context.registerInjectActivateService(MetricsServiceImpl.class);
         when(packageBuilder.getType()).thenReturn("journal");
         Map<String, String> props = Map.of("name", PUB1AGENT1,
                 "maxQueueSizeDelay", "1000");
@@ -141,7 +139,7 @@
         BundleContext bcontext = context.bundleContext();
         when(messagingProvider.<PackageMessage>createSender(Mockito.anyString())).thenReturn(sender);
         publisher = new DistributionPublisher(messagingProvider, packageBuilder, discoveryService, factory,
-                eventAdmin, topics, publishMetrics, pubQueueProvider, Condition.INSTANCE, config, bcontext);
+                eventAdmin, topics, metricsService, pubQueueProvider, Condition.INSTANCE, config, bcontext);
         when(pubQueueProvider.getQueuedNotifier()).thenReturn(queuedNotifier);
     }
     
@@ -250,8 +248,8 @@
 
         DistributionQueue queue = publisher.getQueue("i_am_not_a_queue");
         assertNull(queue);
-        Counter counter = publishMetrics.getQueueAccessErrorCount();
-        assertEquals("Wrong queue counter expected",1, counter.getCount());
+        long count = getQueueAccessErrorCount();
+        assertEquals("Wrong queue counter expected",1, count);
     }
 
     @Test
@@ -264,8 +262,12 @@
             fail("Expected exception not thrown");
         } catch (RuntimeException expectedException) {
         }
-        Counter counter = publishMetrics.getQueueAccessErrorCount();
-        assertEquals("Wrong getQueue error counter",1, counter.getCount());
+        long count = getQueueAccessErrorCount();
+        assertEquals("Wrong getQueue error counter",1, count);
+    }
+
+    private long getQueueAccessErrorCount() {
+        return new PublishMetrics(metricsService, PUB1AGENT1).getQueueAccessErrorCount().getCount();
     }
 
     @Test(expected = DistributionException.class)
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
index c73b4b5..56ab9fc 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
@@ -74,7 +74,6 @@
 import org.apache.sling.distribution.journal.bookkeeper.BookKeeper;
 import org.apache.sling.distribution.journal.bookkeeper.BookKeeperFactory;
 import org.apache.sling.distribution.journal.bookkeeper.LocalStore;
-import org.apache.sling.distribution.journal.bookkeeper.SubscriberMetrics;
 import org.apache.sling.distribution.journal.shared.NoOpImportPreProcessor;
 import org.apache.sling.distribution.journal.shared.NoOpImportPostProcessor;
 import org.apache.sling.distribution.journal.impl.precondition.Precondition;
@@ -113,7 +112,6 @@
 import org.osgi.service.event.EventAdmin;
 import org.osgi.util.converter.Converters;
 
-@SuppressWarnings("unchecked")
 @RunWith(MockitoJUnitRunner.class)
 public class SubscriberTest {
 
@@ -180,7 +178,7 @@
     private MessageSender<PackageStatusMessage> statusSender;
 
     @Spy
-    private SubscriberMetrics subscriberMetrics = new SubscriberMetrics(MetricsService.NOOP);
+    private MetricsService metricsService = MetricsService.NOOP;
 
     @Spy
     private ImportPreProcessor importPreProcessor = new NoOpImportPreProcessor();
diff --git a/src/test/java/org/apache/sling/distribution/journal/shared/SubscriberMetricsTest.java b/src/test/java/org/apache/sling/distribution/journal/shared/SubscriberMetricsTest.java
index 85eb95c..a3f0878 100644
--- a/src/test/java/org/apache/sling/distribution/journal/shared/SubscriberMetricsTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/shared/SubscriberMetricsTest.java
@@ -42,7 +42,7 @@
     @Before
     public void before() {
         MetricsService metricsService = MetricsService.NOOP;
-        metrics = new SubscriberMetrics(metricsService);
+        metrics = new SubscriberMetrics(metricsService, "subAgentName", true);
     }
 
     public static void mockBehaviour(MetricsService metricsService) {