Merge pull request #1 from apache/master
merge from upstream
diff --git a/Jenkinsfile b/Jenkinsfile
new file mode 100644
index 0000000..f582519
--- /dev/null
+++ b/Jenkinsfile
@@ -0,0 +1,20 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+slingOsgiBundleBuild()
diff --git a/README.md b/README.md
index 45122a3..6e33377 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,6 @@
-[<img src="http://sling.apache.org/res/logos/sling.png"/>](http://sling.apache.org)
+[![Apache Sling](https://sling.apache.org/res/logos/sling.png)](https://sling.apache.org)
- [![Build Status](https://builds.apache.org/buildStatus/icon?job=sling-org-apache-sling-event-1.8)](https://builds.apache.org/view/S-Z/view/Sling/job/sling-org-apache-sling-event-1.8) [![Test Status](https://img.shields.io/jenkins/t/https/builds.apache.org/view/S-Z/view/Sling/job/sling-org-apache-sling-event-1.8.svg)](https://builds.apache.org/view/S-Z/view/Sling/job/sling-org-apache-sling-event-1.8/test_results_analyzer/) [![Maven Central](https://maven-badges.herokuapp.com/maven-central/org.apache.sling/org.apache.sling.event/badge.svg)](http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22org.apache.sling%22%20a%3A%22org.apache.sling.event%22) [![JavaDocs](https://www.javadoc.io/badge/org.apache.sling/org.apache.sling.event.svg)](https://www.javadoc.io/doc/org.apache.sling/org.apache.sling.event) [![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://www.apache.org/licenses/LICENSE-2.0)
+ [![Build Status](https://ci-builds.apache.org/job/Sling/job/modules/job/sling-org-apache-sling-event/job/master/badge/icon)](https://ci-builds.apache.org/job/Sling/job/modules/job/sling-org-apache-sling-event/job/master/) [![Test Status](https://img.shields.io/jenkins/tests.svg?jobUrl=https://ci-builds.apache.org/job/Sling/job/modules/job/sling-org-apache-sling-event/job/master/)](https://ci-builds.apache.org/job/Sling/job/modules/job/sling-org-apache-sling-event/job/master/test/?width=800&height=600) [![Coverage](https://sonarcloud.io/api/project_badges/measure?project=apache_sling-org-apache-sling-event&metric=coverage)](https://sonarcloud.io/dashboard?id=apache_sling-org-apache-sling-event) [![Sonarcloud Status](https://sonarcloud.io/api/project_badges/measure?project=apache_sling-org-apache-sling-event&metric=alert_status)](https://sonarcloud.io/dashboard?id=apache_sling-org-apache-sling-event) [![JavaDoc](https://www.javadoc.io/badge/org.apache.sling/org.apache.sling.event.svg)](https://www.javadoc.io/doc/org.apache.sling/org-apache-sling-event) [![Maven Central](https://maven-badges.herokuapp.com/maven-central/org.apache.sling/org.apache.sling.event/badge.svg)](https://search.maven.org/#search%7Cga%7C1%7Cg%3A%22org.apache.sling%22%20a%3A%22org.apache.sling.event%22) [![License](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://www.apache.org/licenses/LICENSE-2.0)
# Apache Sling Event Support
diff --git a/pom.xml b/pom.xml
index 0e441fc..0668f5f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -263,6 +263,12 @@
<version>3.1.0</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <artifactId>metrics-core</artifactId>
+ <version>3.2.4</version>
+ <groupId>io.dropwizard.metrics</groupId>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>org.apache.sling</groupId>
<artifactId>org.apache.sling.serviceusermapper</artifactId>
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/notifications/NewJobSender.java b/src/main/java/org/apache/sling/event/impl/jobs/notifications/NewJobSender.java
index dc16be2..668508d 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/notifications/NewJobSender.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/notifications/NewJobSender.java
@@ -76,6 +76,7 @@
properties.put(ResourceChangeListener.PATHS, this.configuration.getLocalJobsPath());
this.listenerRegistration = bundleContext.registerService(ResourceChangeListener.class, this, properties);
+ logger.debug("Registered resource event listener for {}", this.configuration.getLocalJobsPath());
}
/**
@@ -88,14 +89,15 @@
this.listenerRegistration.unregister();
this.listenerRegistration = null;
}
+ logger.debug("Deactivating resource event listener");
}
@Override
public void onChange(final List<ResourceChange> resourceChanges) {
for(final ResourceChange resourceChange : resourceChanges) {
- logger.debug("Received event {}", resourceChange);
+ final String path = resourceChange.getPath();
- final String path = resourceChange.getPath();
+ logger.debug("Received event {} : {}", resourceChange.getType().name(), path);
final int topicStart = this.configuration.getLocalJobsPath().length() + 1;
final int topicEnd = path.indexOf('/', topicStart);
@@ -109,12 +111,13 @@
properties.put(NotificationConstants.NOTIFICATION_PROPERTY_JOB_ID, jobId);
properties.put(NotificationConstants.NOTIFICATION_PROPERTY_JOB_TOPIC, topic);
- // we also set internally the queue name
+ // we also set internally the queue name
final String queueName = this.configuration.getQueueConfigurationManager().getQueueInfo(topic).queueName;
properties.put(Job.PROPERTY_JOB_QUEUE_NAME, queueName);
final Event jobEvent = new Event(NotificationConstants.TOPIC_JOB_ADDED, properties);
// as this is send within handling an event, we do sync call
+ logger.debug("Sending event {} : {}", topic, jobId);
this.eventAdmin.sendEvent(jobEvent);
}
}
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java b/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
index 34ff143..5cabbbf 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
@@ -455,15 +455,18 @@
retryCount++;
if ( retries != -1 && retryCount > retries ) {
if ( this.logger.isDebugEnabled() ) {
- this.logger.debug("Cancelled job {}", Utility.toString(handler.getJob()));
+ this.logger.error("Cancelled job {} after {} unsuccessful retries",
+ Utility.toString(handler.getJob()),
+ retries);
}
info.finalState = InternalJobState.CANCELLED;
} else {
info.reschedule = true;
handler.getJob().retry();
- if ( this.logger.isDebugEnabled() ) {
- this.logger.debug("Failed job {}", Utility.toString(handler.getJob()));
- }
+ this.logger.warn("Failed job {}, will retry {} more time(s), retryCount={}",
+ Utility.toString(handler.getJob()),
+ retries-retryCount,
+ retryCount);
info.finalState = InternalJobState.FAILED;
}
break;
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java b/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
index 11f24a9..40edb37 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
@@ -242,6 +242,7 @@
}
}
if ( queue != null ) {
+ logger.debug("Starting queue {}", queueInfo.queueName);
if ( !isNewQueue ) {
queue.wakeUpQueue(topics);
}
@@ -406,6 +407,7 @@
public void handleEvent(final Event event) {
final String topic = (String)event.getProperty(NotificationConstants.NOTIFICATION_PROPERTY_JOB_TOPIC);
if ( this.isActive.get() && topic != null ) {
+ logger.debug("Received event {}", topic);
final QueueInfo info = this.configuration.getQueueConfigurationManager().getQueueInfo(topic);
this.start(info, Collections.singleton(topic));
}
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/stats/GaugeSupport.java b/src/main/java/org/apache/sling/event/impl/jobs/stats/GaugeSupport.java
new file mode 100644
index 0000000..a73dca8
--- /dev/null
+++ b/src/main/java/org/apache/sling/event/impl/jobs/stats/GaugeSupport.java
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.stats;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.sling.event.jobs.Statistics;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Helper class that holds gauges for relevant queue statistics.
+ */
+class GaugeSupport {
+
+ static final String GAUGE_NAME_PREFIX = "event.jobs";
+ static final String QUEUE_PREFIX = "queue";
+ static final String CANCELLED_METRIC_SUFFIX = ".cancelled.count";
+ static final String FINISHED_METRIC_SUFFIX = ".finished.count";
+ static final String FAILED__METRIC_SUFFIX = ".failed.count";
+ static final String QUEUED_METRIC_SUFFIX = ".queued.count";
+ static final String PROCESSED_METRIC_SUFFIX = ".processed.count";
+ static final String ACTIVE_METRIC_SUFFIX = ".active.count";
+ static final String AVG_WAITING_TIME_METRIC_SUFFIX = ".averageWaitingTime";
+ static final String AVG_PROCESSING_TIME_METRIC_SUFFIX = ".averageProcessingTime";
+
+ private final MetricRegistry metricRegistry;
+ private final Map<String, Gauge<Long>> gaugeList = new HashMap<>();
+ private final Set<String> gaugeMetricNames = new HashSet<>();
+ private final String queueName;
+
+ /**
+ * Create a new GaugeSupport instance for the global queue.
+ *
+ * @param globalQueueStats the global queueStats
+ * @param metricRegistry the (sling) metric registry
+ */
+ GaugeSupport(final Statistics globalQueueStats, final MetricRegistry metricRegistry) {
+ this(null, globalQueueStats, metricRegistry);
+ }
+
+ /**
+ * Creates a new GaugeSupport instance. Registers gauges for jobs (based on the queueStats).
+ *
+ * @param queueName name of the queue
+ * @param queueStats queueStats of that queue
+ * @param metricRegistry the (sling) metric registry
+ */
+ GaugeSupport(final String queueName, final Statistics queueStats, final MetricRegistry metricRegistry) {
+ this.metricRegistry = metricRegistry;
+ this.queueName = getSanitizedQueueName(queueName);
+ if (metricRegistry != null && queueStats != null) {
+ gaugeList.put(FINISHED_METRIC_SUFFIX, new Gauge<Long>() {
+
+ public Long getValue() {
+ return queueStats.getNumberOfFinishedJobs();
+ }
+ });
+ gaugeList.put(CANCELLED_METRIC_SUFFIX, new Gauge<Long>() {
+
+ public Long getValue() {
+ return queueStats.getNumberOfCancelledJobs();
+ }
+ });
+ gaugeList.put(FAILED__METRIC_SUFFIX, new Gauge<Long>() {
+
+ public Long getValue() {
+ return queueStats.getNumberOfFailedJobs();
+ }
+ });
+ gaugeList.put(QUEUED_METRIC_SUFFIX, new Gauge<Long>() {
+
+ public Long getValue() {
+ return queueStats.getNumberOfQueuedJobs();
+ }
+ });
+ gaugeList.put(PROCESSED_METRIC_SUFFIX, new Gauge<Long>() {
+
+ public Long getValue() {
+ return queueStats.getNumberOfProcessedJobs();
+ }
+ });
+ gaugeList.put(ACTIVE_METRIC_SUFFIX, new Gauge<Long>() {
+
+ public Long getValue() {
+ return queueStats.getNumberOfActiveJobs();
+ }
+ });
+ gaugeList.put(AVG_WAITING_TIME_METRIC_SUFFIX, new Gauge<Long>() {
+
+ public Long getValue() {
+ return queueStats.getAverageWaitingTime();
+ }
+ });
+ gaugeList.put(AVG_PROCESSING_TIME_METRIC_SUFFIX, new Gauge<Long>() {
+
+ public Long getValue() {
+ return queueStats.getAverageProcessingTime();
+ }
+ });
+ }
+ }
+
+ /**
+ * Unregisters all job gauges of the queue.
+ */
+ void shutdown() {
+ if (metricRegistry != null) {
+ for (String metricName : gaugeMetricNames) {
+ try {
+ metricRegistry.remove(metricName);
+ } catch (RuntimeException e) {
+ // ignore
+ }
+ }
+ }
+ }
+
+ /**
+ * Initializes the metric registry with the gauges.
+ */
+ void initialize() {
+ for (Map.Entry<String, Gauge<Long>> entry : gaugeList.entrySet()) {
+ registerWithSuffix(entry.getKey(), 0, entry.getValue());
+ }
+ }
+
+ private void registerWithSuffix(String suffix, int count, Gauge<Long> value) {
+ try {
+ String metricName = getMetricName(queueName, count, suffix);
+ metricRegistry.register(metricName, value);
+ gaugeMetricNames.add(metricName);
+ } catch (IllegalArgumentException e) {
+ if (queueName != null) {
+ registerWithSuffix(suffix, count + 1, value);
+ }
+ }
+ }
+
+ private String getMetricName(final String queueName, int count, final String metricSuffix) {
+ String metricName = (queueName != null ? "." + QUEUE_PREFIX + "." + queueName : "");
+ if (count > 0) {
+ metricName = metricName + "_" + count;
+ }
+ return GAUGE_NAME_PREFIX + metricName + metricSuffix;
+ }
+
+ private String getSanitizedQueueName(String queueName) {
+ if (queueName == null) {
+ return null;
+ }
+ return queueName.replaceAll("[^a-zA-Z\\d]", "_").toLowerCase();
+ }
+}
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java b/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java
index 455cb1e..8e037b3 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/stats/StatisticsManager.java
@@ -22,13 +22,14 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import com.codahale.metrics.MetricRegistry;
import org.apache.sling.event.impl.jobs.InternalJobState;
import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
import org.apache.sling.event.jobs.Statistics;
import org.apache.sling.event.jobs.TopicStatistics;
import org.osgi.framework.Constants;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.*;
+
/**
* The statistics manager keeps track of all statistics related tasks.
@@ -43,6 +44,12 @@
@Reference
private JobManagerConfiguration configuration;
+ /**
+ * The sling metric registry
+ */
+ @Reference(target = "(name=sling)", cardinality = ReferenceCardinality.OPTIONAL)
+ private MetricRegistry metricRegistry;
+
/** Global statistics. */
private final StatisticsImpl globalStatistics = new StatisticsImpl() {
@@ -57,9 +64,15 @@
};
+ /** Gauges for the global job statistics. */
+ private GaugeSupport globalGauges;
+
/** Statistics per topic. */
private final ConcurrentMap<String, TopicStatistics> topicStatistics = new ConcurrentHashMap<>();
+ /** Gauges for the statistics per topic. */
+ private final ConcurrentMap<String, GaugeSupport> queueGauges = new ConcurrentHashMap<>();
+
/** Statistics per queue. */
private final ConcurrentMap<String, Statistics> queueStatistics = new ConcurrentHashMap<>();
@@ -105,6 +118,12 @@
if ( queueStats == null ) {
queueStatistics.putIfAbsent(queueName, new StatisticsImpl());
queueStats = (StatisticsImpl)queueStatistics.get(queueName);
+ if (metricRegistry != null) {
+ GaugeSupport gaugeSupport = new GaugeSupport(queueName, queueStats, metricRegistry);
+ if (queueGauges.putIfAbsent(queueName, gaugeSupport) == null) {
+ gaugeSupport.initialize();
+ }
+ }
}
return queueStats;
}
@@ -141,7 +160,6 @@
if ( queueStats != null ) {
queueStats.finishedJob(processingTime);
}
-
}
}
@@ -181,4 +199,22 @@
queueStats.decQueued();
}
}
+
+ @Activate
+ protected void activate() {
+ if (metricRegistry != null) {
+ globalGauges = new GaugeSupport(globalStatistics, metricRegistry);
+ globalGauges.initialize();
+ }
+ }
+
+ @Deactivate
+ protected void deactivate() {
+ if (globalGauges != null) {
+ globalGauges.shutdown();
+ }
+ for (GaugeSupport gaugeSupport : queueGauges.values()) {
+ gaugeSupport.shutdown();
+ }
+ }
}
diff --git a/src/test/java/org/apache/sling/event/impl/jobs/stats/StatisticsManagerTest.java b/src/test/java/org/apache/sling/event/impl/jobs/stats/StatisticsManagerTest.java
new file mode 100644
index 0000000..71ef1f7
--- /dev/null
+++ b/src/test/java/org/apache/sling/event/impl/jobs/stats/StatisticsManagerTest.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.stats;
+
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.MetricRegistry;
+import org.apache.sling.event.impl.TestUtil;
+import org.apache.sling.event.impl.jobs.InternalJobState;
+import org.apache.sling.event.jobs.Statistics;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.sling.event.impl.jobs.stats.GaugeSupport.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class StatisticsManagerTest {
+
+ private static final String TEST_QUEUE_NAME = "queue_name";
+ private static final String TEST_TOPIC = "testtopic";
+
+ private StatisticsManager statisticsManager;
+ private MetricRegistry metricRegistry;
+
+ private StatisticsImpl statistics = new StatisticsImpl();
+
+ @Before
+ public void setup() {
+ statisticsManager = new StatisticsManager();
+ metricRegistry = new MetricRegistry();
+ TestUtil.setFieldValue(statisticsManager, "globalStatistics", statistics);
+ TestUtil.setFieldValue(statisticsManager, "metricRegistry", metricRegistry);
+ statisticsManager.activate();
+ }
+
+ @Test
+ public void testActivateDeactivateWithMissingRegistry() {
+ statisticsManager = new StatisticsManager();
+ TestUtil.setFieldValue(statisticsManager, "globalStatistics", statistics);
+ statisticsManager.activate();
+ assertNull(TestUtil.getFieldValue(statisticsManager, "metricRegistry"));
+ statisticsManager.deactivate();
+ }
+
+ @Test
+ public void testGlobalGaugesAreRemovedOnDeactivate() {
+ statisticsManager.jobQueued(TEST_QUEUE_NAME, TEST_TOPIC);
+ assertEquals("Less than 16 metrics present (8 global + 8 topic).",
+ 16, metricRegistry.getMetrics().size());
+ statisticsManager.deactivate();
+ assertEquals(0, metricRegistry.getMetrics().size());
+ }
+
+ @Test
+ public void testJobStarted() {
+ long queueTime = 10L;
+ statisticsManager.jobStarted(TEST_QUEUE_NAME, TEST_TOPIC, queueTime);
+ Gauge topicMetric = getTopicMetric(ACTIVE_METRIC_SUFFIX);
+ Gauge waitingTimeTopicMetric = getTopicMetric(AVG_WAITING_TIME_METRIC_SUFFIX);
+ Gauge globalMetric = getGlobalMetric(ACTIVE_METRIC_SUFFIX);
+ Gauge waitingTimeGlobalMetric = getGlobalMetric(AVG_WAITING_TIME_METRIC_SUFFIX);
+
+ Statistics queueStatistics = statisticsManager.getQueueStatistics(TEST_QUEUE_NAME);
+
+ assertEquals(1L, queueStatistics.getNumberOfActiveJobs());
+ assertEquals((Long) 1L, topicMetric.getValue());
+ assertEquals((Long) 1L, globalMetric.getValue());
+ assertEquals((Long) queueStatistics.getAverageWaitingTime(), waitingTimeTopicMetric.getValue());
+ assertEquals((Long) queueStatistics.getAverageWaitingTime(), waitingTimeGlobalMetric.getValue());
+ }
+
+ @Test
+ public void testJobQueueDequeue() {
+ statisticsManager.jobQueued(TEST_QUEUE_NAME, TEST_TOPIC);
+ Gauge topicMetric = getTopicMetric(QUEUED_METRIC_SUFFIX);
+ Gauge globalMetric = getGlobalMetric(QUEUED_METRIC_SUFFIX);
+ Statistics queueStatistics = statisticsManager.getQueueStatistics(TEST_QUEUE_NAME);
+ assertEquals(1L, queueStatistics.getNumberOfQueuedJobs());
+ assertEquals((Long) 1L, topicMetric.getValue());
+ assertEquals((Long) 1L, globalMetric.getValue());
+
+ statisticsManager.jobDequeued(TEST_QUEUE_NAME, TEST_TOPIC);
+ assertEquals(0L, queueStatistics.getNumberOfQueuedJobs());
+ assertEquals((Long) 0L, topicMetric.getValue());
+ assertEquals((Long) 0L, globalMetric.getValue());
+ }
+
+ @Test
+ public void testJobCancelled() {
+ statisticsManager.jobEnded(TEST_QUEUE_NAME, TEST_TOPIC, InternalJobState.CANCELLED, 0L);
+ Gauge topicMetric = getTopicMetric(CANCELLED_METRIC_SUFFIX);
+ Gauge globalMetric = getGlobalMetric(CANCELLED_METRIC_SUFFIX);
+ Statistics queueStatistics = statisticsManager.getQueueStatistics(TEST_QUEUE_NAME);
+ assertEquals(1L, queueStatistics.getNumberOfCancelledJobs());
+ assertEquals((Long) 1L, topicMetric.getValue());
+ assertEquals((Long) 1L, globalMetric.getValue());
+ }
+
+ @Test
+ public void testJobFailed() {
+ statisticsManager.jobEnded(TEST_QUEUE_NAME, TEST_TOPIC, InternalJobState.FAILED, 0L);
+ Gauge topicMetric = getTopicMetric(FAILED__METRIC_SUFFIX);
+ Gauge globalMetric = getGlobalMetric(FAILED__METRIC_SUFFIX);
+ Statistics queueStatistics = statisticsManager.getQueueStatistics(TEST_QUEUE_NAME);
+ assertEquals(1L, queueStatistics.getNumberOfFailedJobs());
+ assertEquals((Long) 1L, topicMetric.getValue());
+ assertEquals((Long) 1L, globalMetric.getValue());
+ }
+
+ @Test
+ public void testJobFinished() {
+ long processingTime = 10L;
+ statisticsManager.jobEnded(TEST_QUEUE_NAME, TEST_TOPIC, InternalJobState.SUCCEEDED, processingTime);
+ Gauge finishedTopicMetric = getTopicMetric(FINISHED_METRIC_SUFFIX);
+ Gauge processedTopicMetric = getTopicMetric(PROCESSED_METRIC_SUFFIX);
+ Gauge processingTopicTimeMetric = getTopicMetric(AVG_PROCESSING_TIME_METRIC_SUFFIX);
+ Gauge finishedGlobalMetric = getGlobalMetric(FINISHED_METRIC_SUFFIX);
+ Gauge processedGlobalMetric = getGlobalMetric(PROCESSED_METRIC_SUFFIX);
+ Gauge processingGlobalTimeMetric = getGlobalMetric(AVG_PROCESSING_TIME_METRIC_SUFFIX);
+
+ Statistics queueStatistics = statisticsManager.getQueueStatistics(TEST_QUEUE_NAME);
+
+ assertEquals(1L, queueStatistics.getNumberOfFinishedJobs());
+ assertEquals((Long) 1L, finishedTopicMetric.getValue());
+ assertEquals((Long) 1L, processedTopicMetric.getValue());
+ assertEquals((Long) queueStatistics.getAverageProcessingTime(), processingTopicTimeMetric.getValue());
+ assertEquals((Long) 1L, finishedGlobalMetric.getValue());
+ assertEquals((Long) 1L, processedGlobalMetric.getValue());
+ assertEquals((Long) queueStatistics.getAverageProcessingTime(), processingGlobalTimeMetric.getValue());
+ }
+
+ @Test
+ public void testQueueWithSpecialCharsIsSanitized() {
+ String queueName = "Topic*With%Special/Chars";
+ String queueName2 = "topic$with?special§chars";
+ String queueName3 = "topic with<special>chars";
+ statisticsManager.jobQueued(queueName, TEST_TOPIC);
+ statisticsManager.jobQueued(queueName2, TEST_TOPIC);
+ statisticsManager.jobQueued(queueName3, TEST_TOPIC);
+
+ Gauge topicMetric = (Gauge) metricRegistry.getMetrics().get(GAUGE_NAME_PREFIX +
+ "." + QUEUE_PREFIX + ".topic_with_special_chars" + QUEUED_METRIC_SUFFIX);
+ Gauge topicMetric2 = (Gauge) metricRegistry.getMetrics().get(GAUGE_NAME_PREFIX +
+ "." + QUEUE_PREFIX + ".topic_with_special_chars_1" + QUEUED_METRIC_SUFFIX);
+ Gauge topicMetric3 = (Gauge) metricRegistry.getMetrics().get(GAUGE_NAME_PREFIX +
+ "." + QUEUE_PREFIX + ".topic_with_special_chars_2" + QUEUED_METRIC_SUFFIX);
+ assertEquals(1L, topicMetric.getValue());
+ assertEquals(1L, topicMetric2.getValue());
+ assertEquals(1L, topicMetric3.getValue());
+ }
+
+ private Gauge getTopicMetric(String metricSuffix) {
+ return (Gauge) metricRegistry.getMetrics().get(GAUGE_NAME_PREFIX +
+ "." + QUEUE_PREFIX + "." + TEST_QUEUE_NAME + metricSuffix);
+ }
+
+ private Gauge getGlobalMetric(String metricSuffix) {
+ return (Gauge) metricRegistry.getMetrics().get(GAUGE_NAME_PREFIX + metricSuffix);
+ }
+
+
+}
\ No newline at end of file
diff --git a/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java b/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java
index c51ecd9..464ef74 100644
--- a/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java
+++ b/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java
@@ -186,6 +186,8 @@
mavenBundle("org.apache.commons", "commons-lang3", "3.5"),
mavenBundle("commons-pool", "commons-pool", "1.6"),
+ mavenBundle("io.dropwizard.metrics", "metrics-core", "3.2.4"),
+
mavenBundle("org.apache.servicemix.bundles", "org.apache.servicemix.bundles.concurrent", "1.3.4_1"),
mavenBundle("org.apache.geronimo.bundles", "commons-httpclient", "3.1_1"),
@@ -209,6 +211,7 @@
mavenBundle("org.apache.sling", "org.apache.sling.commons.johnzon", "1.0.0"),
mavenBundle("org.apache.sling", "org.apache.sling.commons.scheduler", "2.4.14"),
mavenBundle("org.apache.sling", "org.apache.sling.commons.threads", "3.2.4"),
+ mavenBundle("org.apache.sling", "org.apache.sling.commons.metrics", "1.2.6"),
mavenBundle("org.apache.sling", "org.apache.sling.auth.core", "1.3.12"),
mavenBundle("org.apache.sling", "org.apache.sling.discovery.api", "1.0.2"),