SLING-9906 : halted topics introduced - first collected in the QueueJobCache, but handed back to the QueueManager to survive queues being outdated/closed after inactivity
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java b/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
index ca45976..e1804f6 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
@@ -123,14 +123,21 @@
      * @param config The queue configuration
      * @param services The queue services
      * @param topics The topics handled by this queue
+     * @param haltedTopics reference to pass newly halted topics back
      *
      * @return {@code JobQueueImpl} if there are jobs to process, {@code null} otherwise.
      */
     public static JobQueueImpl createQueue(final String name,
                         final InternalQueueConfiguration config,
                         final QueueServices services,
-                        final Set<String> topics) {
+                        final Set<String> topics,
+                        final Set<String> haltedTopicsBackRef) {
         final QueueJobCache cache = new QueueJobCache(services.configuration, name, services.statisticsManager, config.getType(), topics);
+        // the cache might contain newly halted topics.
+        // these we need to retain, to avoid scanning them going forward.
+        // but since the cache might be empty and thus discarded,
+        // we pass them back explicitly in provided haltedTopicsRef
+        haltedTopicsBackRef.addAll(cache.getNewlyHaltedTopics());
         if ( cache.isEmpty() ) {
             return null;
         }
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java b/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java
index ebcc92d..f4190e6 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java
@@ -66,6 +66,9 @@
     /** The set of new topics to scan. */
     private final Set<String> topicsWithNewJobs = new HashSet<String>();
 
+    /** The set of new topics to pause. */
+    private final Set<String> newlyHaltedTopics = new HashSet<String>();
+
     /** The cache of current objects. */
     private final List<JobImpl> cache = new ArrayList<JobImpl>();
 
@@ -119,6 +122,12 @@
         return result;
     }
 
+    Set<String> getNewlyHaltedTopics() {
+        synchronized ( this.topicsWithNewJobs ) {
+            return new HashSet<>(this.newlyHaltedTopics);
+        }
+    }
+
     public void setIsBlocked(final boolean value) {
         this.queueIsBlocked.set(value);
     }
@@ -166,6 +175,7 @@
                         if ( doFull ) {
                             checkingTopics.addAll(this.topics);
                         }
+                        checkingTopics.removeAll(newlyHaltedTopics);
                         if ( !checkingTopics.isEmpty() ) {
                             this.loadJobs(queue.getName(), checkingTopics, statisticsManager);
                         }
@@ -273,6 +283,7 @@
         final List<JobImpl> list = new ArrayList<JobImpl>();
 
         final AtomicBoolean scanTopic = new AtomicBoolean(false);
+        final AtomicBoolean haltTopic = new AtomicBoolean(false);
 
         JobTopicTraverser.traverse(logger, topicResource, new JobTopicTraverser.JobCallback() {
 
@@ -288,19 +299,32 @@
                     logger.debug("Ignoring job {} - processing already started.", job);
                 } else {
                     // error reading job
-                    scanTopic.set(true);
-                    if ( job.isReadErrorRecoverable() ) {
-                        logger.debug("Ignoring job {} due to recoverable read errors.", job);
-                    } else {
+                    switch( job.getReadErrorType() ) {
+                    case CLASSNOTFOUNDEXCEPTION : {
+                        haltTopic.set(true);
+                        break;
+                    }
+                    case RUNTIMEEXCEPTION : {
+                        scanTopic.set(true);
                         logger.debug("Failing job {} due to unrecoverable read errors.", job);
                         final JobHandler handler = new JobHandler(job, null, configuration);
                         handler.finished(JobState.ERROR, true, null);
+                        break;
+                    }
+                    default: {
+                        scanTopic.set(true);
+                        logger.debug("Ignoring job {} due to recoverable read errors.", job);
+                    }
                     }
                 }
                 return list.size() < maxPreloadLimit;
             }
         });
