Merge pull request #8 from stefan-egli/SLING-9906

SLING-9906 : Avoid retrying to load jobs if classes are missing
diff --git a/pom.xml b/pom.xml
index 0668f5f..f18ea94 100644
--- a/pom.xml
+++ b/pom.xml
@@ -374,5 +374,24 @@
     		<artifactId>org.apache.sling.event.api</artifactId>
     		<version>1.0.0</version>
     	</dependency>
+        <dependency>
+            <groupId>org.apache.sling</groupId>
+            <artifactId>org.apache.sling.commons.testing</artifactId>
+            <version>2.0.10</version>
+            <!-- test utilities only: keep them off the compile/runtime classpath -->
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <version>1.9.5</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.sling</groupId>
+            <artifactId>org.apache.sling.testing.sling-mock-oak</artifactId>
+            <version>2.0.0</version>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 </project>
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java b/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java
index c95aaef..567f871 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/JobImpl.java
@@ -37,6 +37,13 @@
  */
 public class JobImpl implements Job, Comparable<JobImpl> {
 
+    public enum ReadErrorType {
+        NONE, // no read error was recorded for the job
+        RUNTIMEEXCEPTION, // a recorded read error is a RuntimeException
+        CLASSNOTFOUNDEXCEPTION, // a recorded read error was caused by a ClassNotFoundException
+        OTHER_EXCEPTION // read errors were recorded, but of neither kind above
+    }
+
     /** Internal job property containing the resource path. */
     public static final String PROPERTY_RESOURCE_PATH = "slingevent:path";
 
@@ -104,20 +111,26 @@
         return this.readErrorList != null;
     }
 
+    /** Classifies the read errors; a RuntimeException anywhere in the list wins, whatever the list order. */
+    public ReadErrorType getReadErrorType() {
+        if ( this.readErrorList == null || this.readErrorList.isEmpty() ) {
+            return ReadErrorType.NONE;
+        }
+        boolean classNotFound = false; // remembered, but must not end the scan early
+        for(final Exception e : this.readErrorList) {
+            if ( e instanceof RuntimeException ) {
+                return ReadErrorType.RUNTIMEEXCEPTION;
+            }
+            classNotFound |= e.getCause() instanceof ClassNotFoundException; // instanceof is false for a null cause
+        }
+        return classNotFound ? ReadErrorType.CLASSNOTFOUNDEXCEPTION : ReadErrorType.OTHER_EXCEPTION;
+    }
+
     /**
      * Is the error recoverable?
      */
     public boolean isReadErrorRecoverable() {
-        boolean result = true;
-        if ( this.readErrorList != null ) {
-            for(final Exception e : this.readErrorList) {
-                if ( e instanceof RuntimeException ) {
-                    result = false;
-                    break;
-                }
-            }
-        }
-        return result;
+        return getReadErrorType() != ReadErrorType.RUNTIMEEXCEPTION;
     }
 
     /**
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/Utility.java b/src/main/java/org/apache/sling/event/impl/jobs/Utility.java
index 0449acb..1add893 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/Utility.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/Utility.java
@@ -28,6 +28,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.api.resource.Resource;
@@ -40,6 +41,12 @@
 
 public abstract class Utility {
 
+    /** SLING-9906 : silence read errors after this many logs in this VM */
+    private static final int READ_ERROR_SILENCE_LIMIT = 1000;
+
+    /** SLING-9906 : count read errors to be able to silence */
+    private static final AtomicInteger readErrorWarnCount = new AtomicInteger();
+
     /**
      * Check if the job topic is a valid OSGI event name (see 113.3.1 of the OSGI spec)
      * @return <code>null</code> if the topic is syntactically correct otherwise an error description is returned
@@ -194,7 +201,26 @@
                     final List<Exception> readErrorList = (List<Exception>) jobProperties.get(ResourceHelper.PROPERTY_MARKER_READ_ERROR_LIST);
                     if ( readErrorList != null ) {
                         for(final Exception e : readErrorList) {
-                            logger.warn("Unable to read job from " + resource.getPath(), e);
+                            final int c = readErrorWarnCount.getAndIncrement();
+                            final String prefix;
+                            if ( c == READ_ERROR_SILENCE_LIMIT ) {
+                                logger.warn("Too many 'Unable to read job from ' messages - silencing 99% of them from now on.");
+                                continue;
+                            } else if ( c > READ_ERROR_SILENCE_LIMIT && c % 100 != 0 ) {
+                                // SLING-9906 : past the limit, skip all but every 100th message
+                                continue;
+                            } else if ( c > READ_ERROR_SILENCE_LIMIT ) {
+                                prefix = "[unsilenced] ";
+                            } else {
+                                prefix = "";
+                            }
+                            if ( e.getCause() != null && e.getCause() instanceof ClassNotFoundException ) {
+                                // SLING-9906 : log without stack trace in the ClassNotFoundException case
+                                // as this can happen many times when a class is not deployed.
+                                logger.warn(prefix + "Unable to read job from " + resource.getPath() + ", exception: " + e + ", cause: " + e.getCause());
+                            } else {
+                                logger.warn(prefix + "Unable to read job from " + resource.getPath(), e);
+                            }
                         }
                     }
                     job = new JobImpl(topic,
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java b/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
index 8322e1a..71db186 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/config/JobManagerConfiguration.java
@@ -117,6 +117,16 @@
     /** Configuration property for the scheduled jobs path. */
     public static final String PROPERTY_SCHEDULED_JOBS_PATH = "job.scheduled.jobs.path";
 
