Merge pull request #1 from apache/master

merge from upstream
diff --git a/Jenkinsfile b/Jenkinsfile
new file mode 100644
index 0000000..f582519
--- /dev/null
+++ b/Jenkinsfile
@@ -0,0 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+slingOsgiBundleBuild()
diff --git a/README.md b/README.md
index 45122a3..6e33377 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,6 @@
-[<img src="http://sling.apache.org/res/logos/sling.png"/>](http://sling.apache.org)
+[![Apache Sling](https://sling.apache.org/res/logos/sling.png)](https://sling.apache.org)
 
- [![Build Status](https://builds.apache.org/buildStatus/icon?job=sling-org-apache-sling-event-1.8)](https://builds.apache.org/view/S-Z/view/Sling/job/sling-org-apache-sling-event-1.8) [![Test Status](https://img.shields.io/jenkins/t/https/builds.apache.org/view/S-Z/view/Sling/job/sling-org-apache-sling-event-1.8.svg)](https://builds.apache.org/view/S-Z/view/Sling/job/sling-org-apache-sling-event-1.8/test_results_analyzer/) [![Maven Central](https://maven-badges.herokuapp.com/maven-central/org.apache.sling/org.apache.sling.event/badge.svg)](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22org.apache.sling%22%20a%3A%22org.apache.sling.event%22) [![JavaDocs](https://www.javadoc.io/badge/org.apache.sling/org.apache.sling.event.svg)](https://www.javadoc.io/doc/org.apache.sling/org.apache.sling.event) [![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://www.apache.org/licenses/LICENSE-2.0)
+&#32;[![Build Status](https://ci-builds.apache.org/job/Sling/job/modules/job/sling-org-apache-sling-event/job/master/badge/icon)](https://ci-builds.apache.org/job/Sling/job/modules/job/sling-org-apache-sling-event/job/master/)&#32;[![Test Status](https://img.shields.io/jenkins/tests.svg?jobUrl=https://ci-builds.apache.org/job/Sling/job/modules/job/sling-org-apache-sling-event/job/master/)](https://ci-builds.apache.org/job/Sling/job/modules/job/sling-org-apache-sling-event/job/master/test/?width=800&height=600)&#32;[![Coverage](https://sonarcloud.io/api/project_badges/measure?project=apache_sling-org-apache-sling-event&metric=coverage)](https://sonarcloud.io/dashboard?id=apache_sling-org-apache-sling-event)&#32;[![Sonarcloud Status](https://sonarcloud.io/api/project_badges/measure?project=apache_sling-org-apache-sling-event&metric=alert_status)](https://sonarcloud.io/dashboard?id=apache_sling-org-apache-sling-event)&#32;[![JavaDoc](https://www.javadoc.io/badge/org.apache.sling/org.apache.sling.event.svg)](https://www.javadoc.io/doc/org.apache.sling/org.apache.sling.event)&#32;[![Maven Central](https://maven-badges.herokuapp.com/maven-central/org.apache.sling/org.apache.sling.event/badge.svg)](https://search.maven.org/#search%7Cga%7C1%7Cg%3A%22org.apache.sling%22%20a%3A%22org.apache.sling.event%22) [![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://www.apache.org/licenses/LICENSE-2.0)
 
 # Apache Sling Event Support
 
diff --git a/pom.xml b/pom.xml
index 0e441fc..0668f5f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -263,6 +263,12 @@
             <version>3.1.0</version>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>io.dropwizard.metrics</groupId>
+            <artifactId>metrics-core</artifactId>
+            <version>3.2.4</version>
+            <scope>provided</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.sling</groupId>
             <artifactId>org.apache.sling.serviceusermapper</artifactId>
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/notifications/NewJobSender.java b/src/main/java/org/apache/sling/event/impl/jobs/notifications/NewJobSender.java
index dc16be2..668508d 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/notifications/NewJobSender.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/notifications/NewJobSender.java
@@ -76,6 +76,7 @@
         properties.put(ResourceChangeListener.PATHS, this.configuration.getLocalJobsPath());
 
         this.listenerRegistration = bundleContext.registerService(ResourceChangeListener.class, this, properties);
+        logger.debug("Registered resource event listener for {}", this.configuration.getLocalJobsPath());
     }
 
     /**
@@ -88,14 +89,15 @@
             this.listenerRegistration.unregister();
             this.listenerRegistration = null;
         }
+        logger.debug("Deactivating resource event listener");
     }
 
     @Override
 	public void onChange(final List<ResourceChange> resourceChanges) {
     	for(final ResourceChange resourceChange : resourceChanges) {
-    		logger.debug("Received event {}", resourceChange);
+            final String path = resourceChange.getPath();
 
-    		final String path = resourceChange.getPath();
+            logger.debug("Received event {} : {}", resourceChange.getType().name(), path);
 
     		final int topicStart = this.configuration.getLocalJobsPath().length() + 1;
     		final int topicEnd = path.indexOf('/', topicStart);
@@ -109,12 +111,13 @@
                 	properties.put(NotificationConstants.NOTIFICATION_PROPERTY_JOB_ID, jobId);
                     properties.put(NotificationConstants.NOTIFICATION_PROPERTY_JOB_TOPIC, topic);
 
-                 // we also set internally the queue name
+                     // we also set internally the queue name
                     final String queueName = this.configuration.getQueueConfigurationManager().getQueueInfo(topic).queueName;
                     properties.put(Job.PROPERTY_JOB_QUEUE_NAME, queueName);
 
                     final Event jobEvent = new Event(NotificationConstants.TOPIC_JOB_ADDED, properties);
                     // as this is send within handling an event, we do sync call
+                    logger.debug("Sending event {} : {}", topic, jobId);
                     this.eventAdmin.sendEvent(jobEvent);
                 }
     		}
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java b/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
index 34ff143..5cabbbf 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
@@ -455,15 +455,17 @@
                 retryCount++;
                 if ( retries != -1 && retryCount > retries ) {
-                    if ( this.logger.isDebugEnabled() ) {
-                        this.logger.debug("Cancelled job {}", Utility.toString(handler.getJob()));
-                    }
+                    // logged unconditionally: must not depend on debug logging being enabled
+                    this.logger.error("Cancelled job {} after {} unsuccessful retries",
+                            Utility.toString(handler.getJob()),
+                            retries);
                     info.finalState = InternalJobState.CANCELLED;
                 } else {
                     info.reschedule = true;
                     handler.getJob().retry();
-                    if ( this.logger.isDebugEnabled() ) {
-                        this.logger.debug("Failed job {}", Utility.toString(handler.getJob()));
-                    }
+                    this.logger.warn("Failed job {}, will retry {} more time(s), retryCount={}",
+                            Utility.toString(handler.getJob()),
+                            retries-retryCount,
+                            retryCount);
                     info.finalState = InternalJobState.FAILED;
                 }
                 break;
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java b/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
index 11f24a9..40edb37 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
@@ -242,6 +242,7 @@
             }
         }
         if ( queue != null ) {
+            logger.debug("Starting queue {}", queueInfo.queueName);
             if ( !isNewQueue ) {
                 queue.wakeUpQueue(topics);
             }
@@ -406,6 +407,7 @@
     public void handleEvent(final Event event) {
         final String topic = (String)event.getProperty(NotificationConstants.NOTIFICATION_PROPERTY_JOB_TOPIC);
         if ( this.isActive.get() && topic != null ) {
+            logger.debug("Received event {}", topic);
             final QueueInfo info = this.configuration.getQueueConfigurationManager().getQueueInfo(topic);
             this.start(info, Collections.singleton(topic));
         }
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/stats/GaugeSupport.java b/src/main/java/org/apache/sling/event/impl/jobs/stats/GaugeSupport.java
new file mode 100644
index 0000000..a73dca8
--- /dev/null
+++ b/src/main/java/org/apache/sling/event/impl/jobs/stats/GaugeSupport.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.stats;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.sling.event.jobs.Statistics;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+/**
+ * Holds the gauges that expose the statistics of a single job queue (or the global
+ * job statistics) and manages their lifecycle in a {@link MetricRegistry}.
+ * <p>
+ * The constructor only creates the gauges; {@link #initialize()} registers them and
+ * {@link #shutdown()} removes them again.
+ */
+class GaugeSupport {
+
+    static final String GAUGE_NAME_PREFIX = "event.jobs";
+    static final String QUEUE_PREFIX = "queue";
+    static final String CANCELLED_METRIC_SUFFIX = ".cancelled.count";
+    static final String FINISHED_METRIC_SUFFIX = ".finished.count";
+    static final String FAILED__METRIC_SUFFIX = ".failed.count";
+    static final String QUEUED_METRIC_SUFFIX = ".queued.count";
+    static final String PROCESSED_METRIC_SUFFIX = ".processed.count";
+    static final String ACTIVE_METRIC_SUFFIX = ".active.count";
+    static final String AVG_WAITING_TIME_METRIC_SUFFIX = ".averageWaitingTime";
+    static final String AVG_PROCESSING_TIME_METRIC_SUFFIX = ".averageProcessingTime";
+
+    /** Matches every character that is replaced by an underscore in a queue name. */
+    private static final Pattern ILLEGAL_NAME_CHARS = Pattern.compile("[^a-zA-Z\\d]");
+
+    /** Registry the gauges are registered in; may be {@code null}. */
+    private final MetricRegistry metricRegistry;
+
+    /** Gauges keyed by their metric suffix; stays empty without registry or statistics. */
+    private final Map<String, Gauge<Long>> gauges = new HashMap<>();
+
+    /** Names under which gauges were registered, needed to remove them on shutdown. */
+    private final Set<String> registeredNames = new HashSet<>();
+
+    /** Sanitized queue name, {@code null} for the global statistics. */
+    private final String queueName;
+
+    /**
+     * Create a new GaugeSupport instance for the global queue.
+     *
+     * @param globalQueueStats the global queueStats
+     * @param metricRegistry   the (sling) metric registry
+     */
+    GaugeSupport(final Statistics globalQueueStats, final MetricRegistry metricRegistry) {
+        this(null, globalQueueStats, metricRegistry);
+    }
+
+    /**
+     * Creates a new GaugeSupport instance and the gauges reading from the given statistics.
+     * The gauges are not registered before {@link #initialize()} is called.
+     *
+     * @param queueName      name of the queue, {@code null} for the global statistics
+     * @param queueStats     queueStats of that queue
+     * @param metricRegistry the (sling) metric registry
+     */
+    GaugeSupport(final String queueName, final Statistics queueStats, final MetricRegistry metricRegistry) {
+        this.metricRegistry = metricRegistry;
+        this.queueName = getSanitizedQueueName(queueName);
+        if (metricRegistry != null && queueStats != null) {
+            gauges.put(FINISHED_METRIC_SUFFIX, new Gauge<Long>() {
+                @Override
+                public Long getValue() {
+                    return queueStats.getNumberOfFinishedJobs();
+                }
+            });
+            gauges.put(CANCELLED_METRIC_SUFFIX, new Gauge<Long>() {
+                @Override
+                public Long getValue() {
+                    return queueStats.getNumberOfCancelledJobs();
+                }
+            });
+            gauges.put(FAILED__METRIC_SUFFIX, new Gauge<Long>() {
+                @Override
+                public Long getValue() {
+                    return queueStats.getNumberOfFailedJobs();
+                }
+            });
+            gauges.put(QUEUED_METRIC_SUFFIX, new Gauge<Long>() {
+                @Override
+                public Long getValue() {
+                    return queueStats.getNumberOfQueuedJobs();
+                }
+            });
+            gauges.put(PROCESSED_METRIC_SUFFIX, new Gauge<Long>() {
+                @Override
+                public Long getValue() {
+                    return queueStats.getNumberOfProcessedJobs();
+                }
+            });
+            gauges.put(ACTIVE_METRIC_SUFFIX, new Gauge<Long>() {
+                @Override
+                public Long getValue() {
+                    return queueStats.getNumberOfActiveJobs();
+                }
+            });
+            gauges.put(AVG_WAITING_TIME_METRIC_SUFFIX, new Gauge<Long>() {
+                @Override
+                public Long getValue() {
+                    return queueStats.getAverageWaitingTime();
+                }
+            });
+            gauges.put(AVG_PROCESSING_TIME_METRIC_SUFFIX, new Gauge<Long>() {
+                @Override
+                public Long getValue() {
+                    return queueStats.getAverageProcessingTime();
+                }
+            });
+        }
+    }
+
+    /**
+     * Unregisters all job gauges of the queue.
+     */
+    void shutdown() {
+        if (metricRegistry == null) {
+            return;
+        }
+        for (final String metricName : registeredNames) {
+            try {
+                metricRegistry.remove(metricName);
+            } catch (RuntimeException ignored) {
+                // best effort: keep removing the remaining gauges
+            }
+        }
+        registeredNames.clear();
+    }
+
+    /**
+     * Initializes the metric registry with the gauges.
+     */
+    void initialize() {
+        for (final Map.Entry<String, Gauge<Long>> entry : gauges.entrySet()) {
+            register(entry.getKey(), entry.getValue());
+        }
+    }
+
+    /**
+     * Registers a gauge under the first free metric name for the given suffix.
+     * If the name is already taken, queue gauges retry with an increasing counter
+     * appended to the queue name; the global gauge has a fixed name and is skipped.
+     *
+     * @param suffix the metric suffix of the gauge
+     * @param gauge  the gauge to register
+     */
+    private void register(final String suffix, final Gauge<Long> gauge) {
+        int count = 0;
+        while (true) {
+            final String metricName = getMetricName(queueName, count, suffix);
+            try {
+                metricRegistry.register(metricName, gauge);
+                registeredNames.add(metricName);
+                return;
+            } catch (IllegalArgumentException e) {
+                // thrown by the registry when the metric name is already in use
+                if (queueName == null) {
+                    return;
+                }
+                count++;
+            }
+        }
+    }
+
+    /**
+     * Builds the metric name from prefix, optional queue part and suffix.
+     *
+     * @param queueName    sanitized queue name or {@code null} for the global metric
+     * @param count        collision counter, appended as {@code _count} when positive
+     * @param metricSuffix suffix identifying the metric
+     * @return the complete metric name
+     */
+    private String getMetricName(final String queueName, final int count, final String metricSuffix) {
+        final StringBuilder name = new StringBuilder(GAUGE_NAME_PREFIX);
+        if (queueName != null) {
+            name.append('.').append(QUEUE_PREFIX).append('.').append(queueName);
+        }
+        if (count > 0) {
+            name.append('_').append(count);
+        }
+        return name.append(metricSuffix).toString();
+    }
+
+    /**
+     * Replaces every character that is not an ASCII letter or digit by an underscore
+     * and lower-cases the result independently of the default locale.
+     *
+     * @param queueName the raw queue name, may be {@code null}
+     * @return the sanitized name, or {@code null} if no name was given
+     */
+    private static String getSanitizedQueueName(final String queueName) {
+        if (queueName == null) {
+            return null;
+        }
+        return ILLEGAL_NAME_CHARS.matcher(queueName).replaceAll("_").toLowerCase(Locale.ROOT);
+    }
+}
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java b/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java
index 455cb1e..8e037b3 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java
@@ -22,13 +22,14 @@
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import com.codahale.metrics.MetricRegistry;
 import org.apache.sling.event.impl.jobs.InternalJobState;
 import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
 import org.apache.sling.event.jobs.Statistics;
 import org.apache.sling.event.jobs.TopicStatistics;
 import org.osgi.framework.Constants;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.*;
+
 
 /**
  * The statistics manager keeps track of all statistics related tasks.
@@ -43,6 +44,12 @@
     @Reference
     private JobManagerConfiguration configuration;
 
+    /**
+     * The sling metric registry
+     */
+    @Reference(target = "(name=sling)", cardinality = ReferenceCardinality.OPTIONAL)
+    private MetricRegistry metricRegistry;
+
     /** Global statistics. */
     private final StatisticsImpl globalStatistics = new StatisticsImpl() {
 
@@ -57,9 +64,15 @@
 
     };
 
+    /** Gauges for the global job statistics. */
+    private GaugeSupport globalGauges;
+
     /** Statistics per topic. */
     private final ConcurrentMap<String, TopicStatistics> topicStatistics = new ConcurrentHashMap<>();
 
+    /** Gauges for the statistics per queue. */
+    private final ConcurrentMap<String, GaugeSupport> queueGauges = new ConcurrentHashMap<>();
+
     /** Statistics per queue. */
     private final ConcurrentMap<String, Statistics> queueStatistics = new ConcurrentHashMap<>();
 
@@ -105,6 +118,12 @@
         if ( queueStats == null ) {
             queueStatistics.putIfAbsent(queueName, new StatisticsImpl());
             queueStats = (StatisticsImpl)queueStatistics.get(queueName);
+            if (metricRegistry != null) {
+                GaugeSupport gaugeSupport = new GaugeSupport(queueName, queueStats, metricRegistry);
+                if (queueGauges.putIfAbsent(queueName, gaugeSupport) == null) {
+                    gaugeSupport.initialize();
+                }
+            }
         }
         return queueStats;
     }
@@ -141,7 +160,6 @@
             if ( queueStats != null ) {
                 queueStats.finishedJob(processingTime);
             }
-
         }
     }
 
@@ -181,4 +199,22 @@
             queueStats.decQueued();
         }
     }
