SLING-8665: Adds metrics for jobs by simply exposing the queue statistics (both global and per queue) as gauges.
diff --git a/pom.xml b/pom.xml
index 0e441fc..0668f5f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -263,6 +263,12 @@
<version>3.1.0</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <artifactId>metrics-core</artifactId>
+ <version>3.2.4</version>
+ <groupId>io.dropwizard.metrics</groupId>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>org.apache.sling</groupId>
<artifactId>org.apache.sling.serviceusermapper</artifactId>
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/stats/GaugeSupport.java b/src/main/java/org/apache/sling/event/impl/jobs/stats/GaugeSupport.java
new file mode 100644
index 0000000..493c97a
--- /dev/null
+++ b/src/main/java/org/apache/sling/event/impl/jobs/stats/GaugeSupport.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.stats;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.sling.event.jobs.Statistics;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Helper class that holds gauges for relevant queue statistics. */
+class GaugeSupport {
+
+ static final String GAUGE_NAME_PREFIX = "event.jobs";
+ static final String QUEUE_PREFIX = "queue";
+ static final String CANCELLED_METRIC_SUFFIX = ".cancelled.count";
+ static final String FINISHED_METRIC_SUFFIX = ".finished.count";
+ static final String FAILED__METRIC_SUFFIX = ".failed.count";
+ static final String QUEUED_METRIC_SUFFIX = ".queued.count";
+ static final String PROCESSED_METRIC_SUFFIX = ".processed.count";
+ static final String ACTIVE_METRIC_SUFFIX = ".active.count";
+ static final String AVG_WAITING_TIME_METRIC_SUFFIX = ".averageWaitingTime";
+ static final String AVG_PROCESSING_TIME_METRIC_SUFFIX = ".averageProcessingTime";
+
+ private final MetricRegistry metricRegistry;
+ private Map<String, Gauge> gaugeList = new HashMap<>();
+
+ /** Create a new GaugeSupport instance for the global queue.
+ *
+ * @param globalQueueStats the global queueStats
+ * @param metricRegistry the (sling) metric registry */
+ GaugeSupport(final Statistics globalQueueStats, MetricRegistry metricRegistry) {
+ this(null, globalQueueStats, metricRegistry);
+ }
+
+ /** Creates a new GaugeSupport instance. Registers gauges for jobs (based on the queueStats).
+ *
+ * @param queueName name of the queue
+ * @param queueStats queueStats of that queue
+ * @param metricRegistry the (sling) metric registry */
+ GaugeSupport(String queueName, final Statistics queueStats, MetricRegistry metricRegistry) {
+ this.metricRegistry = metricRegistry;
+ if (metricRegistry != null) {
+ gaugeList.put(getMetricName(queueName, FINISHED_METRIC_SUFFIX), new Gauge() {
+
+ public Object getValue() {
+ return queueStats.getNumberOfFinishedJobs();
+ }
+ });
+ gaugeList.put(getMetricName(queueName, CANCELLED_METRIC_SUFFIX), new Gauge() {
+
+ public Object getValue() {
+ return queueStats.getNumberOfCancelledJobs();
+ }
+ });
+ gaugeList.put(getMetricName(queueName, FAILED__METRIC_SUFFIX), new Gauge() {
+
+ public Object getValue() {
+ return queueStats.getNumberOfFailedJobs();
+ }
+ });
+ gaugeList.put(getMetricName(queueName, QUEUED_METRIC_SUFFIX), new Gauge() {
+
+ public Object getValue() {
+ return queueStats.getNumberOfQueuedJobs();
+ }
+ });
+ gaugeList.put(getMetricName(queueName, PROCESSED_METRIC_SUFFIX), new Gauge() {
+
+ public Object getValue() {
+ return queueStats.getNumberOfProcessedJobs();
+ }
+ });
+ gaugeList.put(getMetricName(queueName, ACTIVE_METRIC_SUFFIX), new Gauge() {
+
+ public Object getValue() {
+ return queueStats.getNumberOfActiveJobs();
+ }
+ });
+ gaugeList.put(getMetricName(queueName, AVG_WAITING_TIME_METRIC_SUFFIX), new Gauge() {
+
+ public Object getValue() {
+ return queueStats.getAverageWaitingTime();
+ }
+ });
+ gaugeList.put(getMetricName(queueName, AVG_PROCESSING_TIME_METRIC_SUFFIX), new Gauge() {
+
+ public Object getValue() {
+ return queueStats.getAverageProcessingTime();
+ }
+ });
+ for (Map.Entry<String, Gauge> entry : gaugeList.entrySet()) {
+ metricRegistry.register(entry.getKey(), entry.getValue());
+ }
+ }
+ }
+
+ private String getMetricName(String queueName, String metricSuffix) {
+ return GAUGE_NAME_PREFIX + (queueName != null ? "." + QUEUE_PREFIX + "." + queueName : "") + metricSuffix;
+ }
+
+ /** Unregisters all job gauges of the queue. */
+ void shutdown() {
+ if (metricRegistry != null) {
+ for (String metricName : gaugeList.keySet()) {
+ metricRegistry.remove(metricName);
+ }
+ }
+ }
+
+}
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java b/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java
index 455cb1e..3700250 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java
@@ -22,13 +22,14 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import com.codahale.metrics.MetricRegistry;
import org.apache.sling.event.impl.jobs.InternalJobState;
import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
import org.apache.sling.event.jobs.Statistics;
import org.apache.sling.event.jobs.TopicStatistics;
import org.osgi.framework.Constants;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.*;
+
/**
* The statistics manager keeps track of all statistics related tasks.
@@ -43,6 +44,12 @@
@Reference
private JobManagerConfiguration configuration;
+ /**
+ * The sling metric registry
+ */
+ @Reference(target = "(name=sling)", cardinality = ReferenceCardinality.OPTIONAL)
+ private MetricRegistry metricRegistry;
+
/** Global statistics. */
private final StatisticsImpl globalStatistics = new StatisticsImpl() {
@@ -57,9 +64,15 @@
};
+ /** Gauges for the global job statistics. */
+ private GaugeSupport globalGauges;
+
/** Statistics per topic. */
private final ConcurrentMap<String, TopicStatistics> topicStatistics = new ConcurrentHashMap<>();
+ /** Gauges for the statistics per topic. */
+ private final ConcurrentMap<String, GaugeSupport> topicGauges = new ConcurrentHashMap<>();
+
/** Statistics per queue. */
private final ConcurrentMap<String, Statistics> queueStatistics = new ConcurrentHashMap<>();
@@ -105,6 +118,7 @@
if ( queueStats == null ) {
queueStatistics.putIfAbsent(queueName, new StatisticsImpl());
queueStats = (StatisticsImpl)queueStatistics.get(queueName);
+ topicGauges.putIfAbsent(queueName, new GaugeSupport(queueName, queueStats, metricRegistry));
}
return queueStats;
}
@@ -141,7 +155,6 @@
if ( queueStats != null ) {
queueStats.finishedJob(processingTime);
}
-
}
}
@@ -181,4 +194,17 @@
queueStats.decQueued();
}
}
+
+ @Activate
+ protected void activate() {
+ globalGauges = new GaugeSupport(globalStatistics, metricRegistry);
+ }
+
+ @Deactivate
+ protected void deactivate() {
+ globalGauges.shutdown();
+ for (GaugeSupport gaugeSupport : topicGauges.values()) {
+ gaugeSupport.shutdown();
+ }
+ }
}
diff --git a/src/test/java/org/apache/sling/event/impl/jobs/stats/StatisticsManagerTest.java b/src/test/java/org/apache/sling/event/impl/jobs/stats/StatisticsManagerTest.java
new file mode 100644
index 0000000..3c1937b
--- /dev/null
+++ b/src/test/java/org/apache/sling/event/impl/jobs/stats/StatisticsManagerTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.stats;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.sling.event.impl.TestUtil;
+import org.apache.sling.event.impl.jobs.InternalJobState;
+import org.apache.sling.event.jobs.Statistics;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.sling.event.impl.jobs.stats.GaugeSupport.*;
+import static org.junit.Assert.assertEquals;
+
+public class StatisticsManagerTest {
+
+ private static final String TEST_QUEUE_NAME = "queue_name";
+ private static final String TEST_TOPIC = "testtopic";
+
+ private StatisticsManager statisticsManager;
+ private MetricRegistry metricRegistry;
+
+ private StatisticsImpl statistics = new StatisticsImpl();
+
+ @Before
+ public void setup() {
+ statisticsManager = new StatisticsManager();
+ metricRegistry = new MetricRegistry();
+ TestUtil.setFieldValue(statisticsManager, "globalStatistics", statistics);
+ TestUtil.setFieldValue(statisticsManager, "metricRegistry", metricRegistry);
+ statisticsManager.activate();
+ }
+
+ @Test
+ public void testGlobalGaugesAreRemovedOnDeactivate() {
+ statisticsManager.jobQueued(TEST_QUEUE_NAME, TEST_TOPIC);
+ assertEquals("Less than 16 metrics present (8 global + 8 topic).",
+ 16, metricRegistry.getMetrics().size());
+ statisticsManager.deactivate();
+ assertEquals(0, metricRegistry.getMetrics().size());
+ }
+
+ @Test
+ public void testJobStarted() {
+ long queueTime = 10L;
+ statisticsManager.jobStarted(TEST_QUEUE_NAME, TEST_TOPIC, queueTime);
+ Gauge topicMetric = getTopicMetric(ACTIVE_METRIC_SUFFIX);
+ Gauge waitingTimeTopicMetric = getTopicMetric(AVG_WAITING_TIME_METRIC_SUFFIX);
+ Gauge globalMetric = getGlobalMetric(ACTIVE_METRIC_SUFFIX);
+ Gauge waitingTimeGlobalMetric = getGlobalMetric(AVG_WAITING_TIME_METRIC_SUFFIX);
+
+ Statistics queueStatistics = statisticsManager.getQueueStatistics(TEST_QUEUE_NAME);
+
+ assertEquals(1L, queueStatistics.getNumberOfActiveJobs());
+ assertEquals(1L, topicMetric.getValue());
+ assertEquals(1L, globalMetric.getValue());
+ assertEquals(queueStatistics.getAverageWaitingTime(), waitingTimeTopicMetric.getValue());
+ assertEquals(queueStatistics.getAverageWaitingTime(), waitingTimeGlobalMetric.getValue());
+ }
+
+ @Test
+ public void testJobQueueDequeue() {
+ statisticsManager.jobQueued(TEST_QUEUE_NAME, TEST_TOPIC);
+ Gauge topicMetric = getTopicMetric(QUEUED_METRIC_SUFFIX);
+ Gauge globalMetric = getGlobalMetric(QUEUED_METRIC_SUFFIX);
+ Statistics queueStatistics = statisticsManager.getQueueStatistics(TEST_QUEUE_NAME);
+ assertEquals(1L, queueStatistics.getNumberOfQueuedJobs());
+ assertEquals(1L, topicMetric.getValue());
+ assertEquals(1L, globalMetric.getValue());
+
+ statisticsManager.jobDequeued(TEST_QUEUE_NAME, TEST_TOPIC);
+ assertEquals(0L, queueStatistics.getNumberOfQueuedJobs());
+ assertEquals(0L, topicMetric.getValue());
+ assertEquals(0L, globalMetric.getValue());
+ }
+
+ @Test
+ public void testJobCancelled() {
+ statisticsManager.jobEnded(TEST_QUEUE_NAME, TEST_TOPIC, InternalJobState.CANCELLED, 0L);
+ Gauge topicMetric = getTopicMetric(CANCELLED_METRIC_SUFFIX);
+ Gauge globalMetric = getGlobalMetric(CANCELLED_METRIC_SUFFIX);
+ Statistics queueStatistics = statisticsManager.getQueueStatistics(TEST_QUEUE_NAME);
+ assertEquals(1L, queueStatistics.getNumberOfCancelledJobs());
+ assertEquals(1L, topicMetric.getValue());
+ assertEquals(1L, globalMetric.getValue());
+ }
+
+ @Test
+ public void testJobFailed() {
+ statisticsManager.jobEnded(TEST_QUEUE_NAME, TEST_TOPIC, InternalJobState.FAILED, 0L);
+ Gauge topicMetric = getTopicMetric(FAILED__METRIC_SUFFIX);
+ Gauge globalMetric = getGlobalMetric(FAILED__METRIC_SUFFIX);
+ Statistics queueStatistics = statisticsManager.getQueueStatistics(TEST_QUEUE_NAME);
+ assertEquals(1L, queueStatistics.getNumberOfFailedJobs());
+ assertEquals(1L, topicMetric.getValue());
+ assertEquals(1L, globalMetric.getValue());
+ }
+
+ @Test
+ public void testJobFinished() {
+ long processingTime = 10L;
+ statisticsManager.jobEnded(TEST_QUEUE_NAME, TEST_TOPIC, InternalJobState.SUCCEEDED, processingTime);
+ Gauge finishedTopicMetric = getTopicMetric(FINISHED_METRIC_SUFFIX);
+ Gauge processedTopicMetric = getTopicMetric(PROCESSED_METRIC_SUFFIX);
+ Gauge processingTopicTimeMetric = getTopicMetric(AVG_PROCESSING_TIME_METRIC_SUFFIX);
+ Gauge finishedGlobalMetric = getGlobalMetric(FINISHED_METRIC_SUFFIX);
+ Gauge processedGlobalMetric = getGlobalMetric(PROCESSED_METRIC_SUFFIX);
+ Gauge processingGlobalTimeMetric = getGlobalMetric(AVG_PROCESSING_TIME_METRIC_SUFFIX);
+
+ Statistics queueStatistics = statisticsManager.getQueueStatistics(TEST_QUEUE_NAME);
+
+ assertEquals(1L, queueStatistics.getNumberOfFinishedJobs());
+ assertEquals(1L, finishedTopicMetric.getValue());
+ assertEquals(1L, processedTopicMetric.getValue());
+ assertEquals(queueStatistics.getAverageProcessingTime(), processingTopicTimeMetric.getValue());
+ assertEquals(1L, finishedGlobalMetric.getValue());
+ assertEquals(1L, processedGlobalMetric.getValue());
+ assertEquals(queueStatistics.getAverageProcessingTime(), processingGlobalTimeMetric.getValue());
+ }
+
+ private Gauge getTopicMetric(String metricSuffix) {
+ return (Gauge) metricRegistry.getMetrics().get(GAUGE_NAME_PREFIX +
+ "." + QUEUE_PREFIX + "." + TEST_QUEUE_NAME + metricSuffix);
+ }
+
+ private Gauge getGlobalMetric(String metricSuffix) {
+ return (Gauge) metricRegistry.getMetrics().get(GAUGE_NAME_PREFIX + metricSuffix);
+ }
+
+
+}
\ No newline at end of file
diff --git a/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java b/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java
index c51ecd9..464ef74 100644
--- a/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java
+++ b/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java
@@ -186,6 +186,8 @@
mavenBundle("org.apache.commons", "commons-lang3", "3.5"),
mavenBundle("commons-pool", "commons-pool", "1.6"),
+ mavenBundle("io.dropwizard.metrics", "metrics-core", "3.2.4"),
+
mavenBundle("org.apache.servicemix.bundles", "org.apache.servicemix.bundles.concurrent", "1.3.4_1"),
mavenBundle("org.apache.geronimo.bundles", "commons-httpclient", "3.1_1"),
@@ -209,6 +211,7 @@
mavenBundle("org.apache.sling", "org.apache.sling.commons.johnzon", "1.0.0"),
mavenBundle("org.apache.sling", "org.apache.sling.commons.scheduler", "2.4.14"),
mavenBundle("org.apache.sling", "org.apache.sling.commons.threads", "3.2.4"),
+ mavenBundle("org.apache.sling", "org.apache.sling.commons.metrics", "1.2.6"),
mavenBundle("org.apache.sling", "org.apache.sling.auth.core", "1.3.12"),
mavenBundle("org.apache.sling", "org.apache.sling.discovery.api", "1.0.2"),