+    static JobManagerConfiguration newForTest(ResourceResolverFactory resourceResolverFactory,
+            QueueConfigurationManager queueConfigurationManager,
+            Map<String, Object> activateProps, Config config) {
+        final JobManagerConfiguration jobMgrConfig = new JobManagerConfiguration(); // built by hand for tests
+        jobMgrConfig.resourceResolverFactory = resourceResolverFactory; // collaborators are assigned straight to the fields
+        jobMgrConfig.queueConfigManager = queueConfigurationManager;
+        jobMgrConfig.activate(activateProps, config); // then activate with the supplied properties and config
+        return jobMgrConfig;
+    }
+
     /** The jobs base path with a slash. */
     private String jobsBasePathWithSlash;
 
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java b/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
index 5cabbbf..be6f485 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
@@ -123,14 +123,29 @@
      * @param config The queue configuration
      * @param services The queue services
      * @param topics The topics handled by this queue
+     * @param haltedTopicsBackRef set to which newly halted topics are added, to pass them back to the caller
      *
      * @return {@code JobQueueImpl} if there are jobs to process, {@code null} otherwise.
      */
     public static JobQueueImpl createQueue(final String name,
                         final InternalQueueConfiguration config,
                         final QueueServices services,
-                        final Set<String> topics) {
+                        final Set<String> topics,
+                        final Set<String> haltedTopicsBackRef) {
         final QueueJobCache cache = new QueueJobCache(services.configuration, name, services.statisticsManager, config.getType(), topics);
+        // the cache might contain newly halted topics.
+        // these we need to retain, to avoid scanning them going forward.
+        // but since the cache might be empty and thus discarded,
+        // we pass them back explicitly in the provided haltedTopicsBackRef
+        if ( !cache.getNewlyHaltedTopics().isEmpty() ) {
+            for (String haltedTopic : cache.getNewlyHaltedTopics() ) {
+                if (haltedTopicsBackRef.add(haltedTopic)) {
+                    LoggerFactory.getLogger(JobQueueImpl.class.getName() + '.' + name)
+                            .warn("createQueue : topic halted due to ClassNotFoundExceptions : "
+                                    + haltedTopic);
+                }
+            }
+        }
         if ( cache.isEmpty() ) {
             return null;
         }
@@ -712,5 +727,9 @@
             this.requeue(handler);
         }
     }
+
+    QueueJobCache getCache() { // package-private accessor to this queue's job cache
+        return cache;
+    }
 }
 
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java b/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java
index ebcc92d..f4190e6 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java
@@ -66,6 +66,9 @@
     /** The set of new topics to scan. */
     private final Set<String> topicsWithNewJobs = new HashSet<String>();
 
+    /** The set of topics halted because reading a job hit a ClassNotFoundException. */
+    private final Set<String> newlyHaltedTopics = new HashSet<String>();
+
     /** The cache of current objects. */
     private final List<JobImpl> cache = new ArrayList<JobImpl>();
 
@@ -119,6 +122,12 @@
         return result;
     }
 