+
+    @Activate
+    protected void activate() {
+        if (metricRegistry != null) {
+            globalGauges = new GaugeSupport(globalStatistics, metricRegistry);
+            globalGauges.initialize();
+        }
+    }
+
+    @Deactivate
+    protected void deactivate() {
+        if (globalGauges != null) {
+            globalGauges.shutdown();
+        }
+        for (GaugeSupport gaugeSupport : queueGauges.values()) {
+            gaugeSupport.shutdown();
+        }
+    }
 }
diff --git a/src/test/java/org/apache/sling/event/impl/jobs/stats/StatisticsManagerTest.java b/src/test/java/org/apache/sling/event/impl/jobs/stats/StatisticsManagerTest.java
new file mode 100644
index 0000000..71ef1f7
--- /dev/null
+++ b/src/test/java/org/apache/sling/event/impl/jobs/stats/StatisticsManagerTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.stats;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.sling.event.impl.TestUtil;
+import org.apache.sling.event.impl.jobs.InternalJobState;
+import org.apache.sling.event.jobs.Statistics;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.sling.event.impl.jobs.stats.GaugeSupport.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+/**
+ * Verifies that {@link StatisticsManager} publishes its global and per-queue statistics as gauges.
+ */
+public class StatisticsManagerTest {
+
+    private static final String TEST_QUEUE_NAME = "queue_name";
+    private static final String TEST_TOPIC = "testtopic";
+
+    private final StatisticsImpl statistics = new StatisticsImpl();
+
+    private StatisticsManager statisticsManager;
+    private MetricRegistry metricRegistry;
+
+    @Before
+    public void setup() {
+        statisticsManager = new StatisticsManager();
+        metricRegistry = new MetricRegistry();
+        TestUtil.setFieldValue(statisticsManager, "globalStatistics", statistics);
+        TestUtil.setFieldValue(statisticsManager, "metricRegistry", metricRegistry);
+        statisticsManager.activate();
+    }
+
+    @Test
+    public void testActivateDeactivateWithMissingRegistry() {
+        statisticsManager = new StatisticsManager();
+        TestUtil.setFieldValue(statisticsManager, "globalStatistics", statistics);
+        statisticsManager.activate();
+        assertNull(TestUtil.getFieldValue(statisticsManager, "metricRegistry"));
+        statisticsManager.deactivate();
+    }
+
+    @Test
+    public void testGlobalGaugesAreRemovedOnDeactivate() {
+        statisticsManager.jobQueued(TEST_QUEUE_NAME, TEST_TOPIC);
+        assertEquals("Expected exactly 16 metrics (8 global + 8 queue).",
+                16, metricRegistry.getMetrics().size());
+        statisticsManager.deactivate();
+        assertEquals(0, metricRegistry.getMetrics().size());
+    }
+
+    @Test
+    public void testJobStarted() {
+        statisticsManager.jobStarted(TEST_QUEUE_NAME, TEST_TOPIC, 10L);
+        Statistics queueStatistics = statisticsManager.getQueueStatistics(TEST_QUEUE_NAME);
+        long averageWaitingTime = queueStatistics.getAverageWaitingTime();
+
+        assertEquals(1L, queueStatistics.getNumberOfActiveJobs());
+        assertGaugeValue(1L, getQueueMetric(ACTIVE_METRIC_SUFFIX));
+        assertGaugeValue(1L, getGlobalMetric(ACTIVE_METRIC_SUFFIX));
+        assertGaugeValue(averageWaitingTime, getQueueMetric(AVG_WAITING_TIME_METRIC_SUFFIX));
+        assertGaugeValue(averageWaitingTime, getGlobalMetric(AVG_WAITING_TIME_METRIC_SUFFIX));
+    }
+
+    @Test
+    public void testJobQueueDequeue() {
+        statisticsManager.jobQueued(TEST_QUEUE_NAME, TEST_TOPIC);
+        Gauge<?> queueMetric = getQueueMetric(QUEUED_METRIC_SUFFIX);
+        Gauge<?> globalMetric = getGlobalMetric(QUEUED_METRIC_SUFFIX);
+        Statistics queueStatistics = statisticsManager.getQueueStatistics(TEST_QUEUE_NAME);
+        assertEquals(1L, queueStatistics.getNumberOfQueuedJobs());
+        assertGaugeValue(1L, queueMetric);
+        assertGaugeValue(1L, globalMetric);
+
+        statisticsManager.jobDequeued(TEST_QUEUE_NAME, TEST_TOPIC);
+        assertEquals(0L, queueStatistics.getNumberOfQueuedJobs());
+        assertGaugeValue(0L, queueMetric);
+        assertGaugeValue(0L, globalMetric);
+    }
+
+    @Test
+    public void testJobCancelled() {
+        statisticsManager.jobEnded(TEST_QUEUE_NAME, TEST_TOPIC, InternalJobState.CANCELLED, 0L);
+        Statistics queueStatistics = statisticsManager.getQueueStatistics(TEST_QUEUE_NAME);
+        assertEquals(1L, queueStatistics.getNumberOfCancelledJobs());
+        assertGaugeValue(1L, getQueueMetric(CANCELLED_METRIC_SUFFIX));
+        assertGaugeValue(1L, getGlobalMetric(CANCELLED_METRIC_SUFFIX));
+    }
+
+    @Test
+    public void testJobFailed() {
+        statisticsManager.jobEnded(TEST_QUEUE_NAME, TEST_TOPIC, InternalJobState.FAILED, 0L);
+        Statistics queueStatistics = statisticsManager.getQueueStatistics(TEST_QUEUE_NAME);
+        assertEquals(1L, queueStatistics.getNumberOfFailedJobs());
+        assertGaugeValue(1L, getQueueMetric(FAILED__METRIC_SUFFIX));
+        assertGaugeValue(1L, getGlobalMetric(FAILED__METRIC_SUFFIX));
+    }
+
+    @Test
+    public void testJobFinished() {
+        statisticsManager.jobEnded(TEST_QUEUE_NAME, TEST_TOPIC, InternalJobState.SUCCEEDED, 10L);
+        Statistics queueStatistics = statisticsManager.getQueueStatistics(TEST_QUEUE_NAME);
+        long averageProcessingTime = queueStatistics.getAverageProcessingTime();
+
+        assertEquals(1L, queueStatistics.getNumberOfFinishedJobs());
+        assertGaugeValue(1L, getQueueMetric(FINISHED_METRIC_SUFFIX));
+        assertGaugeValue(1L, getQueueMetric(PROCESSED_METRIC_SUFFIX));
+        assertGaugeValue(averageProcessingTime, getQueueMetric(AVG_PROCESSING_TIME_METRIC_SUFFIX));
+        assertGaugeValue(1L, getGlobalMetric(FINISHED_METRIC_SUFFIX));
+        assertGaugeValue(1L, getGlobalMetric(PROCESSED_METRIC_SUFFIX));
+        assertGaugeValue(averageProcessingTime, getGlobalMetric(AVG_PROCESSING_TIME_METRIC_SUFFIX));
+    }
+
+    @Test
+    public void testQueueWithSpecialCharsIsSanitized() {
+        statisticsManager.jobQueued("Topic*With%Special/Chars", TEST_TOPIC);
+        statisticsManager.jobQueued("topic$with?special§chars", TEST_TOPIC);
+        statisticsManager.jobQueued("topic with<special>chars", TEST_TOPIC);
+
+        // all three names sanitize to the same string, so later queues get a counter suffix
+        assertGaugeValue(1L, getQueueMetric("topic_with_special_chars", QUEUED_METRIC_SUFFIX));
+        assertGaugeValue(1L, getQueueMetric("topic_with_special_chars_1", QUEUED_METRIC_SUFFIX));
+        assertGaugeValue(1L, getQueueMetric("topic_with_special_chars_2", QUEUED_METRIC_SUFFIX));
+    }
+
+    /** Asserts that the gauge currently reports the expected value. */
+    private static void assertGaugeValue(final long expected, final Gauge<?> gauge) {
+        assertEquals(Long.valueOf(expected), gauge.getValue());
+    }
+
+    /** Looks up a gauge of the default test queue. */
+    private Gauge<?> getQueueMetric(final String metricSuffix) {
+        return getQueueMetric(TEST_QUEUE_NAME, metricSuffix);
+    }
+
+    /** Looks up a queue gauge by the (sanitized) queue name part of its metric name. */
+    private Gauge<?> getQueueMetric(final String queueNamePart, final String metricSuffix) {
+        return (Gauge<?>) metricRegistry.getMetrics().get(GAUGE_NAME_PREFIX +
+                "." + QUEUE_PREFIX + "." + queueNamePart + metricSuffix);
+    }
+
+    /** Looks up a gauge of the global statistics. */
+    private Gauge<?> getGlobalMetric(final String metricSuffix) {
+        return (Gauge<?>) metricRegistry.getMetrics().get(GAUGE_NAME_PREFIX + metricSuffix);
+    }
+}
diff --git a/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java b/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java
index c51ecd9..464ef74 100644
--- a/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java
+++ b/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java
@@ -186,6 +186,8 @@
                 mavenBundle("org.apache.commons", "commons-lang3", "3.5"),
                 mavenBundle("commons-pool", "commons-pool", "1.6"),
 
+                mavenBundle("io.dropwizard.metrics", "metrics-core", "3.2.4"),
+
                 mavenBundle("org.apache.servicemix.bundles", "org.apache.servicemix.bundles.concurrent", "1.3.4_1"),
 
                 mavenBundle("org.apache.geronimo.bundles", "commons-httpclient", "3.1_1"),
@@ -209,6 +211,7 @@
                 mavenBundle("org.apache.sling", "org.apache.sling.commons.johnzon", "1.0.0"),
                 mavenBundle("org.apache.sling", "org.apache.sling.commons.scheduler", "2.4.14"),
                 mavenBundle("org.apache.sling", "org.apache.sling.commons.threads", "3.2.4"),
+                mavenBundle("org.apache.sling", "org.apache.sling.commons.metrics", "1.2.6"),
 
                 mavenBundle("org.apache.sling", "org.apache.sling.auth.core", "1.3.12"),
                 mavenBundle("org.apache.sling", "org.apache.sling.discovery.api", "1.0.2"),