Merge pull request #8 from stefan-egli/SLING-9906
SLING-9906 : Avoid retrying to load jobs if classes are missing
diff --git a/pom.xml b/pom.xml
index 0668f5f..f18ea94 100644
--- a/pom.xml
+++ b/pom.xml
@@ -374,5 +374,24 @@
<artifactId>org.apache.sling.event.api</artifactId>
<version>1.0.0</version>
</dependency>
+ <!-- SLING-9906 : test-only dependencies, must not leak into the compile scope of the bundle -->
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.commons.testing</artifactId>
+ <version>2.0.10</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-all</artifactId>
+ <version>1.9.5</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.testing.sling-mock-oak</artifactId>
+ <version>2.0.0</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java b/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java
index c95aaef..567f871 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java
@@ -37,6 +37,18 @@
*/
public class JobImpl implements Job, Comparable<JobImpl> {
+ /** SLING-9906 : classification of the errors that occurred while reading a job, see {@link #getReadErrorType()} */
+ public enum ReadErrorType {
+ /** no read errors */
+ NONE,
+ /** at least one read error is a RuntimeException - the job cannot be recovered */
+ RUNTIMEEXCEPTION,
+ /** a read error is caused by a ClassNotFoundException (e.g. a class that is not deployed), but none is a RuntimeException */
+ CLASSNOTFOUNDEXCEPTION,
+ /** any other read error - considered recoverable */
+ OTHER_EXCEPTION
+ }
+
/** Internal job property containing the resource path. */
public static final String PROPERTY_RESOURCE_PATH = "slingevent:path";
@@ -104,20 +116,35 @@
return this.readErrorList != null;
}
+ /**
+ * Classify the read errors of this job.
+ * A RuntimeException wins regardless of its position in the error list, as it makes the job unrecoverable.
+ * @return NONE without read errors, otherwise RUNTIMEEXCEPTION if any error is a RuntimeException,
+ * else CLASSNOTFOUNDEXCEPTION if any error is caused by a ClassNotFoundException,
+ * else OTHER_EXCEPTION
+ */
+ public ReadErrorType getReadErrorType() {
+ if ( this.readErrorList == null || this.readErrorList.isEmpty() ) {
+ return ReadErrorType.NONE;
+ }
+ ReadErrorType result = ReadErrorType.OTHER_EXCEPTION;
+ for(final Exception e : this.readErrorList) {
+ if ( e instanceof RuntimeException ) {
+ // a RuntimeException makes the job unrecoverable - no need to look any further
+ return ReadErrorType.RUNTIMEEXCEPTION;
+ } else if ( e.getCause() instanceof ClassNotFoundException ) {
+ // remember, but keep looking for a RuntimeException
+ result = ReadErrorType.CLASSNOTFOUNDEXCEPTION;
+ }
+ }
+ return result;
+ }
+
/**
* Is the error recoverable?
*/
public boolean isReadErrorRecoverable() {
- boolean result = true;
- if ( this.readErrorList != null ) {
- for(final Exception e : this.readErrorList) {
- if ( e instanceof RuntimeException ) {
- result = false;
- break;
- }
- }
- }
- return result;
+ return getReadErrorType() != ReadErrorType.RUNTIMEEXCEPTION;
}
/**
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/Utility.java b/src/main/java/org/apache/sling/event/impl/jobs/Utility.java
index 0449acb..1add893 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/Utility.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/Utility.java
@@ -28,6 +28,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.Resource;
@@ -40,6 +41,12 @@
public abstract class Utility {
+ /** SLING-9906 : silence read errors after this many logs in this VM */
+ private static final int READ_ERROR_SILENCE_LIMIT = 1000;
+
+ /** SLING-9906 : count read errors to be able to silence - a long, as an int would eventually wrap to negative values and unsilence all logs */
+ private static final AtomicLong readErrorWarnCount = new AtomicLong();
+
/**
* Check if the job topic is a valid OSGI event name (see 113.3.1 of the OSGI spec)
* @return <code>null</code> if the topic is syntactically correct otherwise an error description is returned
@@ -194,7 +201,26 @@
final List<Exception> readErrorList = (List<Exception>) jobProperties.get(ResourceHelper.PROPERTY_MARKER_READ_ERROR_LIST);
if ( readErrorList != null ) {
for(final Exception e : readErrorList) {
- logger.warn("Unable to read job from " + resource.getPath(), e);
+ final long c = readErrorWarnCount.getAndIncrement();
+ final String prefix;
+ if ( c == READ_ERROR_SILENCE_LIMIT ) {
+ logger.warn("Too many 'Unable to read job from ' messages - silencing 99% of them from now on.");
+ continue;
+ } else if ( c > READ_ERROR_SILENCE_LIMIT && c % 100 != 0 ) {
+ // SLING-9906 : then silence the log altogether
+ continue;
+ } else if ( c > READ_ERROR_SILENCE_LIMIT ) {
+ prefix = "[unsilenced] ";
+ } else {
+ prefix = "";
+ }
+ if ( e.getCause() instanceof ClassNotFoundException ) {
+ // SLING-9906 : suppress exception in ClassNotFoundException case
+ // as this can happen many times in case of a not deployed class.
+ logger.warn(prefix + "Unable to read job from " + resource.getPath() + ", exception: " + e + ", cause: " + e.getCause());
+ } else {
+ logger.warn(prefix + "Unable to read job from " + resource.getPath(), e);
+ }
}
}
job = new JobImpl(topic,
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java b/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
index 8322e1a..71db186 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
@@ -117,6 +117,24 @@
/** Configuration property for the scheduled jobs path. */
public static final String PROPERTY_SCHEDULED_JOBS_PATH = "job.scheduled.jobs.path";
+ /**
+ * SLING-9906 : create and activate a configuration outside of OSGi, for tests.
+ * @param resourceResolverFactory the resource resolver factory to inject
+ * @param queueConfigurationManager the queue configuration manager to inject
+ * @param activateProps the properties passed to activate
+ * @param config the configuration passed to activate
+ * @return the activated configuration
+ */
+ static JobManagerConfiguration newForTest(final ResourceResolverFactory resourceResolverFactory,
+ final QueueConfigurationManager queueConfigurationManager,
+ final Map<String, Object> activateProps, final Config config) {
+ final JobManagerConfiguration created = new JobManagerConfiguration();
+ created.resourceResolverFactory = resourceResolverFactory;
+ created.queueConfigManager = queueConfigurationManager;
+ created.activate(activateProps, config);
+ return created;
+ }
+
/** The jobs base path with a slash. */
private String jobsBasePathWithSlash;
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java b/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
index 5cabbbf..be6f485 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
@@ -123,14 +123,29 @@
* @param config The queue configuration
* @param services The queue services
* @param topics The topics handled by this queue
+ * @param haltedTopicsBackRef set to which topics newly halted while loading this queue's jobs are added,
+ * so that the caller can retain them even if no queue is created
*
* @return {@code JobQueueImpl} if there are jobs to process, {@code null} otherwise.
*/
public static JobQueueImpl createQueue(final String name,
final InternalQueueConfiguration config,
final QueueServices services,
- final Set<String> topics) {
+ final Set<String> topics,
+ final Set<String> haltedTopicsBackRef) {
final QueueJobCache cache = new QueueJobCache(services.configuration, name, services.statisticsManager, config.getType(), topics);
+ // the cache might contain newly halted topics.
+ // these we need to retain, to avoid scanning them going forward.
+ // but since the cache might be empty and thus discarded,
+ // we pass them back explicitly in the provided haltedTopicsBackRef
+ for (final String haltedTopic : cache.getNewlyHaltedTopics()) {
+ if (haltedTopicsBackRef.add(haltedTopic)) {
+ // only log topics the caller did not know about yet
+ LoggerFactory.getLogger(JobQueueImpl.class.getName() + '.' + name)
+ .warn("createQueue : topic halted due to ClassNotFoundExceptions : "
+ + haltedTopic);
+ }
+ }
if ( cache.isEmpty() ) {
return null;
}
@@ -712,5 +727,10 @@
this.requeue(handler);
}
}
+
+ /** @return the {@link QueueJobCache} of this queue (package-private accessor added with SLING-9906) */
+ QueueJobCache getCache() {
+ return cache;
+ }
}
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java b/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java
index ebcc92d..f4190e6 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java
@@ -66,6 +66,9 @@
/** The set of new topics to scan. */
private final Set<String> topicsWithNewJobs = new HashSet<String>();
+ /** SLING-9906 : topics halted due to ClassNotFoundExceptions while loading their jobs - guarded by topicsWithNewJobs. */
+ private final Set<String> newlyHaltedTopics = new HashSet<String>();
+
/** The cache of current objects. */
private final List<JobImpl> cache = new ArrayList<JobImpl>();
@@ -119,6 +122,13 @@
return result;
}
+ /** @return a copy of the topics halted by this cache so far */
+ Set<String> getNewlyHaltedTopics() {
+ synchronized ( this.topicsWithNewJobs ) {
+ return new HashSet<>(this.newlyHaltedTopics);
+ }
+ }
+
public void setIsBlocked(final boolean value) {
this.queueIsBlocked.set(value);
}
@@ -166,6 +176,10 @@
if ( doFull ) {
checkingTopics.addAll(this.topics);
}
+ // newlyHaltedTopics is a plain HashSet written by loadJobs - read it under the same monitor
+ synchronized ( this.topicsWithNewJobs ) {
+ checkingTopics.removeAll(this.newlyHaltedTopics);
+ }
if ( !checkingTopics.isEmpty() ) {
this.loadJobs(queue.getName(), checkingTopics, statisticsManager);
}
@@ -273,6 +287,7 @@
final List<JobImpl> list = new ArrayList<JobImpl>();
final AtomicBoolean scanTopic = new AtomicBoolean(false);
+ final AtomicBoolean haltTopic = new AtomicBoolean(false);
JobTopicTraverser.traverse(logger, topicResource, new JobTopicTraverser.JobCallback() {
@@ -288,19 +303,35 @@
logger.debug("Ignoring job {} - processing already started.", job);
} else {
// error reading job
- scanTopic.set(true);
- if ( job.isReadErrorRecoverable() ) {
- logger.debug("Ignoring job {} due to recoverable read errors.", job);
- } else {
+ switch( job.getReadErrorType() ) {
+ case CLASSNOTFOUNDEXCEPTION : {
+ // SLING-9906 : likely a class that is not deployed - halt the topic rather than rescanning it
+ logger.debug("Halting topic due to ClassNotFoundException reading job {}.", job);
+ haltTopic.set(true);
+ break;
+ }
+ case RUNTIMEEXCEPTION : {
+ scanTopic.set(true);
logger.debug("Failing job {} due to unrecoverable read errors.", job);
final JobHandler handler = new JobHandler(job, null, configuration);
handler.finished(JobState.ERROR, true, null);
+ break;
+ }
+ default: {
+ scanTopic.set(true);
+ logger.debug("Ignoring job {} due to recoverable read errors.", job);
+ }
}
}
return list.size() < maxPreloadLimit;
}
});
- if ( scanTopic.get() ) {
+ // SLING-9906 : halting takes precedence - a halted topic is excluded from further loading by this cache
+ if ( haltTopic.get() ) {
+ synchronized ( this.topicsWithNewJobs ) {
+ this.newlyHaltedTopics.add(topic);
+ }
+ } else if ( scanTopic.get() ) {
synchronized ( this.topicsWithNewJobs ) {
this.topicsWithNewJobs.add(topic);
}
@@ -317,7 +348,12 @@
public void handleNewTopics(final Set<String> topics) {
logger.debug("Update cache to handle new event for topics {}", topics);
synchronized ( this.topicsWithNewJobs ) {
- this.topicsWithNewJobs.addAll(topics);
+ final Set<String> nonHaltedTopics = new HashSet<>(topics);
+ if (!Collections.disjoint(topics, newlyHaltedTopics)) {
+ logger.warn("handleNewTopics : sets not disjoint as expected. topics: {}, newlyHaltedTopics: {}", topics, newlyHaltedTopics);
+ nonHaltedTopics.removeAll(newlyHaltedTopics);
+ }
+ this.topicsWithNewJobs.addAll(nonHaltedTopics);
}
this.topics.addAll(topics);
}
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java b/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
index 40edb37..4336ad5 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
@@ -27,6 +27,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.sling.api.resource.Resource;
@@ -79,6 +80,21 @@
public class QueueManager
implements Runnable, EventHandler, ConfigurationChangeListener {
+ /** SLING-9906 : creates a QueueManager with its references set directly, for tests - the returned instance is not activated. */
+ static QueueManager newForTest(EventAdmin eventAdmin, JobConsumerManager jobConsumerManager,
+ QueuesMBean queuesMBean, ThreadPoolManager threadPoolManager, ThreadPool threadPool,
+ JobManagerConfiguration configuration, StatisticsManager statisticsManager) {
+ final QueueManager qm = new QueueManager();
+ qm.eventAdmin = eventAdmin;
+ qm.jobConsumerManager = jobConsumerManager;
+ qm.queuesMBean = queuesMBean;
+ qm.threadPoolManager = threadPoolManager;
+ qm.threadPool = threadPool;
+ qm.configuration = configuration;
+ qm.statisticsManager = statisticsManager;
+ return qm;
+ }
+
/** Default logger. */
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@@ -122,6 +138,9 @@
/** The queue services. */
private volatile QueueServices queueServices;
+ /** SLING-9906 : topics halted due to ClassNotFoundExceptions while loading their jobs - skipped when starting queues until cleared. */
+ private final Set<String> haltedTopics = new ConcurrentSkipListSet<String>();
+
/**
* Activate this component.
* @param props Configuration properties
@@ -166,7 +185,7 @@
* is idle for two consecutive clean up calls, it is removed.
* @see java.lang.Runnable#run()
*/
- private void maintain() {
+ void maintain() {
this.schedulerRuns++;
logger.debug("Queue manager maintenance: Starting #{}", this.schedulerRuns);
@@ -219,6 +238,10 @@
private void start(final QueueInfo queueInfo,
final Set<String> topics) {
final InternalQueueConfiguration config = queueInfo.queueConfiguration;
+ // SLING-9906 : do not load jobs of halted topics
+ final Set<String> filteredTopics = new HashSet<String>(topics);
+ filteredTopics.removeAll(haltedTopics);
+
// get or create queue
boolean isNewQueue = false;
JobQueueImpl queue = null;
@@ -232,7 +255,7 @@
queue = null;
}
if ( queue == null ) {
- queue = JobQueueImpl.createQueue(queueInfo.queueName, config, queueServices, topics);
+ queue = JobQueueImpl.createQueue(queueInfo.queueName, config, queueServices, filteredTopics, haltedTopics);
// on startup the queue might be empty and we get null back from createQueue
if ( queue != null ) {
isNewQueue = true;
@@ -244,7 +267,7 @@
if ( queue != null ) {
logger.debug("Starting queue {}", queueInfo.queueName);
if ( !isNewQueue ) {
- queue.wakeUpQueue(topics);
+ queue.wakeUpQueue(filteredTopics);
}
queue.startJobs();
}
@@ -356,6 +379,7 @@
if ( this.configuration != null ) {
logger.debug("Topology changed {}", active);
this.isActive.set(active);
+ clearHaltedTopics("configurationChanged : unhalted topics due to configuration change");
if ( active ) {
fullTopicScan();
} else {
@@ -364,7 +388,27 @@
}
}
- private void fullTopicScan() {
+ /**
+ * SLING-9906 : unhalt all topics halted so far, so that queue starts include them again.
+ * NOTE(review): this only clears the set kept here - topics halted inside the job cache of an
+ * already existing queue presumably stay halted there until that queue is recreated. TODO confirm
+ * @param logPrefix prefix of the info message that lists the unhalted topics
+ */
+ private void clearHaltedTopics(String logPrefix) {
+ final String haltedTopicsToString;
+ // Note: the synchronized below is just to avoid wrong logging about unhalting,
+ // the haltedTopics access itself isn't prevented by this (and it is a concurrent set)
+ synchronized( haltedTopics ) {
+ if ( haltedTopics.isEmpty() ) {
+ return;
+ }
+ haltedTopicsToString = haltedTopics.toString();
+ haltedTopics.clear();
+ }
+ logger.info("{} : {}", logPrefix, haltedTopicsToString);
+ }
+
+ void fullTopicScan() {
logger.debug("Scanning repository for existing topics...");
final Set<String> topics = this.scanTopics();
final Map<QueueInfo, Set<String>> mapping = this.updateTopicMapping(topics);
@@ -405,6 +449,10 @@
*/
@Override
public void handleEvent(final Event event) {
+ if ( ResourceHelper.BUNDLE_EVENT_STARTED.equals(event.getTopic())
+ || ResourceHelper.BUNDLE_EVENT_UPDATED.equals(event.getTopic()) ) {
+ clearHaltedTopics("handleEvent: unhalted topics due to bundle started/updated event");
+ }
final String topic = (String)event.getProperty(NotificationConstants.NOTIFICATION_PROPERTY_JOB_TOPIC);
if ( this.isActive.get() && topic != null ) {
logger.debug("Received event {}", topic);
diff --git a/src/test/java/org/apache/sling/event/impl/jobs/config/JobManagerConfigurationTestFactory.java b/src/test/java/org/apache/sling/event/impl/jobs/config/JobManagerConfigurationTestFactory.java
new file mode 100644
index 0000000..3ece2a3
--- /dev/null
+++ b/src/test/java/org/apache/sling/event/impl/jobs/config/JobManagerConfigurationTestFactory.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.config;
+
+import java.lang.annotation.Annotation;
+import java.util.HashMap;
+
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration.Config;
+
+/**
+ * SLING-9906 : creates an activated {@link JobManagerConfiguration} for unit tests,
+ * from an empty property map and a fixed {@link Config}.
+ */
+public class JobManagerConfigurationTestFactory {
+
+ /**
+ * Create and activate a configuration with distribution enabled, no startup delay and cleanup_period 0.
+ * @param jobsRoot NOTE(review): currently not used - the activate properties are empty. TODO confirm
+ * @param resourceResolverFactory the resource resolver factory to use
+ * @param queueConfigurationManager the queue configuration manager to use
+ * @return the activated configuration
+ * @throws NoSuchFieldException declared only, not thrown by this implementation
+ */
+ public static JobManagerConfiguration create(String jobsRoot,
+ ResourceResolverFactory resourceResolverFactory,
+ QueueConfigurationManager queueConfigurationManager) throws NoSuchFieldException {
+ return JobManagerConfiguration.newForTest(
+ resourceResolverFactory, queueConfigurationManager,
+ new HashMap<String, Object>(), new Config() {
+
+ @Override
+ public Class<? extends Annotation> annotationType() {
+ // honour the Annotation contract instead of returning null
+ return Config.class;
+ }
+
+ @Override
+ public boolean job_consumermanager_disableDistribution() {
+ return false;
+ }
+
+ @Override
+ public long startup_delay() {
+ return 0;
+ }
+
+ @Override
+ public int cleanup_period() {
+ return 0;
+ }
+
+ });
+ }
+
+}
diff --git a/src/test/java/org/apache/sling/event/impl/jobs/queues/TestTopicHalting.java b/src/test/java/org/apache/sling/event/impl/jobs/queues/TestTopicHalting.java
new file mode 100644
index 0000000..a75c92d
--- /dev/null
+++ b/src/test/java/org/apache/sling/event/impl/jobs/queues/TestTopicHalting.java
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.queues;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jcr.ItemExistsException;
+import javax.jcr.PathNotFoundException;
+import javax.jcr.RepositoryException;
+import javax.jcr.lock.LockException;
+import javax.jcr.nodetype.ConstraintViolationException;
+import javax.jcr.version.VersionException;
+
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.commons.threads.ThreadPool;
+import org.apache.sling.commons.threads.ThreadPoolManager;
+import org.apache.sling.event.impl.jobs.JobConsumerManager;
+import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
+import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration.Config;
+import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
+import org.apache.sling.event.impl.jobs.config.JobManagerConfigurationTestFactory;
+import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
+import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
+import org.apache.sling.event.impl.jobs.jmx.QueuesMBeanImpl;
+import org.apache.sling.event.impl.jobs.scheduling.JobSchedulerImpl;
+import org.apache.sling.event.impl.jobs.stats.StatisticsManager;
+import org.apache.sling.event.impl.support.Environment;
+import org.apache.sling.event.impl.support.ResourceHelper;
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.Queue;
+import org.apache.sling.event.jobs.QueueConfiguration.ThreadPriority;
+import org.apache.sling.event.jobs.QueueConfiguration.Type;
+import org.apache.sling.testing.mock.osgi.MockOsgi;
+import org.apache.sling.testing.mock.sling.MockSling;
+import org.apache.sling.testing.mock.sling.ResourceResolverType;
+import org.apache.sling.testing.mock.sling.builder.ContentBuilder;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+import org.osgi.framework.BundleContext;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.event.EventAdmin;
+
+/**
+ * SLING-9906.
+ * This tests a new behaviour of a topic introduced with SLING-9906.
+ * When there are ClassNotFoundException read errors when loading a job,
+ * this is now considered a special, temporarily blocking case which
+ * is likely due to a missing bundle. To prevent this exception from
+ * reoccurring over and over again, the topic is now "halted", ie job
+ * execution is stopped for this topic. It is "unhalted" once some
+ * configuration changes (topology change) or a bundle is started or updated.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class TestTopicHalting {
+
+ /** number of ClassNotFoundExceptions thrown by UnDeserializableDataObject.readExternal */
+ private static final AtomicInteger cnfeCount = new AtomicInteger();
+
+ @Mock
+ private JobSchedulerImpl jobScheduler;
+
+ @Mock
+ private EventAdmin eventAdmin;
+
+ @Mock
+ private JobConsumerManager jobConsumerManager;
+
+ @Mock
+ private ThreadPoolManager threadPoolManager;
+
+ @Mock
+ private ThreadPool threadPool;
+
+ @Mock
+ private StatisticsManager statisticsManager;
+
+ @Mock
+ private QueueConfigurationManager queueConfigurationManager;
+
+ @Mock
+ private Scheduler scheduler;
+
+ private JobManagerConfiguration configuration;
+
+ private QueuesMBeanImpl queuesMBean;
+
+ private String ownSlingId;
+
+ /** number of jobs created so far, also used as counter in their job ids */
+ private int jobCnt;
+
+ private ResourceResolverFactory factory;
+
+ private ComponentContext componentContext;
+
+ private BundleContext bundleContext;
+
+ /** object under test */
+ private QueueManager queueManager;
+
+ @Before
+ public void setUp() throws Throwable {
+ cnfeCount.set(0);
+ ownSlingId = UUID.randomUUID().toString();
+ Environment.APPLICATION_ID = ownSlingId;
+ componentContext = MockOsgi.newComponentContext();
+ bundleContext = componentContext.getBundleContext();
+
+ factory = MockSling.newResourceResolverFactory(ResourceResolverType.JCR_OAK, bundleContext);
+
+ queuesMBean = new QueuesMBeanImpl();
+ queuesMBean.activate(bundleContext);
+
+ configuration = JobManagerConfigurationTestFactory.create(JobManagerConfiguration.DEFAULT_REPOSITORY_PATH,
+ factory, queueConfigurationManager);
+
+ queueManager = QueueManager.newForTest(eventAdmin, jobConsumerManager,
+ queuesMBean, threadPoolManager, threadPool, configuration, statisticsManager);
+
+ initQueueConfigurationManagerMocks();
+
+ queueManager.activate(null);
+
+ @SuppressWarnings("deprecation")
+ ResourceResolver resourceResolver = factory.getAdministrativeResourceResolver(null);
+ ContentBuilder contentBuilder = new ContentBuilder(resourceResolver);
+
+ final String topic = "aTopic";
+ for (int year = 2019; year <= 2022; year++) {
+ for (int month = 1; month <= 12; month++) {
+ createJob(contentBuilder, ownSlingId, topic, year, month);
+ }
+ }
+ resourceResolver.commit();
+ }
+
+ private void initQueueConfigurationManagerMocks() {
+ Mockito.when(queueConfigurationManager.getQueueInfo(Mockito.anyString())).thenAnswer(new Answer<QueueInfo>() {
+
+ private final Map<String, QueueInfo> queueInfos = new HashMap<>();
+
+ @Override
+ public QueueInfo answer(InvocationOnMock invocation) throws Throwable {
+ final String topic = (String) invocation.getArguments()[0];
+ QueueInfo queueInfo = queueInfos.get(topic);
+ if ( queueInfo == null ) {
+ queueInfo = createQueueInfo(topic);
+ queueInfos.put(topic, queueInfo);
+ }
+ return queueInfo;
+ }
+
+ private QueueInfo createQueueInfo(final String topic) {
+ final QueueInfo result = new QueueInfo();
+ result.queueName = "Queue for topic=" + topic;
+ Map<String, Object> props = new HashMap<>();
+ Config cconfig = Mockito.mock(Config.class);
+ Mockito.when(cconfig.queue_priority()).thenReturn(ThreadPriority.NORM.name());
+ Mockito.when(cconfig.queue_type()).thenReturn(Type.ORDERED.name());
+ Mockito.when(cconfig.queue_maxparallel()).thenReturn(1.0);
+ result.queueConfiguration = InternalQueueConfiguration.fromConfiguration(props, cconfig);
+ result.targetId = ownSlingId;
+ return result;
+ }
+
+ });
+ }
+
+ /** a second configurationChanged(true) unhalts the topic, so its jobs are loaded (and fail) once more - but only once */
+ @Test
+ public void testUnhalting() throws Throwable {
+ assertNotNull(queueManager);
+ assertEquals(0, cnfeCount.get());
+ queueManager.configurationChanged(true);
+ int expectedCnfeCount = 2 * jobCnt;
+ assertEquals(expectedCnfeCount, cnfeCount.get());
+ queueManager.maintain();
+ assertEquals(expectedCnfeCount, cnfeCount.get());
+ queueManager.maintain();
+ assertEquals(expectedCnfeCount, cnfeCount.get());
+ queueManager.maintain();
+ assertEquals(expectedCnfeCount, cnfeCount.get());
+ queueManager.configurationChanged(true);
+ expectedCnfeCount += 2 * jobCnt;
+ assertEquals(expectedCnfeCount, cnfeCount.get());
+ queueManager.maintain();
+ assertEquals(expectedCnfeCount, cnfeCount.get());
+ queueManager.maintain();
+ assertEquals(expectedCnfeCount, cnfeCount.get());
+ queueManager.maintain();
+ assertEquals(expectedCnfeCount, cnfeCount.get());
+ }
+
+ /** once halted, the topic is not loaded again by further full topic scans */
+ @Test
+ public void testFullTopicScan() throws Throwable {
+ assertNotNull(queueManager);
+ assertEquals(0, cnfeCount.get());
+ for( int i = 0; i < 50; i++ ) {
+ queueManager.fullTopicScan();
+ assertEquals(2 * jobCnt, cnfeCount.get());
+ }
+ }
+
+ /** maintenance runs never reload the halted topic */
+ @Test
+ public void testMaintain() throws Throwable {
+ assertNotNull(queueManager);
+ assertEquals(0, cnfeCount.get());
+ // configurationChanged(true) is going to do a fullTopicScan()
+ queueManager.configurationChanged(true);
+ // due to fullTopicScan -> 2 times the jobs are loaded causing a CNFE
+ int expectedCnfeCount = 2 * jobCnt;
+ assertEquals(expectedCnfeCount, cnfeCount.get());
+ for( int i = 0; i < 50; i++ ) {
+ // schedulerRuns % 3 == 1
+ queueManager.maintain();
+ assertEquals(expectedCnfeCount, cnfeCount.get());
+ // schedulerRuns % 3 == 2
+ queueManager.maintain();
+ assertEquals(expectedCnfeCount, cnfeCount.get());
+ // schedulerRuns % 3 == 0
+ queueManager.maintain();
+ // if we weren't halting the topic due to CNFE, we'd now be doing:
+ // expectedCnfeCount += 2 * jobCnt;
+ assertEquals(expectedCnfeCount, cnfeCount.get());
+ }
+ }
+
+ /** no jobs are loaded while inactive; once active, they are loaded and, as none can be read, no queue is created */
+ @Test
+ public void testConfigurationChanged() throws Throwable {
+ assertNotNull(queueManager);
+ assertEquals(0, cnfeCount.get());
+ queueManager.configurationChanged(false);
+ assertEquals(0, cnfeCount.get());
+ queueManager.configurationChanged(true);
+ assertEquals(2 * jobCnt, cnfeCount.get());
+ Iterable<Queue> qit = queueManager.getQueues();
+ assertNotNull(qit);
+ Iterator<Queue> it = qit.iterator();
+ assertNotNull(it);
+ assertFalse(it.hasNext());
+ }
+
+ private Resource createJob(ContentBuilder contentBuilder, String localSlingId, String topic, int year, int month) throws ItemExistsException, PathNotFoundException, VersionException, ConstraintViolationException, LockException, RepositoryException {
+ // <DEFAULT_REPOSITORY_PATH>/assigned/<slingId>/<topic>/<year>/<month>/1/20/0/<slingId>_<counter>
+ String counter = String.valueOf(jobCnt++);
+ String jobId = year + "/" + month + "/1/20/0/" + localSlingId + "_" + counter;
+ String path = JobManagerConfiguration.DEFAULT_REPOSITORY_PATH + "/assigned/" + localSlingId + "/" + topic + "/" + jobId;
+
+ final UnDeserializableDataObject uddao = new UnDeserializableDataObject();
+ return contentBuilder.resource(path,
+ ResourceHelper.PROPERTY_JOB_TOPIC, topic,
+ ResourceHelper.PROPERTY_JOB_ID, jobId,
+ Job.PROPERTY_JOB_CREATED, Calendar.getInstance(),
+ "uddao", uddao);
+ }
+
+ private static final class UnDeserializableDataObject implements Externalizable {
+ private static final long serialVersionUID = 1L;
+
+ public UnDeserializableDataObject() {
+ // we'll allow this one
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ out.write(42);
+ }
+
+ @Override
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ cnfeCount.incrementAndGet();
+ throw new ClassNotFoundException("UnDeserializableDataObject");
+ }
+ }
+}
diff --git a/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java b/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java
index 464ef74..87e296c 100644
--- a/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java
+++ b/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java
@@ -23,6 +23,7 @@
import static org.ops4j.pax.exam.CoreOptions.junitBundles;
import static org.ops4j.pax.exam.CoreOptions.mavenBundle;
import static org.ops4j.pax.exam.CoreOptions.options;
+import static org.ops4j.pax.exam.CoreOptions.repository;
import static org.ops4j.pax.exam.CoreOptions.systemProperty;
import static org.ops4j.pax.exam.CoreOptions.when;
@@ -104,6 +105,7 @@
final String slingHome = new File(buildDir + File.separatorChar + "sling_" + System.currentTimeMillis()).getAbsolutePath();
return options(
+ repository("https://repo.maven.apache.org/maven2/").id("central"), // explicitly declare Maven Central (https) as pax-exam repository
frameworkProperty("sling.home").value(slingHome),
frameworkProperty("repository.home").value(slingHome + File.separatorChar + "repository"),
when( localRepo.length() > 0 ).useOptions(