+    Set<String> getNewlyHaltedTopics() { // returns a copy, never the internal set
+        synchronized ( this.topicsWithNewJobs ) { // same monitor that is held when a topic is added to newlyHaltedTopics
+            return new HashSet<>(this.newlyHaltedTopics);
+        }
+    }
+
     public void setIsBlocked(final boolean value) {
         this.queueIsBlocked.set(value);
     }
@@ -166,6 +175,7 @@
                         if ( doFull ) {
                             checkingTopics.addAll(this.topics);
                         }
+                        checkingTopics.removeAll(newlyHaltedTopics);
                         if ( !checkingTopics.isEmpty() ) {
                             this.loadJobs(queue.getName(), checkingTopics, statisticsManager);
                         }
@@ -273,6 +283,7 @@
         final List<JobImpl> list = new ArrayList<JobImpl>();
 
         final AtomicBoolean scanTopic = new AtomicBoolean(false);
+        final AtomicBoolean haltTopic = new AtomicBoolean(false);
 
         JobTopicTraverser.traverse(logger, topicResource, new JobTopicTraverser.JobCallback() {
 
@@ -288,19 +299,32 @@
                     logger.debug("Ignoring job {} - processing already started.", job);
                 } else {
                     // error reading job
-                    scanTopic.set(true);
-                    if ( job.isReadErrorRecoverable() ) {
-                        logger.debug("Ignoring job {} due to recoverable read errors.", job);
-                    } else {
+                    switch( job.getReadErrorType() ) {
+                    case CLASSNOTFOUNDEXCEPTION : {
+                        haltTopic.set(true);
+                        break;
+                    }
+                    case RUNTIMEEXCEPTION : {
+                        scanTopic.set(true);
                         logger.debug("Failing job {} due to unrecoverable read errors.", job);
                         final JobHandler handler = new JobHandler(job, null, configuration);
                         handler.finished(JobState.ERROR, true, null);
+                        break;
+                    }
+                    default: {
+                        scanTopic.set(true);
+                        logger.debug("Ignoring job {} due to recoverable read errors.", job);
+                    }
                     }
                 }
                 return list.size() < maxPreloadLimit;
             }
         });
