SLING-12292 - Add tags to metrics (#138)
* SLING-12292 - Add tags to metrics
* SLING-12292 - Add tags to publish metrics
diff --git a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
index 76697c1..98e8298 100644
--- a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
+++ b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeper.java
@@ -112,7 +112,7 @@
this.logSender = logSender;
this.config = config;
- subscriberMetrics.currentRetries(config.getSubAgentName(), packageRetries::getSum);
+ subscriberMetrics.currentRetries(packageRetries::getSum);
this.resolverFactory = resolverFactory;
this.subscriberMetrics = subscriberMetrics;
// Error queues are enabled when the number
diff --git a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperFactory.java b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperFactory.java
index 584d404..dd1b492 100644
--- a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperFactory.java
+++ b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperFactory.java
@@ -39,9 +39,6 @@
private ResourceResolverFactory resolverFactory;
@Reference
- private SubscriberMetrics subscriberMetrics;
-
- @Reference
private EventAdmin eventAdmin;
@Reference
@@ -63,7 +60,8 @@
DistributionPackageBuilder packageBuilder,
BookKeeperConfig config,
Consumer<PackageStatusMessage> statusSender,
- Consumer<LogMessage> logSender
+ Consumer<LogMessage> logSender,
+ SubscriberMetrics subscriberMetrics
) {
ContentPackageExtractor extractor = new ContentPackageExtractor(
packaging,
diff --git a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/SubscriberMetrics.java b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/SubscriberMetrics.java
index 686dd0c..5846cfc 100644
--- a/src/main/java/org/apache/sling/distribution/journal/bookkeeper/SubscriberMetrics.java
+++ b/src/main/java/org/apache/sling/distribution/journal/bookkeeper/SubscriberMetrics.java
@@ -18,8 +18,10 @@
*/
package org.apache.sling.distribution.journal.bookkeeper;
-import static java.lang.String.format;
+import static org.apache.sling.distribution.journal.metrics.TaggedMetrics.getMetricName;
+import java.util.Arrays;
+import java.util.List;
import java.util.function.Supplier;
import org.apache.sling.commons.metrics.Counter;
@@ -28,82 +30,73 @@
import org.apache.sling.commons.metrics.MetricsService;
import org.apache.sling.commons.metrics.Timer;
import org.apache.sling.distribution.journal.messages.PackageStatusMessage.Status;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Reference;
+import org.apache.sling.distribution.journal.metrics.Tag;
-@Component(service = SubscriberMetrics.class)
+/**
+ * Metrics for DistributionSubscriber
+ * most metrics will have two parameters:
+ * TAG_SUB_NAME and TAG_EDITABLE
+ */
public class SubscriberMetrics {
- public static final String SUB_COMPONENT = "distribution.journal.subscriber";
+ // Name of the subscriber agent
+ private static final String TAG_SUB_NAME = "sub_name";
+ // Status of a package :
+ private static final String TAG_STATUS = "status";
+
+ // Is the queue editable (true, false)
+ private static final String TAG_EDITABLE = "editable";
+
+ public static final String SUB_COMPONENT = "distribution.journal.subscriber.";
+
+ private static final String PACKAGE_STATUS_COUNT = SUB_COMPONENT + "package_status_count";
+
+ // Number of packages with at least one failure to apply
+ private static final String CURRENT_RETRIES = SUB_COMPONENT + "current_retries";
+
+ // Cumulated size of all packages (parameters: TAG_SUB_NAME, editable (golden publish))
+ private static final String IMPORTED_PACKAGE_SIZE = SUB_COMPONENT + "imported_package_size";
+ private static final String ITEMS_BUFFER_SIZE = SUB_COMPONENT + "items_buffer_size";
+
+ // Increased on every failure to apply a package
+ private static final String FAILED_PACKAGE_IMPORTS = SUB_COMPONENT + "failed_package_imports";
+
+ // Increased when a package failed before but then succeeded (parameters: agent, editable (golden publish))
+ private static final String TRANSIENT_IMPORT_ERRORS = SUB_COMPONENT + "transient_import_errors";
+
+ // Only counted in error queue setup
+ private static final String PERMANENT_IMPORT_ERRORS = SUB_COMPONENT + "permanent_import_errors";
+
+ private static final String IMPORT_PRE_PROCESS_REQUEST_COUNT = SUB_COMPONENT + "import_pre_process_request_count";
+ private static final String IMPORT_POST_PROCESS_SUCCESS_COUNT = SUB_COMPONENT + "import_post_process_success_count";
+ private static final String IMPORT_POST_PROCESS_REQUEST_COUNT = SUB_COMPONENT + "import_post_process_request_count";
+ private static final String INVALIDATION_PROCESS_SUCCESS_COUNT = SUB_COMPONENT + "invalidation_process_success_count";
+ private static final String INVALIDATION_PROCESS_REQUEST_COUNT = SUB_COMPONENT + "invalidation_process_request_count";
+ private static final String IMPORT_PRE_PROCESS_SUCCESS_COUNT = SUB_COMPONENT + "import_pre_process_success_count";
+
+ private static final String IMPORTED_PACKAGE_DURATION = SUB_COMPONENT + "imported_package_duration";
+ private static final String REMOVED_PACKAGE_DURATION = SUB_COMPONENT + "removed_package_duration";
+ private static final String REMOVED_FAILED_PACKAGE_DURATION = SUB_COMPONENT + "removed_failed_package_duration";
+ private static final String SEND_STORED_STATUS_DURATION = SUB_COMPONENT + "send_stored_status_duration";
+ private static final String PROCESS_QUEUE_ITEM_DURATION = SUB_COMPONENT + "process_queue_item_duration";
+ private static final String REQUEST_DISTRIBUTED_DURATION = SUB_COMPONENT + "request_distributed_duration";
+ private static final String PACKAGE_JOURNAL_DISTRIBUTION_DURATION = SUB_COMPONENT + "package_journal_distribution_duration";
+ private static final String IMPORT_PRE_PROCESS_DURATION = SUB_COMPONENT + "import_pre_process_duration";
+ private static final String IMPORT_POST_PROCESS_DURATION = SUB_COMPONENT + "import_post_process_duration";
+ private static final String INVALIDATION_PROCESS_DURATION = SUB_COMPONENT + "invalidation_process_duration";
+
private final MetricsService metricsService;
+ private final Tag tagSubName;
+ private final Tag tagEditable;
+ private final List<Tag> tags;
- private final Histogram importedPackageSize;
-
- private final Counter itemsBufferSize;
-
- private final Timer removedPackageDuration;
-
- private final Timer removedFailedPackageDuration;
-
- private final Timer importedPackageDuration;
-
- private final Meter failedPackageImports;
-
- private final Timer sendStoredStatusDuration;
-
- private final Timer processQueueItemDuration;
-
- private final Timer packageDistributedDuration;
-
- private final Timer packageJournalDistributionDuration;
-
- private final Timer importPreProcessDuration;
-
- private final Counter importPreProcessSuccess;
-
- private final Counter importPreProcessRequest;
-
- private final Timer importPostProcessDuration;
-
- private final Counter importPostProcessSuccess;
-
- private final Counter importPostProcessRequest;
-
- private final Timer invalidationProcessDuration;
-
- private final Counter invalidationProcessSuccess;
-
- private final Counter invalidationProcessRequest;
-
- private final Counter transientImportErrors;
-
- private final Counter permanentImportErrors;
-
- @Activate
- public SubscriberMetrics(@Reference MetricsService metricsService) {
+ public SubscriberMetrics(MetricsService metricsService, String subAgentName, boolean editable) {
this.metricsService = metricsService;
- importedPackageSize = metricsService.histogram(getMetricName("imported_package_size"));
- itemsBufferSize = metricsService.counter(getMetricName("items_buffer_size"));
- importedPackageDuration = metricsService.timer(getMetricName("imported_package_duration"));
- removedPackageDuration = metricsService.timer(getMetricName("removed_package_duration"));
- removedFailedPackageDuration = metricsService.timer(getMetricName("removed_failed_package_duration"));
- failedPackageImports = metricsService.meter(getMetricName("failed_package_imports"));
- sendStoredStatusDuration = metricsService.timer(getMetricName("send_stored_status_duration"));
- processQueueItemDuration = metricsService.timer(getMetricName("process_queue_item_duration"));
- packageDistributedDuration = metricsService.timer(getMetricName("request_distributed_duration"));
- packageJournalDistributionDuration = metricsService.timer(getMetricName("package_journal_distribution_duration"));
- importPreProcessDuration = metricsService.timer(getMetricName("import_pre_process_duration"));
- importPreProcessSuccess = metricsService.counter(getMetricName("import_pre_process_success_count"));
- importPreProcessRequest = metricsService.counter(getMetricName("import_pre_process_request_count"));
- importPostProcessDuration = metricsService.timer(getMetricName("import_post_process_duration"));
- importPostProcessSuccess = metricsService.counter(getMetricName("import_post_process_success_count"));
- importPostProcessRequest = metricsService.counter(getMetricName("import_post_process_request_count"));
- invalidationProcessDuration = metricsService.timer(getMetricName("invalidation_process_duration"));
- invalidationProcessSuccess = metricsService.counter(getMetricName("invalidation_process_success_count"));
- invalidationProcessRequest = metricsService.counter(getMetricName("invalidation_process_request_count"));
- transientImportErrors = metricsService.counter(getMetricName("transient_import_errors"));
- permanentImportErrors = metricsService.counter(getMetricName("permanent_import_errors"));
+ tagSubName = Tag.of(TAG_SUB_NAME, subAgentName);
+ tagEditable = Tag.of(TAG_EDITABLE, Boolean.toString(editable));
+ tags = Arrays.asList(
+ tagSubName,
+ tagEditable);
}
/**
@@ -112,7 +105,7 @@
* @return a Sling Metrics histogram
*/
public Histogram getImportedPackageSize() {
- return importedPackageSize;
+ return metricsService.histogram(getMetricName(IMPORTED_PACKAGE_SIZE, tags));
}
/**
@@ -121,7 +114,7 @@
* @return a Sling Metrics counter
*/
public Counter getItemsBufferSize() {
- return itemsBufferSize;
+ return metricsService.counter(getMetricName(ITEMS_BUFFER_SIZE, tags));
}
/**
@@ -130,7 +123,7 @@
* @return a Sling Metrics timer
*/
public Timer getImportedPackageDuration() {
- return importedPackageDuration;
+ return metricsService.timer(getMetricName(IMPORTED_PACKAGE_DURATION, tags));
}
/**
@@ -139,7 +132,7 @@
* @return a Sling Metrics timer
*/
public Timer getRemovedPackageDuration() {
- return removedPackageDuration;
+ return metricsService.timer(getMetricName(REMOVED_PACKAGE_DURATION, tags));
}
/**
@@ -148,7 +141,7 @@
* @return a Sling Metrics timer
*/
public Timer getRemovedFailedPackageDuration() {
- return removedFailedPackageDuration;
+ return metricsService.timer(getMetricName(REMOVED_FAILED_PACKAGE_DURATION, tags));
}
/**
@@ -157,7 +150,7 @@
* @return a Sling Metrics meter
*/
public Meter getFailedPackageImports() {
- return failedPackageImports;
+ return metricsService.meter(getMetricName(FAILED_PACKAGE_IMPORTS, tags));
}
/**
@@ -166,7 +159,7 @@
* @return a Sling Metric timer
*/
public Timer getSendStoredStatusDuration() {
- return sendStoredStatusDuration;
+ return metricsService.timer(getMetricName(SEND_STORED_STATUS_DURATION, tags));
}
/**
@@ -175,7 +168,7 @@
* @return a Sling Metric timer
*/
public Timer getProcessQueueItemDuration() {
- return processQueueItemDuration;
+ return metricsService.timer(getMetricName(PROCESS_QUEUE_ITEM_DURATION, tags));
}
/**
@@ -185,7 +178,7 @@
* @return a Sling Metric timer
*/
public Timer getPackageDistributedDuration() {
- return packageDistributedDuration;
+ return metricsService.timer(getMetricName(REQUEST_DISTRIBUTED_DURATION, tags));
}
/**
@@ -195,7 +188,7 @@
* @return a Sling Metrics timer
*/
public Timer getPackageJournalDistributionDuration() {
- return packageJournalDistributionDuration;
+ return metricsService.timer(getMetricName(PACKAGE_JOURNAL_DISTRIBUTION_DURATION, tags));
}
/**
@@ -204,63 +197,58 @@
* @return a Sling Metric counter
*/
public Counter getPackageStatusCounter(Status status) {
- return metricsService.counter(getNameWithLabel(getMetricName("package_status_count"), "status", status.name()));
+ Tag tagStatus = Tag.of(TAG_STATUS, status.name());
+ String name = getMetricName(PACKAGE_STATUS_COUNT, Arrays.asList(tagSubName, tagEditable, tagStatus));
+ return metricsService.counter(name);
}
public Timer getImportPreProcessDuration() {
- return importPreProcessDuration;
+ return metricsService.timer(getMetricName(IMPORT_PRE_PROCESS_DURATION, tags));
}
public Counter getImportPreProcessSuccess() {
- return importPreProcessSuccess;
+ return metricsService.counter(getMetricName(IMPORT_PRE_PROCESS_SUCCESS_COUNT, tags));
}
public Counter getImportPreProcessRequest() {
- return importPreProcessRequest;
+ return metricsService.counter(getMetricName(IMPORT_PRE_PROCESS_REQUEST_COUNT, tags));
}
public Timer getImportPostProcessDuration() {
- return importPostProcessDuration;
+ return metricsService.timer(getMetricName(IMPORT_POST_PROCESS_DURATION, tags));
}
public Counter getImportPostProcessSuccess() {
- return importPostProcessSuccess;
+ return metricsService.counter(getMetricName(IMPORT_POST_PROCESS_SUCCESS_COUNT, tags));
}
public Counter getImportPostProcessRequest() {
- return importPostProcessRequest;
+ return metricsService.counter(getMetricName(IMPORT_POST_PROCESS_REQUEST_COUNT, tags));
}
public Timer getInvalidationProcessDuration() {
- return invalidationProcessDuration;
+ return metricsService.timer(getMetricName(INVALIDATION_PROCESS_DURATION, tags));
}
public Counter getInvalidationProcessSuccess() {
- return invalidationProcessSuccess;
+ return metricsService.counter(getMetricName(INVALIDATION_PROCESS_SUCCESS_COUNT, tags));
}
public Counter getInvalidationProcessRequest() {
- return invalidationProcessRequest;
+ return metricsService.counter(getMetricName(INVALIDATION_PROCESS_REQUEST_COUNT, tags));
}
public Counter getTransientImportErrors() {
- return transientImportErrors;
+ return metricsService.counter(getMetricName(TRANSIENT_IMPORT_ERRORS, tags));
}
public Counter getPermanentImportErrors() {
- return permanentImportErrors;
+ return metricsService.counter(getMetricName(PERMANENT_IMPORT_ERRORS, tags));
}
- public void currentRetries(String subAgentName, Supplier<Integer> retriesCallback) {
- String nameRetries = SubscriberMetrics.SUB_COMPONENT + ".current_retries;sub_name=" + subAgentName;
- metricsService.gauge(nameRetries, retriesCallback);
+ public void currentRetries(Supplier<Integer> retriesCallback) {
+ metricsService.gauge(getMetricName(CURRENT_RETRIES, tags), retriesCallback);
}
- private String getMetricName(String name) {
- return format("%s.%s", SUB_COMPONENT, name);
- }
-
- private String getNameWithLabel(String name, String label, String labelVal) {
- return format("%s;%s=%s", name, label, labelVal);
- }
}
+
\ No newline at end of file
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/discovery/DiscoveryService.java b/src/main/java/org/apache/sling/distribution/journal/impl/discovery/DiscoveryService.java
index ef76bd1..b624d80 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/discovery/DiscoveryService.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/discovery/DiscoveryService.java
@@ -134,6 +134,10 @@
public TopologyView getTopologyView() {
return viewManager.getCurrentView();
}
+
+ public int getSubscriberCount(String pubAgentName) {
+ return getTopologyView().getSubscribedAgentIds(pubAgentName).size();
+ }
@Override
public void run() {
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
index 2bf24f5..2adec23 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisher.java
@@ -46,6 +46,7 @@
import org.apache.sling.distribution.journal.shared.Timed;
import org.apache.sling.distribution.journal.shared.Topics;
import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.commons.metrics.MetricsService;
import org.apache.sling.distribution.DistributionRequest;
import org.apache.sling.distribution.DistributionRequestState;
import org.apache.sling.distribution.DistributionResponse;
@@ -123,7 +124,7 @@
@Reference
Topics topics,
@Reference
- PublishMetrics publishMetrics,
+ MetricsService metricsService,
@Reference
PubQueueProvider pubQueueProvider,
@Reference(target = "(osgi.condition.id=toggle.FT_SLING-12218)", cardinality = OPTIONAL, policyOption = GREEDY)
@@ -131,13 +132,15 @@
PublisherConfiguration config,
BundleContext context) {
+ pubAgentName = requireNotBlank(config.name());
+
this.packageBuilder = packageBuilder;
this.factory = requireNonNull(factory);
this.eventAdmin = eventAdmin;
- this.publishMetrics = requireNonNull(publishMetrics);
+ requireNonNull(metricsService);
+ this.publishMetrics = new PublishMetrics(metricsService, pubAgentName);
this.pubQueueProvider = pubQueueProvider;
- pubAgentName = requireNotBlank(config.name());
distLog = new DefaultDistributionLog(pubAgentName, this.getClass(), DefaultDistributionLog.LogLevel.INFO);
distributionLogEventListener = new DistributionLogEventListener(context, distLog, pubAgentName);
@@ -148,9 +151,7 @@
pkgType = packageBuilder.getType();
this.sender = messagingProvider.createSender(topics.getPackageTopic());
- publishMetrics.subscriberCount(pubAgentName,
- () -> discoveryService.getTopologyView().getSubscribedAgentIds(pubAgentName).size()
- );
+ publishMetrics.subscriberCount(() -> discoveryService.getSubscriberCount(pubAgentName));
distLog.info("Started Publisher agent={} with packageBuilder={}, limitEnabled={}, queuedTimeout={}, queueSizeLimit={}, maxQueueSizeDelay={}",
pubAgentName, pkgType, limitEnabled, queuedTimeout, queueSizeLimit, maxQueueSizeDelay);
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PublishMetrics.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PublishMetrics.java
index ce53518..ef64ad5 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PublishMetrics.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/PublishMetrics.java
@@ -18,8 +18,10 @@
*/
package org.apache.sling.distribution.journal.impl.publisher;
-import static java.lang.String.format;
+import static org.apache.sling.distribution.journal.metrics.TaggedMetrics.getMetricName;
+import java.util.Arrays;
+import java.util.List;
import java.util.function.Supplier;
import org.apache.sling.commons.metrics.Counter;
@@ -27,43 +29,27 @@
import org.apache.sling.commons.metrics.Meter;
import org.apache.sling.commons.metrics.MetricsService;
import org.apache.sling.commons.metrics.Timer;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Reference;
+import org.apache.sling.distribution.journal.metrics.Tag;
-@Component(service = PublishMetrics.class)
public class PublishMetrics {
+ private static final String TAG_AGENT_NAME = "pub_name";
- public static final String BASE_COMPONENT = "distribution.journal";
+ public static final String PUB_COMPONENT = "distribution.journal.publisher.";
+ private static final String EXPORTED_PACKAGE_SIZE = PUB_COMPONENT + "exported_package_size";
+ private static final String ACCEPTED_REQUESTS = PUB_COMPONENT + "accepted_requests";
+ private static final String DROPPED_REQUESTS = PUB_COMPONENT + "dropped_requests";
+ private static final String BUILD_PACKAGE_DURATION = PUB_COMPONENT + "build_package_duration";
+ private static final String ENQUEUE_PACKAGE_DURATION = PUB_COMPONENT + "enqueue_package_duration";
+ private static final String QUEUE_CACHE_FETCH_COUNT = PUB_COMPONENT + "queue_cache_fetch_count";
+ private static final String QUEUE_ACCESS_ERROR_COUNT = PUB_COMPONENT + "queue_access_error_count";
+ private static final String SUBSCRIBER_COUNT = PUB_COMPONENT + "subscriber_count";
- public static final String PUB_COMPONENT = BASE_COMPONENT + ".publisher";
-
+ private final List<Tag> tags;
private final MetricsService metricsService;
- private final Histogram exportedPackageSize;
-
- private final Meter acceptedRequests;
-
- private final Meter droppedRequests;
-
- private final Timer buildPackageDuration;
-
- private final Timer enqueuePackageDuration;
-
- private final Counter queueCacheFetchCount;
-
- private final Counter queueAccessErrorCount;
-
- @Activate
- public PublishMetrics(@Reference MetricsService metricsService) {
+ public PublishMetrics(MetricsService metricsService, String pubAgentName) {
+ this.tags = Arrays.asList(Tag.of(TAG_AGENT_NAME, pubAgentName));
this.metricsService = metricsService;
- exportedPackageSize = metricsService.histogram(getMetricName("exported_package_size"));
- acceptedRequests = metricsService.meter(getMetricName("accepted_requests"));
- droppedRequests = metricsService.meter(getMetricName("dropped_requests"));
- buildPackageDuration = metricsService.timer(getMetricName("build_package_duration"));
- enqueuePackageDuration = metricsService.timer(getMetricName("enqueue_package_duration"));
- queueCacheFetchCount = metricsService.counter(getMetricName("queue_cache_fetch_count"));
- queueAccessErrorCount = metricsService.counter(getMetricName("queue_access_error_count"));
}
/**
@@ -72,7 +58,7 @@
* @return a Sling Metrics histogram
*/
public Histogram getExportedPackageSize() {
- return exportedPackageSize;
+ return metricsService.histogram(getMetricName(EXPORTED_PACKAGE_SIZE, tags));
}
/**
@@ -81,7 +67,7 @@
* @return a Sling Metrics meter
*/
public Meter getAcceptedRequests() {
- return acceptedRequests;
+ return metricsService.meter(getMetricName(ACCEPTED_REQUESTS, tags));
}
/**
@@ -90,7 +76,7 @@
* @return a Sling Metrics meter
*/
public Meter getDroppedRequests() {
- return droppedRequests;
+ return metricsService.meter(getMetricName(DROPPED_REQUESTS, tags));
}
/**
@@ -99,7 +85,7 @@
* @return a Sling Metric timer
*/
public Timer getBuildPackageDuration() {
- return buildPackageDuration;
+ return metricsService.timer(getMetricName(BUILD_PACKAGE_DURATION, tags));
}
/**
@@ -108,7 +94,7 @@
* @return a Sling Metric timer
*/
public Timer getEnqueuePackageDuration() {
- return enqueuePackageDuration;
+ return metricsService.timer(getMetricName(ENQUEUE_PACKAGE_DURATION, tags));
}
/**
@@ -117,7 +103,7 @@
* @return a Sling Metric counter
*/
public Counter getQueueCacheFetchCount() {
- return queueCacheFetchCount;
+ return metricsService.counter(getMetricName(QUEUE_CACHE_FETCH_COUNT, tags));
}
/**
@@ -126,17 +112,11 @@
* @return a Sling Metric counter
*/
public Counter getQueueAccessErrorCount() {
- return queueAccessErrorCount;
+ return metricsService.counter(getMetricName(QUEUE_ACCESS_ERROR_COUNT, tags));
}
- public void subscriberCount(String pubAgentName, Supplier<Integer> subscriberCountCallback) {
- metricsService.gauge(PublishMetrics.PUB_COMPONENT + ".subscriber_count;pub_name=" + pubAgentName,
- subscriberCountCallback);
-
- }
-
- private String getMetricName(String name) {
- return format("%s.%s", PUB_COMPONENT, name);
+ public void subscriberCount(Supplier<Integer> subscriberCountCallback) {
+ metricsService.gauge(getMetricName(SUBSCRIBER_COUNT, tags), subscriberCountCallback);
}
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index f9fc84f..ff33c6f 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -52,6 +52,7 @@
import org.apache.jackrabbit.util.Text;
import org.apache.sling.api.resource.LoginException;
import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.commons.metrics.MetricsService;
import org.apache.sling.commons.metrics.Timer;
import org.apache.sling.distribution.ImportPostProcessException;
import org.apache.sling.distribution.agent.DistributionAgentState;
@@ -119,7 +120,7 @@
private Precondition precondition;
@Reference
- private SubscriberMetrics subscriberMetrics;
+ private MetricsService metricsService;
@Reference
BookKeeperFactory bookKeeperFactory;
@@ -127,6 +128,8 @@
@Reference
private SubscriberReadyStore subscriberReadyStore;
+ private SubscriberMetrics subscriberMetrics;
+
private volatile Closeable idleReadyCheck; // NOSONAR
private volatile IdleCheck idleCheck; // NOSONAR
@@ -162,12 +165,15 @@
subAgentName = requireNotBlank(config.name());
requireNonNull(config);
requireNonNull(context);
+ requireNonNull(metricsService);
requireNonNull(packageBuilder);
requireNonNull(slingSettings);
requireNonNull(messagingProvider);
requireNonNull(topics);
requireNonNull(precondition);
requireNonNull(bookKeeperFactory);
+
+ this.subscriberMetrics = new SubscriberMetrics(metricsService, subAgentName, config.editable());
if (config.editable()) {
commandPoller = new CommandPoller(messagingProvider, topics, subSlingId, subAgentName, delay::signal);
@@ -197,7 +203,7 @@
config.packageHandling(),
packageNodeName,
config.contentPackageExtractorOverwritePrimaryTypesOfFolders());
- bookKeeper = bookKeeperFactory.create(packageBuilder, bkConfig, statusSender, logSender);
+ bookKeeper = bookKeeperFactory.create(packageBuilder, bkConfig, statusSender, logSender, this.subscriberMetrics);
long startOffset = bookKeeper.loadOffset() + 1;
String assign = startOffset > 0 ? messagingProvider.assignTo(startOffset) : null;
diff --git a/src/main/java/org/apache/sling/distribution/journal/metrics/TaggedMetrics.java b/src/main/java/org/apache/sling/distribution/journal/metrics/TaggedMetrics.java
index 0c52dd7..b34f294 100644
--- a/src/main/java/org/apache/sling/distribution/journal/metrics/TaggedMetrics.java
+++ b/src/main/java/org/apache/sling/distribution/journal/metrics/TaggedMetrics.java
@@ -18,6 +18,7 @@
*/
package org.apache.sling.distribution.journal.metrics;
+import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
@@ -37,4 +38,8 @@
log.debug("metric={}", metric);
return metric;
}
+
+ public static String getMetricName(String metricName, Tag tag) {
+ return getMetricName(metricName, Collections.singletonList(tag));
+ }
}
diff --git a/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java b/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
index bf72a26..2663457 100644
--- a/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/bookkeeper/BookKeeperTest.java
@@ -88,9 +88,8 @@
@Before
public void before() {
- subscriberMetrics = new SubscriberMetrics(MetricsService.NOOP);
-
BookKeeperConfig bkConfig = new BookKeeperConfig("subAgentName", "subSlingId", true, 10, PackageHandling.Extract, "package", true);
+ subscriberMetrics = new SubscriberMetrics(MetricsService.NOOP, bkConfig.getSubAgentName(), bkConfig.isEditable());
bookKeeper = new BookKeeper(resolverFactory, subscriberMetrics, packageHandler, eventAdmin, sender, logSender, bkConfig,
importPreProcessor, importPostProcessor, invalidationProcessor);
}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
index d7541e9..206cb29 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DistributionPublisherTest.java
@@ -47,7 +47,6 @@
import org.apache.commons.lang3.time.StopWatch;
import org.apache.sling.api.resource.ResourceResolver;
-import org.apache.sling.commons.metrics.Counter;
import org.apache.sling.commons.metrics.MetricsService;
import org.apache.sling.commons.metrics.internal.MetricsServiceImpl;
import org.apache.sling.distribution.DistributionRequest;
@@ -108,8 +107,6 @@
@Mock
private DistributionPackageBuilder packageBuilder;
- private PublishMetrics publishMetrics;
-
private OsgiContext context = new OsgiContext();
private DistributionPublisher publisher;
@@ -128,11 +125,12 @@
@Spy
private Topics topics = new Topics();
+
+ private MetricsService metricsService;
@Before
public void before() throws Exception {
- MetricsService metricsService = context.registerInjectActivateService(MetricsServiceImpl.class);
- publishMetrics = new PublishMetrics(metricsService);
+ metricsService = context.registerInjectActivateService(MetricsServiceImpl.class);
when(packageBuilder.getType()).thenReturn("journal");
Map<String, String> props = Map.of("name", PUB1AGENT1,
"maxQueueSizeDelay", "1000");
@@ -141,7 +139,7 @@
BundleContext bcontext = context.bundleContext();
when(messagingProvider.<PackageMessage>createSender(Mockito.anyString())).thenReturn(sender);
publisher = new DistributionPublisher(messagingProvider, packageBuilder, discoveryService, factory,
- eventAdmin, topics, publishMetrics, pubQueueProvider, Condition.INSTANCE, config, bcontext);
+ eventAdmin, topics, metricsService, pubQueueProvider, Condition.INSTANCE, config, bcontext);
when(pubQueueProvider.getQueuedNotifier()).thenReturn(queuedNotifier);
}
@@ -250,8 +248,8 @@
DistributionQueue queue = publisher.getQueue("i_am_not_a_queue");
assertNull(queue);
- Counter counter = publishMetrics.getQueueAccessErrorCount();
- assertEquals("Wrong queue counter expected",1, counter.getCount());
+ long count = getQueueAccessErrorCount();
+ assertEquals("Wrong queue counter expected",1, count);
}
@Test
@@ -264,8 +262,12 @@
fail("Expected exception not thrown");
} catch (RuntimeException expectedException) {
}
- Counter counter = publishMetrics.getQueueAccessErrorCount();
- assertEquals("Wrong getQueue error counter",1, counter.getCount());
+ long count = getQueueAccessErrorCount();
+ assertEquals("Wrong getQueue error counter",1, count);
+ }
+
+ private long getQueueAccessErrorCount() {
+ return new PublishMetrics(metricsService, PUB1AGENT1).getQueueAccessErrorCount().getCount();
}
@Test(expected = DistributionException.class)
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
index c73b4b5..56ab9fc 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
@@ -74,7 +74,6 @@
import org.apache.sling.distribution.journal.bookkeeper.BookKeeper;
import org.apache.sling.distribution.journal.bookkeeper.BookKeeperFactory;
import org.apache.sling.distribution.journal.bookkeeper.LocalStore;
-import org.apache.sling.distribution.journal.bookkeeper.SubscriberMetrics;
import org.apache.sling.distribution.journal.shared.NoOpImportPreProcessor;
import org.apache.sling.distribution.journal.shared.NoOpImportPostProcessor;
import org.apache.sling.distribution.journal.impl.precondition.Precondition;
@@ -113,7 +112,6 @@
import org.osgi.service.event.EventAdmin;
import org.osgi.util.converter.Converters;
-@SuppressWarnings("unchecked")
@RunWith(MockitoJUnitRunner.class)
public class SubscriberTest {
@@ -180,7 +178,7 @@
private MessageSender<PackageStatusMessage> statusSender;
@Spy
- private SubscriberMetrics subscriberMetrics = new SubscriberMetrics(MetricsService.NOOP);
+ private MetricsService metricsService = MetricsService.NOOP;
@Spy
private ImportPreProcessor importPreProcessor = new NoOpImportPreProcessor();
diff --git a/src/test/java/org/apache/sling/distribution/journal/shared/SubscriberMetricsTest.java b/src/test/java/org/apache/sling/distribution/journal/shared/SubscriberMetricsTest.java
index 85eb95c..a3f0878 100644
--- a/src/test/java/org/apache/sling/distribution/journal/shared/SubscriberMetricsTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/shared/SubscriberMetricsTest.java
@@ -42,7 +42,7 @@
@Before
public void before() {
MetricsService metricsService = MetricsService.NOOP;
- metrics = new SubscriberMetrics(metricsService);
+ metrics = new SubscriberMetrics(metricsService, "subAgentName", true);
}
public static void mockBehaviour(MetricsService metricsService) {