-        if ( scanTopic.get() ) {
+        if ( haltTopic.get() ) {
+            synchronized ( this.topicsWithNewJobs ) {
+                this.newlyHaltedTopics.add(topic);
+            }
+        } else if ( scanTopic.get() ) {
             synchronized ( this.topicsWithNewJobs ) {
                 this.topicsWithNewJobs.add(topic);
             }
@@ -317,7 +341,12 @@
     public void handleNewTopics(final Set<String> topics) {
         logger.debug("Update cache to handle new event for topics {}", topics);
         synchronized ( this.topicsWithNewJobs ) {
-            this.topicsWithNewJobs.addAll(topics);
+            final Set<String> nonHaltedTopics = new HashSet<>(topics);
+            if (!Collections.disjoint(topics, newlyHaltedTopics)) {
+                logger.warn("handleNewTopics : sets not disjoint as expected. topics: " + topics + ", newlyHaltedTopics: " + newlyHaltedTopics);
+                nonHaltedTopics.removeAll(newlyHaltedTopics);
+            }
+            this.topicsWithNewJobs.addAll(nonHaltedTopics);
         }
         this.topics.addAll(topics);
     }
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java b/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
index 40bfe97..8e1f44d 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
@@ -27,6 +27,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.sling.api.resource.Resource;
@@ -136,6 +137,9 @@
     /** The queue services. */
     private volatile QueueServices queueServices;
 
+    /** The set of new topics to pause. */
+    private final Set<String> haltedTopics = new ConcurrentSkipListSet<String>();
+
     /**
      * Activate this component.
      * @param props Configuration properties
@@ -180,7 +184,7 @@
      * is idle for two consecutive clean up calls, it is removed.
      * @see java.lang.Runnable#run()
      */
-    private void maintain() {
+    void maintain() {
         this.schedulerRuns++;
         logger.debug("Queue manager maintenance: Starting #{}", this.schedulerRuns);
 
@@ -233,6 +237,9 @@
     private void start(final QueueInfo queueInfo,
                        final Set<String> topics) {
         final InternalQueueConfiguration config = queueInfo.queueConfiguration;
+        final Set<String> filteredTopics = new HashSet<String>(topics);
+        filteredTopics.removeAll(haltedTopics);
+
         // get or create queue
         boolean isNewQueue = false;
         JobQueueImpl queue = null;
@@ -246,7 +253,7 @@
                 queue = null;
             }
             if ( queue == null ) {
-                queue = JobQueueImpl.createQueue(queueInfo.queueName, config, queueServices, topics);
+                queue = JobQueueImpl.createQueue(queueInfo.queueName, config, queueServices, filteredTopics, haltedTopics);
                 // on startup the queue might be empty and we get null back from createQueue
                 if ( queue != null ) {
                     isNewQueue = true;
@@ -258,7 +265,7 @@
         if ( queue != null ) {
             logger.debug("Starting queue {}", queueInfo.queueName);
             if ( !isNewQueue ) {
-                queue.wakeUpQueue(topics);
+                queue.wakeUpQueue(filteredTopics);
             }
             queue.startJobs();
         }
diff --git a/src/test/java/org/apache/sling/event/impl/jobs/queues/TestQueueJobCache.java b/src/test/java/org/apache/sling/event/impl/jobs/queues/TestQueueJobCache.java
index a000d17..aae0c73 100644
--- a/src/test/java/org/apache/sling/event/impl/jobs/queues/TestQueueJobCache.java
+++ b/src/test/java/org/apache/sling/event/impl/jobs/queues/TestQueueJobCache.java
@@ -68,7 +68,6 @@
 import org.apache.sling.testing.mock.sling.ResourceResolverType;
 import org.apache.sling.testing.mock.sling.builder.ContentBuilder;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mock;
@@ -128,6 +127,7 @@
 
     @Before
     public void setUp() throws Throwable {
+        cnfeCount.set(0);
         ownSlingId = UUID.randomUUID().toString();
         Environment.APPLICATION_ID = ownSlingId;
         componentContext = MockOsgi.newComponentContext();
@@ -164,9 +164,20 @@
     private void initQueueConfigurationManagerMocks() {
         Mockito.when(queueConfigurationManager.getQueueInfo(Mockito.anyString())).thenAnswer(new Answer<QueueInfo>() {
             
+            private final Map<String, QueueInfo> queueInfos = new HashMap<>();
+
             @Override
             public QueueInfo answer(InvocationOnMock invocation) throws Throwable {
                 final String topic = (String) invocation.getArguments()[0];
+                QueueInfo queueInfo = queueInfos.get(topic);
+                if ( queueInfo == null ) {
+                    queueInfo = createQueueInfo(topic);
+                    queueInfos.put(topic, queueInfo);
+                }
+                return queueInfo;
+            }
+
+            private QueueInfo createQueueInfo(final String topic) {
                 final QueueInfo result = new QueueInfo();
                 result.queueName = "Queue for topic=" + topic;
                 Map<String, Object> props = new HashMap<>();
@@ -183,10 +194,37 @@
     }
     
     @Test
-    @Ignore(value="nothing useful tested here yet")
     public void testFullTopicScan() throws Throwable {
         assertNotNull(queueManager);
-        queueManager.fullTopicScan();
+        assertEquals(0, cnfeCount.get());
+        for( int i = 0; i < 50; i++ ) {
+            queueManager.fullTopicScan();
+            assertEquals(2 * jobCnt, cnfeCount.get());
+        }
+    }
+
+    @Test
+    public void testMaintain() throws Throwable {
+        assertNotNull(queueManager);
+        assertEquals(0, cnfeCount.get());
+        // configurationChanged(true) is going to do a fullTopicScan()
+        queueManager.configurationChanged(true);
+        // due to fullTopicScan -> 2 times the jobs are loaded causing a CNFE
+        int expectedCnfeCount = 2 * jobCnt;
+        assertEquals(expectedCnfeCount, cnfeCount.get());
+        for( int i = 0; i < 50; i++ ) {
+            // schedulerRuns % 3 == 1
+            queueManager.maintain();
+            assertEquals(expectedCnfeCount, cnfeCount.get());
+            // schedulerRuns % 3 == 2
+            queueManager.maintain();
+            assertEquals(expectedCnfeCount, cnfeCount.get());
+            // schedulerRuns % 3 == 0
+            queueManager.maintain();
+            // if we weren't halting the topic due to CNFE, we'd now be doing:
+            // expectedCnfeCount += 2 * jobCnt;
+            assertEquals(expectedCnfeCount, cnfeCount.get());
+        }
     }
     
     @Test
@@ -196,17 +234,12 @@
         queueManager.configurationChanged(false);
         assertEquals(0, cnfeCount.get());
         queueManager.configurationChanged(true);
-        assertEquals(4 * jobCnt, cnfeCount.get());
+        assertEquals(2 * jobCnt, cnfeCount.get());
         Iterable<Queue> qit = queueManager.getQueues();
         assertNotNull(qit);
         Iterator<Queue> it = qit.iterator();
         assertNotNull(it);
-        assertTrue(it.hasNext());
-        JobQueueImpl n = (JobQueueImpl) it.next();
-        assertNotNull(n);
-        QueueJobCache c = n.getCache();
-        assertNotNull(c);
-        assertFalse(c.isEmpty());
+        assertFalse(it.hasNext());
     }
     
     private Resource createJob(ContentBuilder contentBuilder, String localSlingId, String topic, int year, int month) throws ItemExistsException, PathNotFoundException, VersionException, ConstraintViolationException, LockException, RepositoryException {