SLING-8665: Adds metrics for jobs by simply exposing the queue statistics (both global and per queue) as gauges.
diff --git a/pom.xml b/pom.xml
index 0e441fc..0668f5f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -263,6 +263,12 @@
             <version>3.1.0</version>
             <scope>provided</scope>
         </dependency>
+         <dependency>
+            <artifactId>metrics-core</artifactId>
+            <version>3.2.4</version>
+            <groupId>io.dropwizard.metrics</groupId>
+            <scope>provided</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.sling</groupId>
             <artifactId>org.apache.sling.serviceusermapper</artifactId>
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/stats/GaugeSupport.java b/src/main/java/org/apache/sling/event/impl/jobs/stats/GaugeSupport.java
new file mode 100644
index 0000000..493c97a
--- /dev/null
+++ b/src/main/java/org/apache/sling/event/impl/jobs/stats/GaugeSupport.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.stats;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.sling.event.jobs.Statistics;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Helper class that holds gauges for relevant queue statistics. */
+class GaugeSupport {
+
+    static final String GAUGE_NAME_PREFIX = "event.jobs";
+    static final String QUEUE_PREFIX = "queue";
+    static final String CANCELLED_METRIC_SUFFIX = ".cancelled.count";
+    static final String FINISHED_METRIC_SUFFIX = ".finished.count";
+    static final String FAILED__METRIC_SUFFIX = ".failed.count";
+    static final String QUEUED_METRIC_SUFFIX = ".queued.count";
+    static final String PROCESSED_METRIC_SUFFIX = ".processed.count";
+    static final String ACTIVE_METRIC_SUFFIX = ".active.count";
+    static final String AVG_WAITING_TIME_METRIC_SUFFIX = ".averageWaitingTime";
+    static final String AVG_PROCESSING_TIME_METRIC_SUFFIX = ".averageProcessingTime";
+
+    private final MetricRegistry metricRegistry;
+    private Map<String, Gauge> gaugeList = new HashMap<>();
+
+    /** Create a new GaugeSupport instance for the global queue.
+     *
+     * @param globalQueueStats the global queueStats
+     * @param metricRegistry the (sling) metric registry */
+    GaugeSupport(final Statistics globalQueueStats, MetricRegistry metricRegistry) {
+        this(null, globalQueueStats, metricRegistry);
+    }
+
+    /** Creates a new GaugeSupport instance. Registers gauges for jobs (based on the queueStats).
+     *
+     * @param queueName name of the queue
+     * @param queueStats queueStats of that queue
+     * @param metricRegistry the (sling) metric registry */
+    GaugeSupport(String queueName, final Statistics queueStats, MetricRegistry metricRegistry) {
+        this.metricRegistry = metricRegistry;
+        if (metricRegistry != null) {
+            gaugeList.put(getMetricName(queueName, FINISHED_METRIC_SUFFIX), new Gauge() {
+
+                public Object getValue() {
+                    return queueStats.getNumberOfFinishedJobs();
+                }
+            });
+            gaugeList.put(getMetricName(queueName, CANCELLED_METRIC_SUFFIX), new Gauge() {
+
+                public Object getValue() {
+                    return queueStats.getNumberOfCancelledJobs();
+                }
+            });
+            gaugeList.put(getMetricName(queueName, FAILED__METRIC_SUFFIX), new Gauge() {
+
+                public Object getValue() {
+                    return queueStats.getNumberOfFailedJobs();
+                }
+            });
+            gaugeList.put(getMetricName(queueName, QUEUED_METRIC_SUFFIX), new Gauge() {
+
+                public Object getValue() {
+                    return queueStats.getNumberOfQueuedJobs();
+                }
+            });
+            gaugeList.put(getMetricName(queueName, PROCESSED_METRIC_SUFFIX), new Gauge() {
+
+                public Object getValue() {
+                    return queueStats.getNumberOfProcessedJobs();
+                }
+            });
+            gaugeList.put(getMetricName(queueName, ACTIVE_METRIC_SUFFIX), new Gauge() {
+
+                public Object getValue() {
+                    return queueStats.getNumberOfActiveJobs();
+                }
+            });
+            gaugeList.put(getMetricName(queueName, AVG_WAITING_TIME_METRIC_SUFFIX), new Gauge() {
+
+                public Object getValue() {
+                    return queueStats.getAverageWaitingTime();
+                }
+            });
+            gaugeList.put(getMetricName(queueName, AVG_PROCESSING_TIME_METRIC_SUFFIX), new Gauge() {
+
+                public Object getValue() {
+                    return queueStats.getAverageProcessingTime();
+                }
+            });
+            for (Map.Entry<String, Gauge> entry : gaugeList.entrySet()) {
+                metricRegistry.register(entry.getKey(), entry.getValue());
+            }
+        }
+    }
+
+    private String getMetricName(String queueName, String metricSuffix) {
+        return GAUGE_NAME_PREFIX + (queueName != null ? "." + QUEUE_PREFIX + "." + queueName : "") + metricSuffix;
+    }
+
+    /** Unregisters all job gauges of the queue. */
+    void shutdown() {
+        if (metricRegistry != null) {
+            for (String metricName : gaugeList.keySet()) {
+                metricRegistry.remove(metricName);
+            }
+        }
+    }
+
+}
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java b/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java
index 455cb1e..3700250 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java
@@ -22,13 +22,14 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import com.codahale.metrics.MetricRegistry;
 import org.apache.sling.event.impl.jobs.InternalJobState;
 import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
 import org.apache.sling.event.jobs.Statistics;
 import org.apache.sling.event.jobs.TopicStatistics;
 import org.osgi.framework.Constants;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.*;
