Merge pull request #14 from stefan-egli/SLING-10719

SLING-10719 : reuse available semaphore after outdating a queue, whic…
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java b/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
index be6f485..da1742c 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/queues/JobQueueImpl.java
@@ -107,6 +107,12 @@
     /** Semaphore for handling the max number of jobs. */
     private final Semaphore available;
 
+    /** original value of maxParallel with which the queue was created */
+    private final int maxParallel;
+
+    /** Semaphore for handling reduced number of available slots that are yet to drain */
+    private final Semaphore drainage;
+
     /** Guard for having only one thread executing start jobs. */
     private final AtomicBoolean startJobsGuard = new AtomicBoolean(false);
 
@@ -123,6 +129,7 @@
      * @param config The queue configuration
      * @param services The queue services
      * @param topics The topics handled by this queue
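+     * @param outdatedQueueInfo semaphores and maxParallel parked when a queue of this name was outdated, or {@code null} if there is none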
      * @param haltedTopics reference to pass newly halted topics back
      *
      * @return {@code JobQueueImpl} if there are jobs to process, {@code null} otherwise.
@@ -131,7 +138,8 @@
                         final InternalQueueConfiguration config,
                         final QueueServices services,
                         final Set<String> topics,
-                        final Set<String> haltedTopicsBackRef) {
+                        final Set<String> haltedTopicsBackRef,
+                        final OutdatedJobQueueInfo outdatedQueueInfo) {
         final QueueJobCache cache = new QueueJobCache(services.configuration, name, services.statisticsManager, config.getType(), topics);
         // the cache might contain newly halted topics.
         // these we need to retain, to avoid scanning them going forward.
@@ -149,7 +157,7 @@
         if ( cache.isEmpty() ) {
             return null;
         }
-        return new JobQueueImpl(name, config, services, cache);
+        return new JobQueueImpl(name, config, services, cache, outdatedQueueInfo);
     }
 
     /**
@@ -159,11 +167,13 @@
      * @param config The queue configuration
      * @param services The queue services
      * @param cache The job cache
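+     * @param outdatedQueue info of a previously outdated queue whose semaphores are reused, or {@code null} if the queue is created the first time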
      */
     private JobQueueImpl(final String name,
                         final InternalQueueConfiguration config,
                         final QueueServices services,
-                        final QueueJobCache cache) {
+                        final QueueJobCache cache,
+                        final OutdatedJobQueueInfo outdatedQueue) {
         if ( config.getOwnThreadPoolSize() > 0 ) {
             this.threadPool = new EventingThreadPool(services.threadPoolManager, config.getOwnThreadPoolSize());
         } else {
@@ -175,7 +185,40 @@
         this.logger = LoggerFactory.getLogger(this.getClass().getName() + '.' + name);
         this.running = true;
         this.cache = cache;
-        this.available = new Semaphore(config.getMaxParallel(), true);
+        this.maxParallel = config.getMaxParallel();
+        if (outdatedQueue == null) {
+            // queue is created the first time
+            this.available = new Semaphore(this.maxParallel, true);
+            this.drainage = new Semaphore(0, true);
+        } else {
+            // queue was previously outdated - let's reuse available and drainage
+            this.available = outdatedQueue.getAvailable();
+            this.drainage = outdatedQueue.getDrainage();
+            int oldMaxParallel = outdatedQueue.getMaxParallel();
+            int maxParallelDiff = this.maxParallel - oldMaxParallel;
+            int drainedOldDrainage = 0;
+            int drainedOldAvailable = 0;
+            if (maxParallelDiff != 0) {
+                // config change
+                drainedOldDrainage = this.drainage.drainPermits();
+                drainedOldAvailable = this.available.drainPermits();
+                int netNewPermits = drainedOldAvailable - drainedOldDrainage + maxParallelDiff;
+                if (netNewPermits > 0) {
+                    this.available.release(netNewPermits);
+                } else if (netNewPermits < 0) {
+                    // special case : maxParallel got reduced since the last outdating,
+                    // resulting in an effectively negative number of currently available permits.
+                    // To account for that, finishing jobs first try to drain before re-adding to available;
+                    // releasing the permit-diff to drainage here triggers that behaviour.
+                    this.drainage.release(-netNewPermits);
+                }
+            }
+            logger.info("<init> reused outdated queue info: queueName : {}"
+                    + ", old available : {}, old drainage : {}, old maxParallel : {}"
+                    + ", new available : {}, new drainage : {}, new maxParallel : {}",
+                    queueName, drainedOldAvailable, drainedOldDrainage, oldMaxParallel,
+                    available.availablePermits(), drainage.availablePermits(), this.maxParallel);
+        }
         logger.info("Starting job queue {}", queueName);
         logger.debug("Configuration for job queue={}", configuration);
     }
@@ -345,7 +388,18 @@
                 this.logger.error("Exception during job processing.", re);
             }
         } finally {
-            this.available.release();
+            // try draining first
+            if (this.drainage.tryAcquire()) {
+                // special case : if drainage is used, this means maxparallel
+                // got reconfigured and we are not releasing a permit to
+                // available here, but instead reduce drainage.
+                final int approxPermits = this.drainage.availablePermits();
+                this.logger.debug("startJobHandler: drained 1 permit for {}, approx left to drain: {}",
+                        queueName, approxPermits);
+            } else {
+                // otherwise release as usual
+                this.available.release();
+            }
         }
     }
 
@@ -731,5 +785,9 @@
     QueueJobCache getCache() {
         return cache;
     }
+
+    OutdatedJobQueueInfo getOutdatedJobQueueInfo() {
+        return new OutdatedJobQueueInfo(available, maxParallel, drainage);
+    }
 }
 
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/queues/OutdatedJobQueueInfo.java b/src/main/java/org/apache/sling/event/impl/jobs/queues/OutdatedJobQueueInfo.java
new file mode 100644
index 0000000..dd6d4fa
--- /dev/null
+++ b/src/main/java/org/apache/sling/event/impl/jobs/queues/OutdatedJobQueueInfo.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.impl.jobs.queues;
+
+import java.util.concurrent.Semaphore;
+
+/**
+ * Encapsulates the data (available and drainage semaphores, maxParallel) required to survive queue outdating
+ */
+class OutdatedJobQueueInfo {
+
+    private final Semaphore available;
+    private final int maxParallel;
+    private final Semaphore drainage;
+
+    OutdatedJobQueueInfo(Semaphore available, int maxParallel, Semaphore drainage) {
+        this.available = available;
+        this.maxParallel = maxParallel;
+        this.drainage = drainage;
+    }
+
+    final Semaphore getAvailable() {
+        return available;
+    }
+
+    final int getMaxParallel() {
+        return maxParallel;
+    }
+
+    final Semaphore getDrainage() {
+        return drainage;
+    }
+}
diff --git a/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java b/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
index 4336ad5..4485463 100644
--- a/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
+++ b/src/main/java/org/apache/sling/event/impl/jobs/queues/QueueManager.java
@@ -128,6 +128,10 @@
     /** All active queues. */
     private final Map<String, JobQueueImpl> queues = new ConcurrentHashMap<>();
 
+    /** Upon outdating a queue, its available and drainage semaphores (plus maxParallel) are parked here,
+     * to handle long-running jobs vs topology changes properly. */
+    private final Map<String, OutdatedJobQueueInfo> outdatedQueues = new ConcurrentHashMap<>();
+
     /** We count the scheduler runs. */
     private volatile long schedulerRuns;
 
@@ -253,7 +257,8 @@
                 queue = null;
             }
             if ( queue == null ) {
-                queue = JobQueueImpl.createQueue(queueInfo.queueName, config, queueServices, filteredTopics, haltedTopics);
+                final OutdatedJobQueueInfo outdatedQueueInfo = outdatedQueues.get(queueInfo.queueName);
+                queue = JobQueueImpl.createQueue(queueInfo.queueName, config, queueServices, filteredTopics, haltedTopics, outdatedQueueInfo);
                 // on startup the queue might be empty and we get null back from createQueue
                 if ( queue != null ) {
                     isNewQueue = true;
@@ -285,7 +290,10 @@
         // remove the queue with the old name
         // check for main queue
         final String oldName = ResourceHelper.filterQueueName(queue.getName());
-        this.queues.remove(oldName);
+        final JobQueueImpl oldQueue = this.queues.remove(oldName);
+        if (oldQueue != null) {
+            outdatedQueues.put(oldName, oldQueue.getOutdatedJobQueueInfo());
+        }
         // check if we can close or have to rename
         if ( queue.tryToClose() ) {
             // copy statistics
diff --git a/src/test/java/org/apache/sling/event/impl/discovery/InitDelayingTopologyEventListenerTest.java b/src/test/java/org/apache/sling/event/impl/discovery/InitDelayingTopologyEventListenerTest.java
index 83a3ac4..b8f4772 100644
--- a/src/test/java/org/apache/sling/event/impl/discovery/InitDelayingTopologyEventListenerTest.java
+++ b/src/test/java/org/apache/sling/event/impl/discovery/InitDelayingTopologyEventListenerTest.java
@@ -299,7 +299,7 @@
     @Test
     public void testProperties() throws Exception {
         final TestListener delegate = new TestListener();
-        InitDelayingTopologyEventListener listener = new InitDelayingTopologyEventListener(1, delegate);
+        InitDelayingTopologyEventListener listener = new InitDelayingTopologyEventListener(2, delegate);
         listener.handleTopologyEvent(createEvent(Type.TOPOLOGY_INIT));
         listener.handleTopologyEvent(createEvent(Type.TOPOLOGY_CHANGING));
         listener.handleTopologyEvent(createEvent(Type.TOPOLOGY_CHANGED));
diff --git a/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java b/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java
index e8bc0ac..47931c6 100644
--- a/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java
+++ b/src/test/java/org/apache/sling/event/it/AbstractJobHandlingTest.java
@@ -112,7 +112,7 @@
 
         return options(
                 newConfiguration("org.apache.sling.event.impl.jobs.jcr.PersistenceHandler")
-                    .put(JobManagerConfiguration.PROPERTY_BACKGROUND_LOAD_DELAY, 3L)
+                    .put(JobManagerConfiguration.PROPERTY_BACKGROUND_LOAD_DELAY, backgroundLoadDelay())
                     .put("startup.delay", 1L)
                     .asOption(),
                 baseConfiguration(),
@@ -145,6 +145,10 @@
            );
     }
 
+    long backgroundLoadDelay() {
+        return 3L;
+    }
+
     protected ModifiableCompositeOption baseConfiguration() {
         return composite(
             failOnUnresolvedBundles(),
diff --git a/src/test/java/org/apache/sling/event/it/AbstractMaxParallelTest.java b/src/test/java/org/apache/sling/event/it/AbstractMaxParallelTest.java
new file mode 100644
index 0000000..cf3ec2f
--- /dev/null
+++ b/src/test/java/org/apache/sling/event/it/AbstractMaxParallelTest.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.it;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.sling.discovery.TopologyEvent;
+import org.apache.sling.discovery.TopologyEvent.Type;
+import org.apache.sling.discovery.TopologyEventListener;
+import org.apache.sling.discovery.TopologyView;
+import org.apache.sling.event.jobs.Job;
+import org.apache.sling.event.jobs.JobManager;
+import org.apache.sling.event.jobs.NotificationConstants;
+import org.apache.sling.event.jobs.consumer.JobConsumer;
+import org.junit.After;
+import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.framework.ServiceReference;
+import org.osgi.framework.ServiceRegistration;
+import org.osgi.service.event.Event;
+import org.osgi.service.event.EventHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public abstract class AbstractMaxParallelTest extends AbstractJobHandlingTest {
+
+    private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+    private static final int BACKGROUND_LOAD_DELAY_SECONDS = 1;
+
+    private static final int EXTRA_CHAOS_DURATION_SECONDS = 20;
+
+    private static final int UNKNOWN_TOPOLOGY_FACTOR_MILLIS = 15;//100;
+
+    private static final int STABLE_TOPOLOGY_FACTOR_MILLIS = 40;//300;
+
+    static final String TOPIC_PREFIX = "sling/maxparallel/";
+
+    static final String TOPIC_NAME = TOPIC_PREFIX + "zero";
+
+    private final Object syncObj = new Object();
+
+    protected int max = -1;
+
+    @Override
+    long backgroundLoadDelay() {
+        return BACKGROUND_LOAD_DELAY_SECONDS;
+    }
+
+    @Override
+    @After
+    public void cleanup() {
+        super.cleanup();
+    }
+
+    private void registerMax(int cnt) {
+        synchronized(syncObj) {
+            max = Math.max(max, cnt);
+        }
+    }
+
+    /**
+     * Setup consumers
+     */
+    private void setupJobConsumers(long jobRunMillis) {
+        this.registerJobConsumer(TOPIC_NAME,
+
+            new JobConsumer() {
+
+                private AtomicInteger cnt = new AtomicInteger(0);
+
+                @Override
+                public JobResult process(final Job job) {
+                    int c = cnt.incrementAndGet();
+                    registerMax(c);
+                    log.info("process : start delaying. count=" + c + ", id="+ job.getId());
+                    try {
+                        Thread.sleep(jobRunMillis);
+                    } catch (InterruptedException e) {
+                        // interrupted while simulating job work: only print the stack trace and carry on
+                        e.printStackTrace();
+                    }
+                    log.info("process : done delaying. count=" + c + ", id="+ job.getId());
+                    cnt.decrementAndGet();
+                    return JobResult.OK;
+                }
+            });
+    }
+
+    private static final class CreateJobThread extends Thread {
+
+        private final Logger log = LoggerFactory.getLogger(this.getClass());
+
+        private final JobManager jobManager;
+
+        final AtomicLong finishedThreads;
+
+        private final Map<String, AtomicLong> created;
+
+        private final int numJobs;
+
+        public CreateJobThread(final JobManager jobManager,
+                Map<String, AtomicLong> created,
+                final AtomicLong finishedThreads,
+                int numJobs) {
+            this.jobManager = jobManager;
+            this.created = created;
+            this.finishedThreads = finishedThreads;
+            this.numJobs = numJobs;
+        }
+
+        @Override
+        public void run() {
+            AtomicInteger cnt = new AtomicInteger(0);
+            for(int i=0; i<numJobs; i++) {
+                final int c = cnt.incrementAndGet();
+                log.info("run: creating job " + c + " on topic " + TOPIC_NAME);
+                if (jobManager.addJob(TOPIC_NAME, null) != null) {
+                    created.get(TOPIC_NAME).incrementAndGet();
+                }
+            }
+            finishedThreads.incrementAndGet();
+        }
+
+    }
+
+    /**
+     * Setup chaos thread(s)
+     *
+     * Chaos is right now created by sending topology changing/changed events randomly
+     */
+    private void setupChaosThreads(final List<Thread> threads,
+            final AtomicLong finishedThreads, long duration) {
+        final List<TopologyView> views = new ArrayList<>();
+        // register topology listener
+        final ServiceRegistration<TopologyEventListener> reg = this.bc.registerService(TopologyEventListener.class, new TopologyEventListener() {
+
+            @Override
+            public void handleTopologyEvent(final TopologyEvent event) {
+                if ( event.getType() == Type.TOPOLOGY_INIT ) {
+                    views.add(event.getNewView());
+                }
+            }
+        }, null);
+        while ( views.isEmpty() ) {
+            this.sleep(10);
+        }
+        reg.unregister();
+        final TopologyView view = views.get(0);
+
+        try {
+            final Collection<ServiceReference<TopologyEventListener>> refs = this.bc.getServiceReferences(TopologyEventListener.class, null);
+            assertNotNull(refs);
+            assertFalse(refs.isEmpty());
+            TopologyEventListener found = null;
+            for(final ServiceReference<TopologyEventListener> ref : refs) {
+                final TopologyEventListener listener = this.bc.getService(ref);
+                if ( listener != null && listener.getClass().getName().equals("org.apache.sling.event.impl.jobs.config.TopologyHandler") ) {
+                    found = listener;
+                    break;
+                }
+                bc.ungetService(ref);
+            }
+            assertNotNull(found);
+            final TopologyEventListener tel = found;
+            log.info("setupChaosThreads : simulating TOPOLOGY_INIT");
+            tel.handleTopologyEvent(new TopologyEvent(Type.TOPOLOGY_INIT, null, view));
+
+            threads.add(new Thread() {
+
+                private final Random random = new Random();
+
+                @Override
+                public void run() {
+                    final long startTime = System.currentTimeMillis();
+                    // this thread keeps running for EXTRA_CHAOS_DURATION_SECONDS beyond the given duration
+                    final long endTime = startTime + (duration +EXTRA_CHAOS_DURATION_SECONDS) * 1000;
+                    while ( System.currentTimeMillis() < endTime ) {
+                        final int sleepTime = random.nextInt(25) + 15;
+                        try {
+                            Thread.sleep(sleepTime * STABLE_TOPOLOGY_FACTOR_MILLIS);
+                        } catch ( final InterruptedException ie) {
+                            Thread.currentThread().interrupt();
+                        }
+                        log.info("setupChaosThreads : simulating TOPOLOGY_CHANGING");
+                        tel.handleTopologyEvent(new TopologyEvent(Type.TOPOLOGY_CHANGING, view, null));
+                        final int changingTime = random.nextInt(20) + 3;
+                        try {
+                            Thread.sleep(changingTime * UNKNOWN_TOPOLOGY_FACTOR_MILLIS);
+                        } catch ( final InterruptedException ie) {
+                            Thread.currentThread().interrupt();
+                        }
+                        log.info("setupChaosThreads : simulating TOPOLOGY_CHANGED");
+                        tel.handleTopologyEvent(new TopologyEvent(Type.TOPOLOGY_CHANGED, view, view));
+                    }
+                    tel.getClass().getName();
+                    finishedThreads.incrementAndGet();
+                }
+            });
+        } catch (InvalidSyntaxException e) {
+            e.printStackTrace();
+        }
+    }
+
+    void doTestMaxParallel(int numJobs, long jobRunMillis, long duration) throws Exception {
+        final JobManager jobManager = this.getJobManager();
+
+        final Map<String, AtomicLong> added = new HashMap<>();
+        final Map<String, AtomicLong> created = new HashMap<>();
+        final Map<String, AtomicLong> finished = new HashMap<>();
+        final List<String> topics = new ArrayList<>();
+        added.put(TOPIC_NAME, new AtomicLong());
+        created.put(TOPIC_NAME, new AtomicLong());
+        finished.put(TOPIC_NAME, new AtomicLong());
+        topics.add(TOPIC_NAME);
+
+        final List<Thread> threads = new ArrayList<>();
+        final AtomicLong finishedThreads = new AtomicLong();
+
+        this.registerEventHandler("org/apache/sling/event/notification/job/*",
+                new EventHandler() {
+
+                    @Override
+                    public void handleEvent(final Event event) {
+                        final String topic = (String) event.getProperty(NotificationConstants.NOTIFICATION_PROPERTY_JOB_TOPIC);
+                        if ( NotificationConstants.TOPIC_JOB_FINISHED.equals(event.getTopic())) {
+                            finished.get(topic).incrementAndGet();
+                        } else if ( NotificationConstants.TOPIC_JOB_ADDED.equals(event.getTopic())) {
+                            added.get(topic).incrementAndGet();
+                        }
+                    }
+                });
+
+        // setup job consumers
+        this.setupJobConsumers(jobRunMillis);
+
+        // start the job creation thread
+        new CreateJobThread(jobManager, created, finishedThreads, numJobs).start();
+
+        // wait until 1 job is being processed
+        log.info("doTestMaxParallel : waiting until 1 job is being processed");
+        while ( max <= 0 ) {
+            this.sleep(100);
+        }
+        log.info("doTestMaxParallel : 1 job was processed, ready to go. max=" + max);
+
+        this.setupChaosThreads(threads, finishedThreads, duration);
+
+        log.info("doTestMaxParallel : starting threads (" + threads.size() + ")");
+        // start threads
+        for(final Thread t : threads) {
+            t.setDaemon(true);
+            t.start();
+        }
+
+        log.info("doTestMaxParallel: sleeping for " + duration + " seconds to wait for threads to finish...");
+        // for sure we can sleep for the duration
+        this.sleep(duration * 1000);
+
+        log.info("doTestMaxParallel: polling for threads to finish...");
+        // wait until threads are finished
+        while ( finishedThreads.get() < threads.size() ) {
+            this.sleep(100);
+        }
+
+        final Set<String> allTopics = new HashSet<>(topics);
+        log.info("doTestMaxParallel: waiting for job handling to finish... " + allTopics.size());
+        while ( !allTopics.isEmpty() ) {
+            final Iterator<String> iter = allTopics.iterator();
+            while ( iter.hasNext() ) {
+                final String topic = iter.next();
+                log.info("doTestMaxParallel: checking topic= " + topic +
+                        ", finished=" + finished.get(topic).get() + ", created=" + created.get(topic).get());
+                if ( finished.get(topic).get() == created.get(topic).get() ) {
+                    iter.remove();
+                }
+            }
+            log.info("doTestMaxParallel: waiting for job handling to finish... " + allTopics.size());
+            this.sleep(1000);
+        }
+        log.info("doTestMaxParallel: done.");
+    }
+}
diff --git a/src/test/java/org/apache/sling/event/it/OrderedMaxParallelTest.java b/src/test/java/org/apache/sling/event/it/OrderedMaxParallelTest.java
new file mode 100644
index 0000000..03085cd
--- /dev/null
+++ b/src/test/java/org/apache/sling/event/it/OrderedMaxParallelTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.it;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.Dictionary;
+import java.util.Hashtable;
+
+import org.apache.sling.event.impl.jobs.config.ConfigurationConstants;
+import org.apache.sling.event.jobs.QueueConfiguration;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.junit.PaxExam;
+
+@RunWith(PaxExam.class)
+public class OrderedMaxParallelTest extends AbstractMaxParallelTest {
+
+    private static final int DURATION = 40;
+
+    @Override
+    @Before
+    public void setup() throws IOException {
+        super.setup();
+
+        // create ordered test queue
+        final org.osgi.service.cm.Configuration orderedConfig = this.configAdmin.createFactoryConfiguration("org.apache.sling.event.jobs.QueueConfiguration", null);
+        final Dictionary<String, Object> orderedProps = new Hashtable<>();
+        orderedProps.put(ConfigurationConstants.PROP_NAME, "ordered-max-parallel");
+        orderedProps.put(ConfigurationConstants.PROP_TYPE, QueueConfiguration.Type.ORDERED.name());
+        orderedProps.put(ConfigurationConstants.PROP_TOPICS, TOPIC_NAME);
+        orderedProps.put(ConfigurationConstants.PROP_RETRIES, 2);
+        orderedProps.put(ConfigurationConstants.PROP_RETRY_DELAY, 2000L);
+        orderedProps.put(ConfigurationConstants.PROP_MAX_PARALLEL, 1);
+        orderedConfig.update(orderedProps);
+
+        this.sleep(1000L);
+    }
+
+    @Test(timeout=DURATION * 16000L)
+    public void testOrderedMaxParallel_slow() throws Exception {
+        doTestMaxParallel(12, 1717, DURATION);
+
+        assertEquals(1, max);
+    }
+
+    @Test(timeout=DURATION * 16000L)
+    public void testOrderedMaxParallel2_fast() throws Exception {
+        doTestMaxParallel(50, 123, DURATION);
+
+        assertEquals(1, max);
+    }
+}
diff --git a/src/test/java/org/apache/sling/event/it/RoundRobinMaxParallelTest.java b/src/test/java/org/apache/sling/event/it/RoundRobinMaxParallelTest.java
new file mode 100644
index 0000000..7e5531b
--- /dev/null
+++ b/src/test/java/org/apache/sling/event/it/RoundRobinMaxParallelTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.event.it;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Dictionary;
+import java.util.Hashtable;
+
+import org.apache.sling.event.impl.jobs.config.ConfigurationConstants;
+import org.apache.sling.event.jobs.QueueConfiguration;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.junit.PaxExam;
+
+@RunWith(PaxExam.class)
+public class RoundRobinMaxParallelTest extends AbstractMaxParallelTest {
+
+    private static final int MAX_PARALLEL = 3;
+
+    private static final int DURATION = 50;
+
+    @Override
+    @Before
+    public void setup() throws IOException {
+        super.setup();
+
+        // create round-robin test queue
+        final org.osgi.service.cm.Configuration orderedConfig = this.configAdmin.createFactoryConfiguration("org.apache.sling.event.jobs.QueueConfiguration", null);
+        final Dictionary<String, Object> orderedProps = new Hashtable<>();
+        orderedProps.put(ConfigurationConstants.PROP_NAME, "round-robin-max-parallel");
+        orderedProps.put(ConfigurationConstants.PROP_TYPE, QueueConfiguration.Type.TOPIC_ROUND_ROBIN.name());
+        orderedProps.put(ConfigurationConstants.PROP_TOPICS, TOPIC_NAME);
+        orderedProps.put(ConfigurationConstants.PROP_RETRIES, 2);
+        orderedProps.put(ConfigurationConstants.PROP_RETRY_DELAY, 2000L);
+        orderedProps.put(ConfigurationConstants.PROP_MAX_PARALLEL, MAX_PARALLEL);
+        orderedConfig.update(orderedProps);
+
+        this.sleep(1000L);
+    }
+
+    @Test(timeout=DURATION * 16000L)
+    public void testRoundRobinMaxParallel_slow() throws Exception {
+        doTestMaxParallel(20, 1717, DURATION);
+
+        assertTrue(max <= MAX_PARALLEL);
+    }
+
+    @Test(timeout=DURATION * 16000L)
+    public void testRoundRobinMaxParallel_fast() throws Exception {
+        doTestMaxParallel(200, 123, DURATION);
+
+        assertTrue(max <= MAX_PARALLEL);
+    }
+}