-        if ( scanTopic.get() ) {
+        if ( haltTopic.get() ) {
+            synchronized ( this.topicsWithNewJobs ) {
+                this.newlyHaltedTopics.add(topic);
+            }
+        } else if ( scanTopic.get() ) {
             synchronized ( this.topicsWithNewJobs ) {
                 this.topicsWithNewJobs.add(topic);
             }
@@ -317,7 +341,12 @@
     public void handleNewTopics(final Set<String> topics) {
         logger.debug("Update cache to handle new event for topics {}", topics);
         synchronized ( this.topicsWithNewJobs ) {
-            this.topicsWithNewJobs.addAll(topics);
+            final Set<String> nonHaltedTopics = new HashSet<>(topics);
+            if (!Collections.disjoint(topics, newlyHaltedTopics)) {
+                logger.warn("handleNewTopics : sets not disjoint as expected. topics: " + topics + ", newlyHaltedTopics: " + newlyHaltedTopics);
+                nonHaltedTopics.removeAll(newlyHaltedTopics);
+            }
+            this.topicsWithNewJobs.addAll(nonHaltedTopics);
         }
         this.topics.addAll(topics);
     }
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java b/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
index 40edb37..4336ad5 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
@@ -27,6 +27,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.sling.api.resource.Resource;
@@ -79,6 +80,20 @@
 public class QueueManager
     implements Runnable, EventHandler, ConfigurationChangeListener {
 
+    static QueueManager newForTest(EventAdmin eventAdmin, JobConsumerManager jobConsumerManager,
+            QueuesMBean queuesMBean, ThreadPoolManager threadPoolManager, ThreadPool threadPool,
+            JobManagerConfiguration configuration, StatisticsManager statisticsManager) {
+        final QueueManager qm = new QueueManager(); // built by hand for tests
+        qm.eventAdmin = eventAdmin; // every collaborator is assigned straight to its field
+        qm.jobConsumerManager = jobConsumerManager;
+        qm.queuesMBean = queuesMBean;
+        qm.threadPoolManager = threadPoolManager;
+        qm.threadPool = threadPool;
+        qm.configuration = configuration;
+        qm.statisticsManager = statisticsManager;
+        return qm; // activate(...) is not called here
+    }
+
     /** Default logger. */
     private final Logger logger = LoggerFactory.getLogger(this.getClass());
 
@@ -122,6 +137,9 @@
     /** The queue services. */
     private volatile QueueServices queueServices;
 
+    /** The set of halted topics; they are filtered out when starting queues until the set is cleared. */
+    private final Set<String> haltedTopics = new ConcurrentSkipListSet<String>();
+
     /**
      * Activate this component.
      * @param props Configuration properties
@@ -166,7 +184,7 @@
      * is idle for two consecutive clean up calls, it is removed.
      * @see java.lang.Runnable#run()
      */
-    private void maintain() {
+    void maintain() {
         this.schedulerRuns++;
         logger.debug("Queue manager maintenance: Starting #{}", this.schedulerRuns);
 
@@ -219,6 +237,9 @@
     private void start(final QueueInfo queueInfo,
                        final Set<String> topics) {
         final InternalQueueConfiguration config = queueInfo.queueConfiguration;
+        final Set<String> filteredTopics = new HashSet<String>(topics);
+        filteredTopics.removeAll(haltedTopics);
+
         // get or create queue
         boolean isNewQueue = false;
         JobQueueImpl queue = null;
@@ -232,7 +253,7 @@
                 queue = null;
             }
             if ( queue == null ) {
-                queue = JobQueueImpl.createQueue(queueInfo.queueName, config, queueServices, topics);
+                queue = JobQueueImpl.createQueue(queueInfo.queueName, config, queueServices, filteredTopics, haltedTopics);
                 // on startup the queue might be empty and we get null back from createQueue
                 if ( queue != null ) {
                     isNewQueue = true;
@@ -244,7 +265,7 @@
         if ( queue != null ) {
             logger.debug("Starting queue {}", queueInfo.queueName);
             if ( !isNewQueue ) {
-                queue.wakeUpQueue(topics);
+                queue.wakeUpQueue(filteredTopics);
             }
             queue.startJobs();
         }
@@ -356,6 +377,7 @@
         if ( this.configuration != null ) {
             logger.debug("Topology changed {}", active);
             this.isActive.set(active);
+            clearHaltedTopics("configurationChanged : unhalted topics due to configuration change");
             if ( active ) {
                 fullTopicScan();
             } else {
@@ -364,7 +386,21 @@
         }
     }
 
-    private void fullTopicScan() {
+    private void clearHaltedTopics(String logPrefix) {
+        String unhalted = null;
+        // the monitor only keeps the unhalt log accurate; other users of the concurrent set do not take it
+        synchronized( haltedTopics ) {
+            if ( !haltedTopics.isEmpty() ) {
+                unhalted = haltedTopics.toString();
+                haltedTopics.clear();
+            }
+        }
+        if ( unhalted != null ) {
+            logger.info(logPrefix + " : " + unhalted);
+        }
+    }
+
+    void fullTopicScan() {
         logger.debug("Scanning repository for existing topics...");
         final Set<String> topics = this.scanTopics();
         final Map<QueueInfo, Set<String>> mapping = this.updateTopicMapping(topics);
@@ -405,6 +441,10 @@
      */
     @Override
     public void handleEvent(final Event event) {
+        if ( ResourceHelper.BUNDLE_EVENT_STARTED.equals(event.getTopic())
+                || ResourceHelper.BUNDLE_EVENT_UPDATED.equals(event.getTopic()) ) {
+            clearHaltedTopics("handleEvent: unhalted topics due to bundle started/updated event");
+       }
         final String topic = (String)event.getProperty(NotificationConstants.NOTIFICATION_PROPERTY_JOB_TOPIC);
         if ( this.isActive.get() && topic != null ) {
             logger.debug("Received event {}", topic);
diff --git a/src/test/java/org/apache/sling/event/impl/jobs/config/JobManagerConfigurationTestFactory.java b/src/test/java/org/apache/sling/event/impl/jobs/config/JobManagerConfigurationTestFactory.java
new file mode 100644
index 0000000..3ece2a3
--- /dev/null
+++ b/src/test/java/org/apache/sling/event/impl/jobs/config/JobManagerConfigurationTestFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.config;
+
+import java.lang.annotation.Annotation;
+import java.util.HashMap;
+
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration.Config;
+
+public class JobManagerConfigurationTestFactory {
+
+    public static JobManagerConfiguration create(String jobsRoot, 
+            ResourceResolverFactory resourceResolverFactory, 
+            QueueConfigurationManager queueConfigurationManager) throws NoSuchFieldException {
+        final JobManagerConfiguration real = JobManagerConfiguration.newForTest(
+                resourceResolverFactory, queueConfigurationManager,
+                new HashMap<String, Object>(), new Config() {
+
+            @Override
+            public Class<? extends Annotation> annotationType() {
+                return null;
+            }
+
+            @Override
+            public boolean job_consumermanager_disableDistribution() {
+                return false;
+            }
+
+            @Override
+            public long startup_delay() {
+                return 0;
+            }
+
+            @Override
+            public int cleanup_period() {
+                return 0;
+            }
+            
+        });
+        return real;
+    }
+    
+}
diff --git a/src/test/java/org/apache/sling/event/impl/jobs/queues/TestTopicHalting.java b/src/test/java/org/apache/sling/event/impl/jobs/queues/TestTopicHalting.java
new file mode 100644
index 0000000..a75c92d
--- /dev/null
+++ b/src/test/java/org/apache/sling/event/impl/jobs/queues/TestTopicHalting.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.queues;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Calendar;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import javax.jcr.ItemExistsException;
+import javax.jcr.PathNotFoundException;
+import javax.jcr.RepositoryException;
+import javax.jcr.lock.LockException;
+import javax.jcr.nodetype.ConstraintViolationException;
+import javax.jcr.version.VersionException;
+
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.commons.threads.ThreadPool;
+import org.apache.sling.commons.threads.ThreadPoolManager;
+import org.apache.sling.event.impl.jobs.JobConsumerManager;
+import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration;
+import org.apache.sling.event.impl.jobs.config.InternalQueueConfiguration.Config;
+import org.apache.sling.event.impl.jobs.config.JobManagerConfiguration;
+import org.apache.sling.event.impl.jobs.config.JobManagerConfigurationTestFactory;
+import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager;
+import org.apache.sling.event.impl.jobs.config.QueueConfigurationManager.QueueInfo;
+import org.apache.sling.event.impl.jobs.jmx.QueuesMBeanImpl;
+import org.apache.sling.event.impl.jobs.scheduling.JobSchedulerImpl;
+import org.apache.sling.event.impl.jobs.stats.StatisticsManager;
+import org.apache.sling.event.impl.support.Environment;
+import org.apache.sling.event.impl.support.ResourceHelper;
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.Queue;
+import org.apache.sling.event.jobs.QueueConfiguration.ThreadPriority;
+import org.apache.sling.event.jobs.QueueConfiguration.Type;
+import org.apache.sling.testing.mock.osgi.MockOsgi;
+import org.apache.sling.testing.mock.sling.MockSling;
+import org.apache.sling.testing.mock.sling.ResourceResolverType;
+import org.apache.sling.testing.mock.sling.builder.ContentBuilder;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+import org.osgi.framework.BundleContext;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.event.EventAdmin;
+
+/**
+ * SLING-9906.
+ * This tests a new behaviour of a topic introduced with SLING-9906.
+ * When there are ClassNotFoundException read errors when loading a job,
+ * this is now considered a special, temporarily blocking case which
+ * is likely due to a missing bundle. To prevent this exception from
+ * reoccurring over and over again, the topic is now "halted", ie job
+ * execution is stopped for this topic. It is "unhalted" once some
+ * configuration changes (such as a JobConsumer/Executor being added).
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class TestTopicHalting {
+
+    private static AtomicInteger cnfeCount = new AtomicInteger();
+
+    @Mock
+    private JobSchedulerImpl jobScheduler;
+    
+    @Mock
+    private EventAdmin eventAdmin;
+    
+    @Mock
+    private JobConsumerManager jobConsumerManager;
+
+    @Mock
+    private ThreadPoolManager threadPoolManager;
+
+    @Mock
+    private ThreadPool threadPool;
+
+    @Mock
+    private StatisticsManager statisticsManager;
+
+    @Mock
+    private QueueConfigurationManager queueConfigurationManager;
+
+    @Mock
+    private Scheduler scheduler;
+
+    private JobManagerConfiguration configuration;
+
+    private QueuesMBeanImpl queuesMBean;
+    
+    private String ownSlingId;
+
+    private int jobCnt;
+
+    private ResourceResolverFactory factory;
+
+    private ComponentContext componentContext;
+
+    private BundleContext bundleContext;
+
+    /** object under test */
+    private QueueManager queueManager;
+
+    @Before
+    public void setUp() throws Throwable {
+        cnfeCount.set(0);
+        ownSlingId = UUID.randomUUID().toString();
+        Environment.APPLICATION_ID = ownSlingId;
+        componentContext = MockOsgi.newComponentContext();
+        bundleContext = componentContext.getBundleContext();
+        
+        factory = MockSling.newResourceResolverFactory(ResourceResolverType.JCR_OAK, bundleContext);
+
+        queuesMBean = new QueuesMBeanImpl();
+        queuesMBean.activate(bundleContext);
+
+        configuration = JobManagerConfigurationTestFactory.create(JobManagerConfiguration.DEFAULT_REPOSITORY_PATH, 
+                factory, queueConfigurationManager);
+        
+        queueManager = QueueManager.newForTest(eventAdmin, jobConsumerManager, 
+                queuesMBean, threadPoolManager, threadPool, configuration, statisticsManager);
+
+        initQueueConfigurationManagerMocks();
+        
+        queueManager.activate(null);
+
+        @SuppressWarnings("deprecation")
+        ResourceResolver resourceResolver = factory.getAdministrativeResourceResolver(null);
+        ContentBuilder contentBuilder = new ContentBuilder(resourceResolver);
+
+        final String topic = "aTopic";
+        for (int year = 2019; year <= 2022; year++) {
+            for (int month = 1; month <= 12; month++) {
+                createJob(contentBuilder, ownSlingId, topic, year, month);
+            }
+        }
+        resourceResolver.commit();
+    }
+
+    private void initQueueConfigurationManagerMocks() {
+        Mockito.when(queueConfigurationManager.getQueueInfo(Mockito.anyString())).thenAnswer(new Answer<QueueInfo>() {
+            
+            private final Map<String, QueueInfo> queueInfos = new HashMap<>();
+
+            @Override
+            public QueueInfo answer(InvocationOnMock invocation) throws Throwable {
+                final String topic = (String) invocation.getArguments()[0];
+                QueueInfo queueInfo = queueInfos.get(topic);
+                if ( queueInfo == null ) {
+                    queueInfo = createQueueInfo(topic);
+                    queueInfos.put(topic, queueInfo);
+                }
+                return queueInfo;
+            }
+
+            private QueueInfo createQueueInfo(final String topic) {
+                final QueueInfo result = new QueueInfo();
+                result.queueName = "Queue for topic=" + topic;
+                Map<String, Object> props = new HashMap<>();
+                Config cconfig = Mockito.mock(Config.class);
+                Mockito.when(cconfig.queue_priority()).thenReturn(ThreadPriority.NORM.name());
+                Mockito.when(cconfig.queue_type()).thenReturn(Type.ORDERED.name());
+                Mockito.when(cconfig.queue_maxparallel()).thenReturn(1.0);
+                result.queueConfiguration = InternalQueueConfiguration.fromConfiguration(props, cconfig);
+                result.targetId = ownSlingId;
+                return result;
+            }
+            
+        });
+    }
+    
+    @Test
+    public void testUnhalting() throws Throwable {
+        assertNotNull(queueManager);
+        assertEquals(0, cnfeCount.get());
+        queueManager.configurationChanged(true);
+        int expectedCnfeCount = 2 * jobCnt;
+        assertEquals(expectedCnfeCount, cnfeCount.get());
+        queueManager.maintain();
+        assertEquals(expectedCnfeCount, cnfeCount.get());
+        queueManager.maintain();
+        assertEquals(expectedCnfeCount, cnfeCount.get());
+        queueManager.maintain();
+        assertEquals(expectedCnfeCount, cnfeCount.get());
+        queueManager.configurationChanged(true);
+        expectedCnfeCount += 2 * jobCnt;
+        assertEquals(expectedCnfeCount, cnfeCount.get());
+        queueManager.maintain();
+        assertEquals(expectedCnfeCount, cnfeCount.get());
+        queueManager.maintain();
+        assertEquals(expectedCnfeCount, cnfeCount.get());
+        queueManager.maintain();
+        assertEquals(expectedCnfeCount, cnfeCount.get());
+    }
+
+    @Test
+    public void testFullTopicScan() throws Throwable {
+        assertNotNull(queueManager);
+        assertEquals(0, cnfeCount.get());
+        for( int i = 0; i < 50; i++ ) {
+            queueManager.fullTopicScan();
+            assertEquals(2 * jobCnt, cnfeCount.get());
+        }
+    }
+
+    @Test
+    public void testMaintain() throws Throwable {
+        assertNotNull(queueManager);
+        assertEquals(0, cnfeCount.get());
+        // configurationChanged(true) is going to do a fullTopicScan()
+        queueManager.configurationChanged(true);
+        // due to fullTopicScan -> 2 times the jobs are loaded causing a CNFE
+        int expectedCnfeCount = 2 * jobCnt;
+        assertEquals(expectedCnfeCount, cnfeCount.get());
+        for( int i = 0; i < 50; i++ ) {
+            // schedulerRuns % 3 == 1
+            queueManager.maintain();
+            assertEquals(expectedCnfeCount, cnfeCount.get());
+            // schedulerRuns % 3 == 2
+            queueManager.maintain();
+            assertEquals(expectedCnfeCount, cnfeCount.get());
+            // schedulerRuns % 3 == 0
+            queueManager.maintain();
+            // if we weren't halting the topic due to CNFE, we'd now be doing:
+            // expectedCnfeCount += 2 * jobCnt;
+            assertEquals(expectedCnfeCount, cnfeCount.get());
+        }
+    }
+    
+    @Test
+    public void testConfigurationChanged() throws Throwable {
+        assertNotNull(queueManager);
+        assertEquals(0, cnfeCount.get());
+        queueManager.configurationChanged(false);
+        assertEquals(0, cnfeCount.get());
+        queueManager.configurationChanged(true);
+        assertEquals(2 * jobCnt, cnfeCount.get());
+        Iterable<Queue> qit = queueManager.getQueues();
+        assertNotNull(qit);
+        Iterator<Queue> it = qit.iterator();
+        assertNotNull(it);
+        assertFalse(it.hasNext());
+    }
+    
+    private Resource createJob(ContentBuilder contentBuilder, String localSlingId, String topic, int year, int month) throws ItemExistsException, PathNotFoundException, VersionException, ConstraintViolationException, LockException, RepositoryException {
+        // /var/eventing/jobs/assigned/<slingId>/<topic>/2020/10/13/19/26        
+        String applicationId = localSlingId;
+        String counter = String.valueOf(jobCnt++);
+        String jobId = year + "/" + month + "/1/20/0/" + applicationId + "_" + counter;
+        String path = JobManagerConfiguration.DEFAULT_REPOSITORY_PATH + "/assigned/" + localSlingId + "/" + topic + "/" + jobId;
+        
+        final UnDeserializableDataObject uddao = new UnDeserializableDataObject();
+        return contentBuilder.resource(path, 
+                ResourceHelper.PROPERTY_JOB_TOPIC, topic, 
+                ResourceHelper.PROPERTY_JOB_ID, jobId,
+                Job.PROPERTY_JOB_CREATED, Calendar.getInstance(),
+                "uddao", uddao);
+    }
+
+    private static final class UnDeserializableDataObject implements Externalizable {
+        private static final long serialVersionUID = 1L;
+
+        public UnDeserializableDataObject() {
+            // we'll allow this one
+        }
+        
+        @Override
+        public void writeExternal(ObjectOutput out) throws IOException {
+            out.write(42);
+        }
+
+        @Override
+        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+            cnfeCount.incrementAndGet();
+            throw new ClassNotFoundException("UnDeserializableDataObject");
+        }
+    }
+}
diff --git a/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java b/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java
index 464ef74..87e296c 100644
--- a/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java
+++ b/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java
@@ -23,6 +23,7 @@
 import static org.ops4j.pax.exam.CoreOptions.junitBundles;
 import static org.ops4j.pax.exam.CoreOptions.mavenBundle;
 import static org.ops4j.pax.exam.CoreOptions.options;
+import static org.ops4j.pax.exam.CoreOptions.repository;
 import static org.ops4j.pax.exam.CoreOptions.systemProperty;
 import static org.ops4j.pax.exam.CoreOptions.when;
 
@@ -104,6 +105,7 @@
         final String slingHome = new File(buildDir + File.separatorChar + "sling_" + System.currentTimeMillis()).getAbsolutePath();
 
         return options(
+                repository("https://repo.maven.apache.org/maven2/").id("central"),
                 frameworkProperty("sling.home").value(slingHome),
                 frameworkProperty("repository.home").value(slingHome + File.separatorChar + "repository"),
                 when( localRepo.length() > 0 ).useOptions(