+
 
 /**
  * The statistics manager keeps track of all statistics related tasks.
@@ -43,6 +44,12 @@
     @Reference
     private JobManagerConfiguration configuration;
 
+    /**
+     * The sling metric registry
+     */
+    @Reference(target = "(name=sling)", cardinality = ReferenceCardinality.OPTIONAL)
+    private MetricRegistry metricRegistry;
+
     /** Global statistics. */
     private final StatisticsImpl globalStatistics = new StatisticsImpl() {
 
@@ -57,9 +64,15 @@
 
     };
 
+    /** Gauges for the global job statistics. */
+    private GaugeSupport globalGauges;
+
     /** Statistics per topic. */
     private final ConcurrentMap<String, TopicStatistics> topicStatistics = new ConcurrentHashMap<>();
 
+    /** Gauges for the statistics per topic. */
+    private final ConcurrentMap<String, GaugeSupport> topicGauges = new ConcurrentHashMap<>();
+
     /** Statistics per queue. */
     private final ConcurrentMap<String, Statistics> queueStatistics = new ConcurrentHashMap<>();
 
@@ -105,6 +118,7 @@
         if ( queueStats == null ) {
             queueStatistics.putIfAbsent(queueName, new StatisticsImpl());
             queueStats = (StatisticsImpl)queueStatistics.get(queueName);
+            topicGauges.putIfAbsent(queueName, new GaugeSupport(queueName, queueStats, metricRegistry));
         }
         return queueStats;
     }
@@ -141,7 +155,6 @@
             if ( queueStats != null ) {
                 queueStats.finishedJob(processingTime);
             }
-
         }
     }
 
@@ -181,4 +194,17 @@
             queueStats.decQueued();
         }
     }
+
+    @Activate
+    protected void activate() {
+        globalGauges = new GaugeSupport(globalStatistics, metricRegistry);
+    }
+
+    @Deactivate
+    protected void deactivate() {
+        globalGauges.shutdown();
+        for (GaugeSupport gaugeSupport : topicGauges.values()) {
+            gaugeSupport.shutdown();
+        }
+    }
 }
