SLING-9906 : halted topics introduced - first collected in the QueueJobCache, but handed back to the QueueManager to survive queues being outdated/closed after inactivity
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java b/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
index ca45976..e1804f6 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
@@ -123,14 +123,21 @@
* @param config The queue configuration
* @param services The queue services
* @param topics The topics handled by this queue
+ * @param haltedTopics reference to pass newly halted topics back
*
* @return {@code JobQueueImpl} if there are jobs to process, {@code null} otherwise.
*/
public static JobQueueImpl createQueue(final String name,
final InternalQueueConfiguration config,
final QueueServices services,
- final Set<String> topics) {
+ final Set<String> topics,
+ final Set<String> haltedTopicsBackRef) {
final QueueJobCache cache = new QueueJobCache(services.configuration, name, services.statisticsManager, config.getType(), topics);
+ // the cache might contain newly halted topics.
+ // these we need to retain, to avoid scanning them going forward.
+ // but since the cache might be empty and thus discarded,
+ // we pass them back explicitly in provided haltedTopicsRef
+ haltedTopicsBackRef.addAll(cache.getNewlyHaltedTopics());
if ( cache.isEmpty() ) {
return null;
}
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java b/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java
index ebcc92d..f4190e6 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueJobCache.java
@@ -66,6 +66,9 @@
/** The set of new topics to scan. */
private final Set<String> topicsWithNewJobs = new HashSet<String>();
+ /** The set of new topics to pause. */
+ private final Set<String> newlyHaltedTopics = new HashSet<String>();
+
/** The cache of current objects. */
private final List<JobImpl> cache = new ArrayList<JobImpl>();
@@ -119,6 +122,12 @@
return result;
}
+ Set<String> getNewlyHaltedTopics() {
+ synchronized ( this.topicsWithNewJobs ) {
+ return new HashSet<>(this.newlyHaltedTopics);
+ }
+ }
+
public void setIsBlocked(final boolean value) {
this.queueIsBlocked.set(value);
}
@@ -166,6 +175,7 @@
if ( doFull ) {
checkingTopics.addAll(this.topics);
}
+ checkingTopics.removeAll(newlyHaltedTopics);
if ( !checkingTopics.isEmpty() ) {
this.loadJobs(queue.getName(), checkingTopics, statisticsManager);
}
@@ -273,6 +283,7 @@
final List<JobImpl> list = new ArrayList<JobImpl>();
final AtomicBoolean scanTopic = new AtomicBoolean(false);
+ final AtomicBoolean haltTopic = new AtomicBoolean(false);
JobTopicTraverser.traverse(logger, topicResource, new JobTopicTraverser.JobCallback() {
@@ -288,19 +299,32 @@
logger.debug("Ignoring job {} - processing already started.", job);
} else {
// error reading job
- scanTopic.set(true);
- if ( job.isReadErrorRecoverable() ) {
- logger.debug("Ignoring job {} due to recoverable read errors.", job);
- } else {
+ switch( job.getReadErrorType() ) {
+ case CLASSNOTFOUNDEXCEPTION : {
+ haltTopic.set(true);
+ break;
+ }
+ case RUNTIMEEXCEPTION : {
+ scanTopic.set(true);
logger.debug("Failing job {} due to unrecoverable read errors.", job);
final JobHandler handler = new JobHandler(job, null, configuration);
handler.finished(JobState.ERROR, true, null);
+ break;
+ }
+ default: {
+ scanTopic.set(true);
+ logger.debug("Ignoring job {} due to recoverable read errors.", job);
+ }
}
}
return list.size() < maxPreloadLimit;
}
});
- if ( scanTopic.get() ) {
+ if ( haltTopic.get() ) {
+ synchronized ( this.topicsWithNewJobs ) {
+ this.newlyHaltedTopics.add(topic);
+ }
+ } else if ( scanTopic.get() ) {
synchronized ( this.topicsWithNewJobs ) {
this.topicsWithNewJobs.add(topic);
}
@@ -317,7 +341,12 @@
public void handleNewTopics(final Set<String> topics) {
logger.debug("Update cache to handle new event for topics {}", topics);
synchronized ( this.topicsWithNewJobs ) {
- this.topicsWithNewJobs.addAll(topics);
+ final Set<String> nonHaltedTopics = new HashSet<>(topics);
+ if (!Collections.disjoint(topics, newlyHaltedTopics)) {
+ logger.warn("handleNewTopics : sets not disjoint as expected. topics: " + topics + ", newlyHaltedTopics: " + newlyHaltedTopics);
+ nonHaltedTopics.removeAll(newlyHaltedTopics);
+ }
+ this.topicsWithNewJobs.addAll(nonHaltedTopics);
}
this.topics.addAll(topics);
}
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java b/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
index 40bfe97..8e1f44d 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
@@ -27,6 +27,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.sling.api.resource.Resource;
@@ -136,6 +137,9 @@
/** The queue services. */
private volatile QueueServices queueServices;
+ /** The set of new topics to pause. */
+ private final Set<String> haltedTopics = new ConcurrentSkipListSet<String>();
+
/**
* Activate this component.
* @param props Configuration properties
@@ -180,7 +184,7 @@
* is idle for two consecutive clean up calls, it is removed.
* @see java.lang.Runnable#run()
*/
- private void maintain() {
+ void maintain() {
this.schedulerRuns++;
logger.debug("Queue manager maintenance: Starting #{}", this.schedulerRuns);
@@ -233,6 +237,9 @@
private void start(final QueueInfo queueInfo,
final Set<String> topics) {
final InternalQueueConfiguration config = queueInfo.queueConfiguration;
+ final Set<String> filteredTopics = new HashSet<String>(topics);
+ filteredTopics.removeAll(haltedTopics);
+
// get or create queue
boolean isNewQueue = false;
JobQueueImpl queue = null;
@@ -246,7 +253,7 @@
queue = null;
}
if ( queue == null ) {
- queue = JobQueueImpl.createQueue(queueInfo.queueName, config, queueServices, topics);
+ queue = JobQueueImpl.createQueue(queueInfo.queueName, config, queueServices, filteredTopics, haltedTopics);
// on startup the queue might be empty and we get null back from createQueue
if ( queue != null ) {
isNewQueue = true;
@@ -258,7 +265,7 @@
if ( queue != null ) {
logger.debug("Starting queue {}", queueInfo.queueName);
if ( !isNewQueue ) {
- queue.wakeUpQueue(topics);
+ queue.wakeUpQueue(filteredTopics);
}
queue.startJobs();
}
diff --git a/src/test/java/org/apache/sling/event/impl/jobs/queues/TestQueueJobCache.java b/src/test/java/org/apache/sling/event/impl/jobs/queues/TestQueueJobCache.java
index a000d17..aae0c73 100644
--- a/src/test/java/org/apache/sling/event/impl/jobs/queues/TestQueueJobCache.java
+++ b/src/test/java/org/apache/sling/event/impl/jobs/queues/TestQueueJobCache.java
@@ -68,7 +68,6 @@
import org.apache.sling.testing.mock.sling.ResourceResolverType;
import org.apache.sling.testing.mock.sling.builder.ContentBuilder;
import org.junit.Before;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
@@ -128,6 +127,7 @@
@Before
public void setUp() throws Throwable {
+ cnfeCount.set(0);
ownSlingId = UUID.randomUUID().toString();
Environment.APPLICATION_ID = ownSlingId;
componentContext = MockOsgi.newComponentContext();
@@ -164,9 +164,20 @@
private void initQueueConfigurationManagerMocks() {
Mockito.when(queueConfigurationManager.getQueueInfo(Mockito.anyString())).thenAnswer(new Answer<QueueInfo>() {
+ private final Map<String, QueueInfo> queueInfos = new HashMap<>();
+
@Override
public QueueInfo answer(InvocationOnMock invocation) throws Throwable {
final String topic = (String) invocation.getArguments()[0];
+ QueueInfo queueInfo = queueInfos.get(topic);
+ if ( queueInfo == null ) {
+ queueInfo = createQueueInfo(topic);
+ queueInfos.put(topic, queueInfo);
+ }
+ return queueInfo;
+ }
+
+ private QueueInfo createQueueInfo(final String topic) {
final QueueInfo result = new QueueInfo();
result.queueName = "Queue for topic=" + topic;
Map<String, Object> props = new HashMap<>();
@@ -183,10 +194,37 @@
}
@Test
- @Ignore(value="nothing useful tested here yet")
public void testFullTopicScan() throws Throwable {
assertNotNull(queueManager);
- queueManager.fullTopicScan();
+ assertEquals(0, cnfeCount.get());
+ for( int i = 0; i < 50; i++ ) {
+ queueManager.fullTopicScan();
+ assertEquals(2 * jobCnt, cnfeCount.get());
+ }
+ }
+
+ @Test
+ public void testMaintain() throws Throwable {
+ assertNotNull(queueManager);
+ assertEquals(0, cnfeCount.get());
+ // configurationChanged(true) is going to do a fullTopicScan()
+ queueManager.configurationChanged(true);
+ // due to fullTopicScan -> 2 times the jobs are loaded causing a CNFE
+ int expectedCnfeCount = 2 * jobCnt;
+ assertEquals(expectedCnfeCount, cnfeCount.get());
+ for( int i = 0; i < 50; i++ ) {
+ // schedulerRuns % 3 == 1
+ queueManager.maintain();
+ assertEquals(expectedCnfeCount, cnfeCount.get());
+ // schedulerRuns % 3 == 2
+ queueManager.maintain();
+ assertEquals(expectedCnfeCount, cnfeCount.get());
+ // schedulerRuns % 3 == 0
+ queueManager.maintain();
+ // if we weren't halting the topic due to CNFE, we'd now be doing:
+ // expectedCnfeCount += 2 * jobCnt;
+ assertEquals(expectedCnfeCount, cnfeCount.get());
+ }
}
@Test
@@ -196,17 +234,12 @@
queueManager.configurationChanged(false);
assertEquals(0, cnfeCount.get());
queueManager.configurationChanged(true);
- assertEquals(4 * jobCnt, cnfeCount.get());
+ assertEquals(2 * jobCnt, cnfeCount.get());
Iterable<Queue> qit = queueManager.getQueues();
assertNotNull(qit);
Iterator<Queue> it = qit.iterator();
assertNotNull(it);
- assertTrue(it.hasNext());
- JobQueueImpl n = (JobQueueImpl) it.next();
- assertNotNull(n);
- QueueJobCache c = n.getCache();
- assertNotNull(c);
- assertFalse(c.isEmpty());
+ assertFalse(it.hasNext());
}
private Resource createJob(ContentBuilder contentBuilder, String localSlingId, String topic, int year, int month) throws ItemExistsException, PathNotFoundException, VersionException, ConstraintViolationException, LockException, RepositoryException {