Merge pull request #14 from stefan-egli/SLING-10719
SLING-10719 : reuse available semaphore after outdating a queue, whic…
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java b/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
index be6f485..da1742c 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
@@ -107,6 +107,12 @@
/** Semaphore for handling the max number of jobs. */
private final Semaphore available;
+ /** original value of maxParallel with which the queue was created */
+ private final int maxParallel;
+
+ /** Semaphore holding the number of permits that are yet to be drained (instead of being released to available) after maxParallel was reduced */
+ private final Semaphore drainage;
+
/** Guard for having only one thread executing start jobs. */
private final AtomicBoolean startJobsGuard = new AtomicBoolean(false);
@@ -123,6 +129,7 @@
* @param config The queue configuration
* @param services The queue services
* @param topics The topics handled by this queue
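+ * @param outdatedQueueInfo semaphores and maxParallel of a previously outdated queue to reuse, or {@code null} if there is none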
* @param haltedTopics reference to pass newly halted topics back
*
* @return {@code JobQueueImpl} if there are jobs to process, {@code null} otherwise.
@@ -131,7 +138,8 @@
final InternalQueueConfiguration config,
final QueueServices services,
final Set<String> topics,
- final Set<String> haltedTopicsBackRef) {
+ final Set<String> haltedTopicsBackRef,
+ final OutdatedJobQueueInfo outdatedQueueInfo) {
final QueueJobCache cache = new QueueJobCache(services.configuration, name, services.statisticsManager, config.getType(), topics);
// the cache might contain newly halted topics.
// these we need to retain, to avoid scanning them going forward.
@@ -149,7 +157,7 @@
if ( cache.isEmpty() ) {
return null;
}
- return new JobQueueImpl(name, config, services, cache);
+ return new JobQueueImpl(name, config, services, cache, outdatedQueueInfo);
}
/**
@@ -159,11 +167,13 @@
* @param config The queue configuration
* @param services The queue services
* @param cache The job cache
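+ * @param outdatedQueue info of a previously outdated queue whose semaphores are reused, or {@code null} if the queue is created for the first time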
*/
private JobQueueImpl(final String name,
final InternalQueueConfiguration config,
final QueueServices services,
- final QueueJobCache cache) {
+ final QueueJobCache cache,
+ final OutdatedJobQueueInfo outdatedQueue) {
if ( config.getOwnThreadPoolSize() > 0 ) {
this.threadPool = new EventingThreadPool(services.threadPoolManager, config.getOwnThreadPoolSize());
} else {
@@ -175,7 +185,40 @@
this.logger = LoggerFactory.getLogger(this.getClass().getName() + '.' + name);
this.running = true;
this.cache = cache;
- this.available = new Semaphore(config.getMaxParallel(), true);
+ this.maxParallel = config.getMaxParallel();
+ if (outdatedQueue == null) {
+ // queue is created for the first time
+ this.available = new Semaphore(this.maxParallel, true);
+ this.drainage = new Semaphore(0, true);
+ } else {
+ // queue was previously outdated - let's reuse available and drainage
+ this.available = outdatedQueue.getAvailable();
+ this.drainage = outdatedQueue.getDrainage();
+ int oldMaxParallel = outdatedQueue.getMaxParallel();
+ int maxParallelDiff = this.maxParallel - oldMaxParallel;
+ int drainedOldDrainage = 0;
+ int drainedOldAvailable = 0;
+ if (maxParallelDiff != 0) {
+ // config change : maxParallel differs from the value the outdated queue was created with
+ drainedOldDrainage = this.drainage.drainPermits();
+ drainedOldAvailable = this.available.drainPermits();
+ int netNewPermits = drainedOldAvailable - drainedOldDrainage + maxParallelDiff;
+ if (netNewPermits > 0) {
+ this.available.release(netNewPermits);
+ } else if (netNewPermits < 0) {
+ // special case : maxParallel got reduced since the last outdating,
+ // resulting in an effectively negative number of currently available permits.
+ // To account for that, finishing jobs first try to drain before releasing to available;
+ // to trigger this behaviour, the permit deficit is released to drainage.
+ this.drainage.release(-netNewPermits);
+ }
+ }
+ logger.info("<init> reused outdated queue info: queueName : {}"
+ + ", old available : {}, old drainage : {}, old maxParallel : {}"
+ + ", new available : {}, new drainage : {}, new maxParallel : {}",
+ queueName, drainedOldAvailable, drainedOldDrainage, oldMaxParallel,
+ available.availablePermits(), drainage.availablePermits(), this.maxParallel);
+ }
logger.info("Starting job queue {}", queueName);
logger.debug("Configuration for job queue={}", configuration);
}
@@ -345,7 +388,18 @@
this.logger.error("Exception during job processing.", re);
}
} finally {
- this.available.release();
+ // try draining first
+ if (this.drainage.tryAcquire()) {
+ // special case : if drainage is used, this means maxparallel
+ // got reconfigured and we are not releasing a permit to
+ // available here, but instead reduce drainage.
+ final int approxPermits = this.drainage.availablePermits();
+ this.logger.debug("startJobHandler: drained 1 permit for {}, approx left to drain: {}",
+ queueName, approxPermits);
+ } else {
+ // otherwise release as usual
+ this.available.release();
+ }
}
}
@@ -731,5 +785,9 @@
QueueJobCache getCache() {
return cache;
}
+
+ OutdatedJobQueueInfo getOutdatedJobQueueInfo() {
+ return new OutdatedJobQueueInfo(available, maxParallel, drainage);
+ }
}
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/queues/OutdatedJobQueueInfo.java b/src/main/java/org/apache/sling/event/impl/jobs/queues/OutdatedJobQueueInfo.java
new file mode 100644
index 0000000..dd6d4fa
--- /dev/null
+++ b/src/main/java/org/apache/sling/event/impl/jobs/queues/OutdatedJobQueueInfo.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.queues;
+
+import java.util.concurrent.Semaphore;
+
+/**
+ * Encapsulates the state of a job queue (its available and drainage semaphores plus maxParallel) that is required to survive queue outdating
+ */
+class OutdatedJobQueueInfo {
+
+ private final Semaphore available;
+ private final int maxParallel;
+ private final Semaphore drainage;
+
+ OutdatedJobQueueInfo(Semaphore available, int maxParallel, Semaphore drainage) {
+ this.available = available;
+ this.maxParallel = maxParallel;
+ this.drainage = drainage;
+ }
+
+ final Semaphore getAvailable() {
+ return available;
+ }
+
+ final int getMaxParallel() {
+ return maxParallel;
+ }
+
+ final Semaphore getDrainage() {
+ return drainage;
+ }
+}
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java b/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
index 4336ad5..4485463 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
@@ -128,6 +128,10 @@
/** All active queues. */
private final Map<String, JobQueueImpl> queues = new ConcurrentHashMap<>();
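+ /** Upon outdating a queue, its available and drainage semaphores (plus maxParallel) are parked here by queue name,
+ * so that a queue re-created under that name reuses them - to handle long-running jobs vs topology changes properly */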
+ private final Map<String, OutdatedJobQueueInfo> outdatedQueues = new ConcurrentHashMap<>();
+
/** We count the scheduler runs. */
private volatile long schedulerRuns;
@@ -253,7 +257,8 @@
queue = null;
}
if ( queue == null ) {
- queue = JobQueueImpl.createQueue(queueInfo.queueName, config, queueServices, filteredTopics, haltedTopics);
+ final OutdatedJobQueueInfo outdatedQueueInfo = outdatedQueues.get(queueInfo.queueName);
+ queue = JobQueueImpl.createQueue(queueInfo.queueName, config, queueServices, filteredTopics, haltedTopics, outdatedQueueInfo);
// on startup the queue might be empty and we get null back from createQueue
if ( queue != null ) {
isNewQueue = true;
@@ -285,7 +290,10 @@
// remove the queue with the old name
// check for main queue
final String oldName = ResourceHelper.filterQueueName(queue.getName());
- this.queues.remove(oldName);
+ final JobQueueImpl oldQueue = this.queues.remove(oldName);
+ if (oldQueue != null) {
+ outdatedQueues.put(oldName, oldQueue.getOutdatedJobQueueInfo());
+ }
// check if we can close or have to rename
if ( queue.tryToClose() ) {
// copy statistics
diff --git a/src/test/java/org/apache/sling/event/impl/discovery/InitDelayingTopologyEventListenerTest.java b/src/test/java/org/apache/sling/event/impl/discovery/InitDelayingTopologyEventListenerTest.java
index 83a3ac4..b8f4772 100644
--- a/src/test/java/org/apache/sling/event/impl/discovery/InitDelayingTopologyEventListenerTest.java
+++ b/src/test/java/org/apache/sling/event/impl/discovery/InitDelayingTopologyEventListenerTest.java
@@ -299,7 +299,7 @@
@Test
public void testProperties() throws Exception {
final TestListener delegate = new TestListener();
- InitDelayingTopologyEventListener listener = new InitDelayingTopologyEventListener(1, delegate);
+ InitDelayingTopologyEventListener listener = new InitDelayingTopologyEventListener(2, delegate);
listener.handleTopologyEvent(createEvent(Type.TOPOLOGY_INIT));
listener.handleTopologyEvent(createEvent(Type.TOPOLOGY_CHANGING));
listener.handleTopologyEvent(createEvent(Type.TOPOLOGY_CHANGED));
diff --git a/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java b/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java
index e8bc0ac..47931c6 100644
--- a/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java
+++ b/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java
@@ -112,7 +112,7 @@
return options(
newConfiguration("org.apache.sling.event.impl.jobs.jcr.PersistenceHandler")
- .put(JobManagerConfiguration.PROPERTY_BACKGROUND_LOAD_DELAY, 3L)
+ .put(JobManagerConfiguration.PROPERTY_BACKGROUND_LOAD_DELAY, backgroundLoadDelay())
.put("startup.delay", 1L)
.asOption(),
baseConfiguration(),
@@ -145,6 +145,10 @@
);
}
+ long backgroundLoadDelay() {
+ return 3L;
+ }
+
protected ModifiableCompositeOption baseConfiguration() {
return composite(
failOnUnresolvedBundles(),
diff --git a/src/test/java/org/apache/sling/event/it/AbstractMaxParallelTest.java b/src/test/java/org/apache/sling/event/it/AbstractMaxParallelTest.java
new file mode 100644
index 0000000..cf3ec2f
--- /dev/null
+++ b/src/test/java/org/apache/sling/event/it/AbstractMaxParallelTest.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.it;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.sling.discovery.TopologyEvent;
+import org.apache.sling.discovery.TopologyEvent.Type;
+import org.apache.sling.discovery.TopologyEventListener;
+import org.apache.sling.discovery.TopologyView;
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.JobManager;
+import org.apache.sling.event.jobs.NotificationConstants;
+import org.apache.sling.event.jobs.consumer.JobConsumer;
+import org.junit.After;
+import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.framework.ServiceReference;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractMaxParallelTest extends AbstractJobHandlingTest {
+
+ private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+ private static final int BACKGROUND_LOAD_DELAY_SECONDS = 1;
+
+ private static final int EXTRA_CHAOS_DURATION_SECONDS = 20;
+
+ private static final int UNKNOWN_TOPOLOGY_FACTOR_MILLIS = 15;//100;
+
+ private static final int STABLE_TOPOLOGY_FACTOR_MILLIS = 40;//300;
+
+ static final String TOPIC_PREFIX = "sling/maxparallel/";
+
+ static final String TOPIC_NAME = TOPIC_PREFIX + "zero";
+
+ private final Object syncObj = new Object();
+
+ protected int max = -1;
+
+ @Override
+ long backgroundLoadDelay() {
+ return BACKGROUND_LOAD_DELAY_SECONDS;
+ }
+
+ @Override
+ @After
+ public void cleanup() {
+ super.cleanup();
+ }
+
+ private void registerMax(int cnt) {
+ synchronized(syncObj) {
+ max = Math.max(max, cnt);
+ }
+ }
+
+ /**
+ * Setup consumers
+ */
+ private void setupJobConsumers(long jobRunMillis) {
+ this.registerJobConsumer(TOPIC_NAME,
+
+ new JobConsumer() {
+
+ private AtomicInteger cnt = new AtomicInteger(0);
+
+ @Override
+ public JobResult process(final Job job) {
+ int c = cnt.incrementAndGet();
+ registerMax(c);
+ log.info("process : start delaying. count=" + c + ", id="+ job.getId());
+ try {
+ Thread.sleep(jobRunMillis);
+ } catch (InterruptedException e) {
+ // NOTE(review): the interrupt is only printed here; the thread's interrupted flag is not restored
+ e.printStackTrace();
+ }
+ log.info("process : done delaying. count=" + c + ", id="+ job.getId());
+ cnt.decrementAndGet();
+ return JobResult.OK;
+ }
+ });
+ }
+
+ private static final class CreateJobThread extends Thread {
+
+ private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+ private final JobManager jobManager;
+
+ final AtomicLong finishedThreads;
+
+ private final Map<String, AtomicLong> created;
+
+ private final int numJobs;
+
+ public CreateJobThread(final JobManager jobManager,
+ Map<String, AtomicLong> created,
+ final AtomicLong finishedThreads,
+ int numJobs) {
+ this.jobManager = jobManager;
+ this.created = created;
+ this.finishedThreads = finishedThreads;
+ this.numJobs = numJobs;
+ }
+
+ @Override
+ public void run() {
+ AtomicInteger cnt = new AtomicInteger(0);
+ for(int i=0; i<numJobs; i++) {
+ final int c = cnt.incrementAndGet();
+ log.info("run: creating job " + c + " on topic " + TOPIC_NAME);
+ if (jobManager.addJob(TOPIC_NAME, null) != null) {
+ created.get(TOPIC_NAME).incrementAndGet();
+ }
+ }
+ finishedThreads.incrementAndGet();
+ }
+
+ }
+
+ /**
+ * Setup chaos thread(s)
+ *
+ * Chaos is right now created by sending topology changing/changed events randomly
+ */
+ private void setupChaosThreads(final List<Thread> threads,
+ final AtomicLong finishedThreads, long duration) {
+ final List<TopologyView> views = new ArrayList<>();
+ // register topology listener
+ final ServiceRegistration<TopologyEventListener> reg = this.bc.registerService(TopologyEventListener.class, new TopologyEventListener() {
+
+ @Override
+ public void handleTopologyEvent(final TopologyEvent event) {
+ if ( event.getType() == Type.TOPOLOGY_INIT ) {
+ views.add(event.getNewView());
+ }
+ }
+ }, null);
+ while ( views.isEmpty() ) {
+ this.sleep(10);
+ }
+ reg.unregister();
+ final TopologyView view = views.get(0);
+
+ try {
+ final Collection<ServiceReference<TopologyEventListener>> refs = this.bc.getServiceReferences(TopologyEventListener.class, null);
+ assertNotNull(refs);
+ assertFalse(refs.isEmpty());
+ TopologyEventListener found = null;
+ for(final ServiceReference<TopologyEventListener> ref : refs) {
+ final TopologyEventListener listener = this.bc.getService(ref);
+ if ( listener != null && listener.getClass().getName().equals("org.apache.sling.event.impl.jobs.config.TopologyHandler") ) {
+ found = listener;
+ break;
+ }
+ bc.ungetService(ref);
+ }
+ assertNotNull(found);
+ final TopologyEventListener tel = found;
+ log.info("setupChaosThreads : simulating TOPOLOGY_INIT");
+ tel.handleTopologyEvent(new TopologyEvent(Type.TOPOLOGY_INIT, null, view));
+
+ threads.add(new Thread() {
+
+ private final Random random = new Random();
+
+ @Override
+ public void run() {
+ final long startTime = System.currentTimeMillis();
+ // this thread runs EXTRA_CHAOS_DURATION_SECONDS longer than the given duration
+ final long endTime = startTime + (duration +EXTRA_CHAOS_DURATION_SECONDS) * 1000;
+ while ( System.currentTimeMillis() < endTime ) {
+ final int sleepTime = random.nextInt(25) + 15;
+ try {
+ Thread.sleep(sleepTime * STABLE_TOPOLOGY_FACTOR_MILLIS);
+ } catch ( final InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ log.info("setupChaosThreads : simulating TOPOLOGY_CHANGING");
+ tel.handleTopologyEvent(new TopologyEvent(Type.TOPOLOGY_CHANGING, view, null));
+ final int changingTime = random.nextInt(20) + 3;
+ try {
+ Thread.sleep(changingTime * UNKNOWN_TOPOLOGY_FACTOR_MILLIS);
+ } catch ( final InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ log.info("setupChaosThreads : simulating TOPOLOGY_CHANGED");
+ tel.handleTopologyEvent(new TopologyEvent(Type.TOPOLOGY_CHANGED, view, view));
+ }
+ tel.getClass().getName();
+ finishedThreads.incrementAndGet();
+ }
+ });
+ } catch (InvalidSyntaxException e) {
+ e.printStackTrace();
+ }
+ }
+
+ void doTestMaxParallel(int numJobs, long jobRunMillis, long duration) throws Exception {
+ final JobManager jobManager = this.getJobManager();
+
+ final Map<String, AtomicLong> added = new HashMap<>();
+ final Map<String, AtomicLong> created = new HashMap<>();
+ final Map<String, AtomicLong> finished = new HashMap<>();
+ final List<String> topics = new ArrayList<>();
+ added.put(TOPIC_NAME, new AtomicLong());
+ created.put(TOPIC_NAME, new AtomicLong());
+ finished.put(TOPIC_NAME, new AtomicLong());
+ topics.add(TOPIC_NAME);
+
+ final List<Thread> threads = new ArrayList<>();
+ final AtomicLong finishedThreads = new AtomicLong();
+
+ this.registerEventHandler("org/apache/sling/event/notification/job/*",
+ new EventHandler() {
+
+ @Override
+ public void handleEvent(final Event event) {
+ final String topic = (String) event.getProperty(NotificationConstants.NOTIFICATION_PROPERTY_JOB_TOPIC);
+ if ( NotificationConstants.TOPIC_JOB_FINISHED.equals(event.getTopic())) {
+ finished.get(topic).incrementAndGet();
+ } else if ( NotificationConstants.TOPIC_JOB_ADDED.equals(event.getTopic())) {
+ added.get(topic).incrementAndGet();
+ }
+ }
+ });
+
+ // setup job consumers
+ this.setupJobConsumers(jobRunMillis);
+
+ // start the job creation thread
+ new CreateJobThread(jobManager, created, finishedThreads, numJobs).start();
+
+ // wait until 1 job is being processed
+ log.info("doTestMaxParallel : waiting until 1 job is being processed");
+ while ( max <= 0 ) {
+ this.sleep(100);
+ }
+ log.info("doTestMaxParallel : 1 job was processed, ready to go. max=" + max);
+
+ this.setupChaosThreads(threads, finishedThreads, duration);
+
+ log.info("doTestMaxParallel : starting threads (" + threads.size() + ")");
+ // start threads
+ for(final Thread t : threads) {
+ t.setDaemon(true);
+ t.start();
+ }
+
+ log.info("doTestMaxParallel: sleeping for " + duration + " seconds to wait for threads to finish...");
+ // for sure we can sleep for the duration
+ this.sleep(duration * 1000);
+
+ log.info("doTestMaxParallel: polling for threads to finish...");
+ // wait until threads are finished
+ while ( finishedThreads.get() < threads.size() ) {
+ this.sleep(100);
+ }
+
+ final Set<String> allTopics = new HashSet<>(topics);
+ log.info("doTestMaxParallel: waiting for job handling to finish... " + allTopics.size());
+ while ( !allTopics.isEmpty() ) {
+ final Iterator<String> iter = allTopics.iterator();
+ while ( iter.hasNext() ) {
+ final String topic = iter.next();
+ log.info("doTestMaxParallel: checking topic= " + topic +
+ ", finished=" + finished.get(topic).get() + ", created=" + created.get(topic).get());
+ if ( finished.get(topic).get() == created.get(topic).get() ) {
+ iter.remove();
+ }
+ }
+ log.info("doTestMaxParallel: waiting for job handling to finish... " + allTopics.size());
+ this.sleep(1000);
+ }
+ log.info("doTestMaxParallel: done.");
+ }
+}
diff --git a/src/test/java/org/apache/sling/event/it/OrderedMaxParallelTest.java b/src/test/java/org/apache/sling/event/it/OrderedMaxParallelTest.java
new file mode 100644
index 0000000..03085cd
--- /dev/null
+++ b/src/test/java/org/apache/sling/event/it/OrderedMaxParallelTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.it;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Dictionary;
+import java.util.Hashtable;
+
+import org.apache.sling.event.impl.jobs.config.ConfigurationConstants;
+import org.apache.sling.event.jobs.QueueConfiguration;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.junit.PaxExam;
+
+@RunWith(PaxExam.class)
+public class OrderedMaxParallelTest extends AbstractMaxParallelTest {
+
+ private static final int DURATION = 40;
+
+ @Override
+ @Before
+ public void setup() throws IOException {
+ super.setup();
+
+ // create ordered test queue
+ final org.osgi.service.cm.Configuration orderedConfig = this.configAdmin.createFactoryConfiguration("org.apache.sling.event.jobs.QueueConfiguration", null);
+ final Dictionary<String, Object> orderedProps = new Hashtable<>();
+ orderedProps.put(ConfigurationConstants.PROP_NAME, "ordered-max-parallel");
+ orderedProps.put(ConfigurationConstants.PROP_TYPE, QueueConfiguration.Type.ORDERED.name());
+ orderedProps.put(ConfigurationConstants.PROP_TOPICS, TOPIC_NAME);
+ orderedProps.put(ConfigurationConstants.PROP_RETRIES, 2);
+ orderedProps.put(ConfigurationConstants.PROP_RETRY_DELAY, 2000L);
+ orderedProps.put(ConfigurationConstants.PROP_MAX_PARALLEL, 1);
+ orderedConfig.update(orderedProps);
+
+ this.sleep(1000L);
+ }
+
+ @Test(timeout=DURATION * 16000L)
+ public void testOrderedMaxParallel_slow() throws Exception {
+ doTestMaxParallel(12, 1717, DURATION);
+
+ assertEquals(1, max);
+ }
+
+ @Test(timeout=DURATION * 16000L)
+ public void testOrderedMaxParallel2_fast() throws Exception {
+ doTestMaxParallel(50, 123, DURATION);
+
+ assertEquals(1, max);
+ }
+}
diff --git a/src/test/java/org/apache/sling/event/it/RoundRobinMaxParallelTest.java b/src/test/java/org/apache/sling/event/it/RoundRobinMaxParallelTest.java
new file mode 100644
index 0000000..7e5531b
--- /dev/null
+++ b/src/test/java/org/apache/sling/event/it/RoundRobinMaxParallelTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.it;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Dictionary;
+import java.util.Hashtable;
+
+import org.apache.sling.event.impl.jobs.config.ConfigurationConstants;
+import org.apache.sling.event.jobs.QueueConfiguration;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.junit.PaxExam;
+
+@RunWith(PaxExam.class)
+public class RoundRobinMaxParallelTest extends AbstractMaxParallelTest {
+
+ private static final int MAX_PARALLEL = 3;
+
+ private static final int DURATION = 50;
+
+ @Override
+ @Before
+ public void setup() throws IOException {
+ super.setup();
+
+ // create round-robin test queue
+ final org.osgi.service.cm.Configuration orderedConfig = this.configAdmin.createFactoryConfiguration("org.apache.sling.event.jobs.QueueConfiguration", null);
+ final Dictionary<String, Object> orderedProps = new Hashtable<>();
+ orderedProps.put(ConfigurationConstants.PROP_NAME, "round-robin-max-parallel");
+ orderedProps.put(ConfigurationConstants.PROP_TYPE, QueueConfiguration.Type.TOPIC_ROUND_ROBIN.name());
+ orderedProps.put(ConfigurationConstants.PROP_TOPICS, TOPIC_NAME);
+ orderedProps.put(ConfigurationConstants.PROP_RETRIES, 2);
+ orderedProps.put(ConfigurationConstants.PROP_RETRY_DELAY, 2000L);
+ orderedProps.put(ConfigurationConstants.PROP_MAX_PARALLEL, MAX_PARALLEL);
+ orderedConfig.update(orderedProps);
+
+ this.sleep(1000L);
+ }
+
+ @Test(timeout=DURATION * 16000L)
+ public void testRoundRobinMaxParallel_slow() throws Exception {
+ doTestMaxParallel(20, 1717, DURATION);
+
+ assertTrue(max <= MAX_PARALLEL);
+ }
+
+ @Test(timeout=DURATION * 16000L)
+ public void testRoundRobinMaxParallel_fast() throws Exception {
+ doTestMaxParallel(200, 123, DURATION);
+
+ assertTrue(max <= MAX_PARALLEL);
+ }
+}