diff --git a/src/test/java/org/apache/sling/event/impl/jobs/stats/StatisticsManagerTest.java b/src/test/java/org/apache/sling/event/impl/jobs/stats/StatisticsManagerTest.java
new file mode 100644
index 0000000..3c1937b
--- /dev/null
+++ b/src/test/java/org/apache/sling/event/impl/jobs/stats/StatisticsManagerTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.stats;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.sling.event.impl.TestUtil;
+import org.apache.sling.event.impl.jobs.InternalJobState;
+import org.apache.sling.event.jobs.Statistics;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.sling.event.impl.jobs.stats.GaugeSupport.*;
+import static org.junit.Assert.assertEquals;
+
+public class StatisticsManagerTest {
+
+    private static final String TEST_QUEUE_NAME = "queue_name";
+    private static final String TEST_TOPIC = "testtopic";
+
+    private StatisticsManager statisticsManager;
+    private MetricRegistry metricRegistry;
+
+    private StatisticsImpl statistics = new StatisticsImpl();
+
+    @Before
+    public void setup() {
+        statisticsManager = new StatisticsManager();
+        metricRegistry = new MetricRegistry();
+        TestUtil.setFieldValue(statisticsManager, "globalStatistics", statistics);
+        TestUtil.setFieldValue(statisticsManager, "metricRegistry", metricRegistry);
+        statisticsManager.activate();
+    }
+
+    @Test
+    public void testGlobalGaugesAreRemovedOnDeactivate() {
+        statisticsManager.jobQueued(TEST_QUEUE_NAME, TEST_TOPIC);
+        assertEquals("Less than 16 metrics present (8 global + 8 topic).",
+                16, metricRegistry.getMetrics().size());
+        statisticsManager.deactivate();
+        assertEquals(0, metricRegistry.getMetrics().size());
+    }
+
+    @Test
+    public void testJobStarted() {
+        long queueTime = 10L;
+        statisticsManager.jobStarted(TEST_QUEUE_NAME, TEST_TOPIC, queueTime);
+        Gauge topicMetric = getTopicMetric(ACTIVE_METRIC_SUFFIX);
+        Gauge waitingTimeTopicMetric = getTopicMetric(AVG_WAITING_TIME_METRIC_SUFFIX);
+        Gauge globalMetric = getGlobalMetric(ACTIVE_METRIC_SUFFIX);
+        Gauge waitingTimeGlobalMetric = getGlobalMetric(AVG_WAITING_TIME_METRIC_SUFFIX);
+
+        Statistics queueStatistics = statisticsManager.getQueueStatistics(TEST_QUEUE_NAME);
+
+        assertEquals(1L, queueStatistics.getNumberOfActiveJobs());
+        assertEquals(1L, topicMetric.getValue());
+        assertEquals(1L, globalMetric.getValue());
+        assertEquals(queueStatistics.getAverageWaitingTime(), waitingTimeTopicMetric.getValue());
+        assertEquals(queueStatistics.getAverageWaitingTime(), waitingTimeGlobalMetric.getValue());
+    }
+
+    @Test
+    public void testJobQueueDequeue() {
+        statisticsManager.jobQueued(TEST_QUEUE_NAME, TEST_TOPIC);
+        Gauge topicMetric = getTopicMetric(QUEUED_METRIC_SUFFIX);
+        Gauge globalMetric = getGlobalMetric(QUEUED_METRIC_SUFFIX);
+        Statistics queueStatistics = statisticsManager.getQueueStatistics(TEST_QUEUE_NAME);
+        assertEquals(1L, queueStatistics.getNumberOfQueuedJobs());
+        assertEquals(1L, topicMetric.getValue());
+        assertEquals(1L, globalMetric.getValue());
+
+        statisticsManager.jobDequeued(TEST_QUEUE_NAME, TEST_TOPIC);
+        assertEquals(0L, queueStatistics.getNumberOfQueuedJobs());
+        assertEquals(0L, topicMetric.getValue());
+        assertEquals(0L, globalMetric.getValue());
+    }
+
+    @Test
+    public void testJobCancelled() {
+        statisticsManager.jobEnded(TEST_QUEUE_NAME, TEST_TOPIC, InternalJobState.CANCELLED, 0L);
+        Gauge topicMetric = getTopicMetric(CANCELLED_METRIC_SUFFIX);
+        Gauge globalMetric = getGlobalMetric(CANCELLED_METRIC_SUFFIX);
+        Statistics queueStatistics = statisticsManager.getQueueStatistics(TEST_QUEUE_NAME);
+        assertEquals(1L, queueStatistics.getNumberOfCancelledJobs());
+        assertEquals(1L, topicMetric.getValue());
+        assertEquals(1L, globalMetric.getValue());
+    }
+
+    @Test
+    public void testJobFailed() {
+        statisticsManager.jobEnded(TEST_QUEUE_NAME, TEST_TOPIC, InternalJobState.FAILED, 0L);
+        Gauge topicMetric = getTopicMetric(FAILED__METRIC_SUFFIX);
+        Gauge globalMetric = getGlobalMetric(FAILED__METRIC_SUFFIX);
+        Statistics queueStatistics = statisticsManager.getQueueStatistics(TEST_QUEUE_NAME);
+        assertEquals(1L, queueStatistics.getNumberOfFailedJobs());
+        assertEquals(1L, topicMetric.getValue());
+        assertEquals(1L, globalMetric.getValue());
+    }
+
+    @Test
+    public void testJobFinished() {
+        long processingTime = 10L;
+        statisticsManager.jobEnded(TEST_QUEUE_NAME, TEST_TOPIC, InternalJobState.SUCCEEDED, processingTime);
+        Gauge finishedTopicMetric = getTopicMetric(FINISHED_METRIC_SUFFIX);
+        Gauge processedTopicMetric = getTopicMetric(PROCESSED_METRIC_SUFFIX);
+        Gauge processingTopicTimeMetric = getTopicMetric(AVG_PROCESSING_TIME_METRIC_SUFFIX);
+        Gauge finishedGlobalMetric = getGlobalMetric(FINISHED_METRIC_SUFFIX);
+        Gauge processedGlobalMetric = getGlobalMetric(PROCESSED_METRIC_SUFFIX);
+        Gauge processingGlobalTimeMetric = getGlobalMetric(AVG_PROCESSING_TIME_METRIC_SUFFIX);
+
+        Statistics queueStatistics = statisticsManager.getQueueStatistics(TEST_QUEUE_NAME);
+
+        assertEquals(1L, queueStatistics.getNumberOfFinishedJobs());
+        assertEquals(1L, finishedTopicMetric.getValue());
+        assertEquals(1L, processedTopicMetric.getValue());
+        assertEquals(queueStatistics.getAverageProcessingTime(), processingTopicTimeMetric.getValue());
+        assertEquals(1L, finishedGlobalMetric.getValue());
+        assertEquals(1L, processedGlobalMetric.getValue());
+        assertEquals(queueStatistics.getAverageProcessingTime(), processingGlobalTimeMetric.getValue());
+    }
+
+    private Gauge getTopicMetric(String metricSuffix) {
+        return (Gauge) metricRegistry.getMetrics().get(GAUGE_NAME_PREFIX +
+                "." + QUEUE_PREFIX + "." + TEST_QUEUE_NAME + metricSuffix);
+    }
+
+    private Gauge getGlobalMetric(String metricSuffix) {
+        return (Gauge) metricRegistry.getMetrics().get(GAUGE_NAME_PREFIX + metricSuffix);
+    }
+
+
+}
\ No newline at end of file
diff --git a/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java b/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java
index c51ecd9..464ef74 100644
--- a/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java
+++ b/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java
@@ -186,6 +186,8 @@
                 mavenBundle("org.apache.commons", "commons-lang3", "3.5"),
                 mavenBundle("commons-pool", "commons-pool", "1.6"),
 
+                mavenBundle("io.dropwizard.metrics", "metrics-core", "3.2.4"),
+
                 mavenBundle("org.apache.servicemix.bundles", "org.apache.servicemix.bundles.concurrent", "1.3.4_1"),
 
                 mavenBundle("org.apache.geronimo.bundles", "commons-httpclient", "3.1_1"),
@@ -209,6 +211,7 @@
                 mavenBundle("org.apache.sling", "org.apache.sling.commons.johnzon", "1.0.0"),
                 mavenBundle("org.apache.sling", "org.apache.sling.commons.scheduler", "2.4.14"),
                 mavenBundle("org.apache.sling", "org.apache.sling.commons.threads", "3.2.4"),
+                mavenBundle("org.apache.sling", "org.apache.sling.commons.metrics", "1.2.6"),
 
                 mavenBundle("org.apache.sling", "org.apache.sling.auth.core", "1.3.12"),
                 mavenBundle("org.apache.sling", "org.apache.sling.discovery.api", "1.0.2"),