Merge pull request #24 from actinium15/issue/SLING-8853

SLING-8853 Adds ActiveResourceQueue and makes it available as …
diff --git a/pom.xml b/pom.xml
index 6e50d20..00733b3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -50,6 +50,7 @@
 
     <properties>
         <exam.version>4.11.0</exam.version>
+        <sling.java.version>8</sling.java.version>
     </properties>
 
     <!-- ======================================================================= -->
diff --git a/src/main/java/org/apache/sling/distribution/agent/impl/ForwardDistributionAgentFactory.java b/src/main/java/org/apache/sling/distribution/agent/impl/ForwardDistributionAgentFactory.java
index c57b421..977f652 100644
--- a/src/main/java/org/apache/sling/distribution/agent/impl/ForwardDistributionAgentFactory.java
+++ b/src/main/java/org/apache/sling/distribution/agent/impl/ForwardDistributionAgentFactory.java
@@ -59,6 +59,7 @@
 import org.apache.sling.distribution.queue.impl.MultipleQueueDispatchingStrategy;
 import org.apache.sling.distribution.queue.impl.PriorityQueueDispatchingStrategy;
 import org.apache.sling.distribution.queue.impl.jobhandling.JobHandlingDistributionQueueProvider;
+import org.apache.sling.distribution.queue.impl.resource.ResourceQueueProvider;
 import org.apache.sling.distribution.queue.impl.simple.SimpleDistributionQueueProvider;
 import org.apache.sling.distribution.transport.DistributionTransportSecretProvider;
 import org.apache.sling.distribution.transport.impl.HttpConfiguration;
@@ -170,6 +171,7 @@
 
     @Property(options = {
             @PropertyOption(name = JobHandlingDistributionQueueProvider.TYPE, value = "Sling Jobs"),
+            @PropertyOption(name = ResourceQueueProvider.TYPE, value = "Resource Backed"),
             @PropertyOption(name = SimpleDistributionQueueProvider.TYPE, value = "In-memory"),
             @PropertyOption(name = SimpleDistributionQueueProvider.TYPE_CHECKPOINT, value = "In-file")},
             value = "jobs",
@@ -257,7 +259,10 @@
             queueProvider = new JobHandlingDistributionQueueProvider(agentName, jobManager, context, configAdmin);
         } else if (SimpleDistributionQueueProvider.TYPE.equals(queueProviderName)) {
             queueProvider = new SimpleDistributionQueueProvider(scheduler, agentName, false);
-        } else {
+        } else if (ResourceQueueProvider.TYPE.equals(queueProviderName)) {
+            queueProvider = new ResourceQueueProvider(context,
+                    resourceResolverFactory, SimpleDistributionAgent.DEFAULT_AGENT_SERVICE, agentName, scheduler, true);
+        } else { // when SimpleDistributionQueueProvider.TYPE_CHECKPOINT is "queueProviderName"
             queueProvider = new SimpleDistributionQueueProvider(scheduler, agentName, true);
         }
         queueProvider = new MonitoringDistributionQueueProvider(queueProvider, context);
diff --git a/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java b/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java
index 96c3d7f..19cb7a7 100644
--- a/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java
+++ b/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java
@@ -59,7 +59,7 @@
  * Basic implementation of a {@link DistributionAgent}
  */
 public class SimpleDistributionAgent implements DistributionAgent {
-    private final static String DEFAULT_AGENT_SERVICE = "defaultAgentService";
+    final static String DEFAULT_AGENT_SERVICE = "defaultAgentService";
 
     private final DistributionQueueProvider queueProvider;
     private final DistributionPackageImporter distributionPackageImporter;
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ActiveResourceQueue.java b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ActiveResourceQueue.java
new file mode 100644
index 0000000..6a1339c
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ActiveResourceQueue.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.distribution.queue.impl.resource;
+
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.distribution.queue.DistributionQueueEntry;
+import org.apache.sling.distribution.queue.DistributionQueueItem;
+import org.apache.sling.distribution.queue.DistributionQueueItemStatus;
+import org.apache.sling.distribution.queue.DistributionQueueStatus;
+import org.apache.sling.distribution.queue.impl.DistributionQueueUtils;
+import org.apache.sling.distribution.util.impl.DistributionUtils;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ActiveResourceQueue, in conjunction with a queue processor that is active on only a single Sling instance,
+ * provides an active, JCR resource-backed queue.
+ * <p>
+ * It maintains the dequeue attempts in JCR so that all cluster instances see a consistent dequeue-attempt status.
+ */
+public class ActiveResourceQueue extends ResourceQueue {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    public ActiveResourceQueue(ResourceResolverFactory resolverFactory, String serviceName, String queueName, String rootPath) {
+        super(resolverFactory, serviceName, queueName, rootPath);
+    }
+
+    /** Builds the queue status from the resource count under the queue root and the state calculated for the head entry. */
+    @NotNull
+    @Override
+    public DistributionQueueStatus getStatus() {
+        ResourceResolver resolver = null;
+        try {
+            resolver = DistributionUtils.loginService(resolverFactory, serviceName);
+            final Resource root = ResourceQueueUtils.getRootResource(resolver, queueRootPath);
+            final int itemCount = ResourceQueueUtils.getResourceCount(root);
+            final DistributionQueueEntry headEntry = ResourceQueueUtils.getHead(root);
+            // the head may be null (presumably when the queue is empty); item and status are then null as well
+            final boolean noHead = (headEntry == null);
+            final DistributionQueueItem headItem = noHead ? null : headEntry.getItem();
+            final DistributionQueueItemStatus headStatus = noHead ? null : headEntry.getStatus();
+            log.debug("Queue has {} items, with following status for the head: {}", itemCount, headStatus);
+            return new DistributionQueueStatus(itemCount, DistributionQueueUtils.calculateState(headItem, headStatus));
+        } catch (LoginException | PersistenceException e) {
+            // wrapped unchecked: this method declares no checked exceptions
+            throw new RuntimeException(e);
+        } finally {
+            DistributionUtils.safelyLogout(resolver);
+        }
+    }
+}
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueue.java b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueue.java
index 9a04b4c..4896c65 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueue.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueue.java
@@ -55,17 +55,17 @@
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
-
-    private final ResourceResolverFactory resolverFactory;
-    private String serviceName;
-    private String queueName;
-    private final String queueRootPath;
+    protected final ResourceResolverFactory resolverFactory;
+    protected final String queueRootPath;
+    protected String serviceName;
+    protected String queueName;
 
     public ResourceQueue(ResourceResolverFactory resolverFactory, String serviceName, String queueName, String rootPath) {
         this.resolverFactory = resolverFactory;
         this.serviceName = serviceName;
         this.queueName = queueName;
         this.queueRootPath = rootPath + "/" + queueName;
+        log.debug("starting a Resource Queue {}", queueName);
     }
 
     @NotNull
@@ -145,7 +145,6 @@
         } finally {
             DistributionUtils.safelyLogout(resourceResolver);
         }
-
     }
 
     @Nullable
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProvider.java b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProvider.java
index 5db42ea..d2df714 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProvider.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProvider.java
@@ -20,34 +20,56 @@
 package org.apache.sling.distribution.queue.impl.resource;
 
 import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.commons.scheduler.ScheduleOptions;
 import org.apache.sling.commons.scheduler.Scheduler;
 import org.apache.sling.distribution.common.DistributionException;
 import org.apache.sling.distribution.queue.DistributionQueueType;
 import org.apache.sling.distribution.queue.impl.DistributionQueueProcessor;
 import org.apache.sling.distribution.queue.impl.DistributionQueueProvider;
+import org.apache.sling.distribution.queue.impl.simple.SimpleDistributionQueueProcessor;
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
 
 import org.jetbrains.annotations.NotNull;
 import org.osgi.framework.BundleContext;
 import org.osgi.framework.ServiceRegistration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Dictionary;
 import java.util.Hashtable;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 
 public class ResourceQueueProvider implements DistributionQueueProvider {
+    public static final String TYPE = "resource";
 
     private final static String QUEUES_ROOT = "/var/sling/distribution/queues/";
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
     private ResourceResolverFactory resolverFactory;
     private String serviceName;
     private String agentRootPath;
+    private Scheduler scheduler;
+    private String agentName;
+    private boolean isActive;
+
+    private final Map<String, ResourceQueue> queueMap = new ConcurrentHashMap<>();
 
     private ServiceRegistration<Runnable> cleanupTask;
 
-
-    public ResourceQueueProvider(BundleContext context, ResourceResolverFactory resolverFactory, String serviceName, String agentName) {
+    public ResourceQueueProvider(BundleContext context, ResourceResolverFactory resolverFactory,
+            String serviceName, String agentName, Scheduler scheduler, boolean isActive) {
+        if (serviceName == null || (scheduler == null && isActive)
+                || context == null || resolverFactory == null || agentName == null) {
+            throw new IllegalArgumentException("all arguments are required");
+        }
         this.resolverFactory = resolverFactory;
         this.serviceName = serviceName;
+        this.agentName = agentName;
         this.agentRootPath = QUEUES_ROOT + agentName;
+        this.scheduler = scheduler;
+        this.isActive = isActive;
 
         register(context);
     }
@@ -55,23 +77,61 @@
     @NotNull
     @Override
     public DistributionQueue getQueue(@NotNull String queueName) throws DistributionException {
-        return new ResourceQueue(resolverFactory, serviceName, queueName, agentRootPath);
+        return queueMap.computeIfAbsent(queueName, name -> {
+            if (isActive) {
+                return new ActiveResourceQueue(resolverFactory, serviceName, name, agentRootPath);
+            } else {
+                return new ResourceQueue(resolverFactory, serviceName, name, agentRootPath);
+            }
+        });
     }
 
     @NotNull
     @Override
     public DistributionQueue getQueue(@NotNull String queueName, @NotNull DistributionQueueType type) {
-        return new ResourceQueue(resolverFactory, serviceName, queueName, agentRootPath);
+        try {
+            return getQueue(queueName);
+        } catch (DistributionException e) {
+            throw new RuntimeException("could not create config for queue " + queueName, e);
+        }
     }
 
     @Override
     public void enableQueueProcessing(@NotNull DistributionQueueProcessor queueProcessor, String... queueNames) throws DistributionException {
-        // processing not supported
+        // enable processing only for active ResourceQueues
+        if (isActive) {
+            for (String queueName : queueNames) {
+                ScheduleOptions options = scheduler.NOW(-1, 1)
+                        .canRunConcurrently(false)
+                        .onSingleInstanceOnly(true)
+                        .name(getJobName(queueName));
+                scheduler.schedule(new SimpleDistributionQueueProcessor(getQueue(queueName), queueProcessor), options);
+            }
+        } else {
+            throw new DistributionException(new UnsupportedOperationException("enable Processing not supported for Passive Queues"));
+        }
     }
 
     @Override
     public void disableQueueProcessing() throws DistributionException {
-        // processing not supported
+        // disable processing only for active ResourceQueues
+        if (isActive) {
+            for (DistributionQueue queue : queueMap.values()) {
+                String queueName = queue.getName();
+                // disable queue processing
+                if (scheduler.unschedule(getJobName(queueName))) {
+                    log.debug("queue processing on {} stopped", queue);
+                } else {
+                    log.warn("could not disable queue processing on {}", queue);
+                }
+            }
+        } else {
+            throw new DistributionException(new UnsupportedOperationException("disable Processing not supported for Passive Queues"));
+        }
+    }
+
+    private String getJobName(String queueName) {
+        return "resource-queueProcessor-" + agentName + "-" + queueName;
     }
 
 
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProviderFactory.java b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProviderFactory.java
index ac0bafd..76ae581 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProviderFactory.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProviderFactory.java
@@ -21,10 +21,14 @@
 
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.ConfigurationPolicy;
+import org.apache.felix.scr.annotations.Properties;
 import org.apache.felix.scr.annotations.Property;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.Service;
 import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.commons.osgi.PropertiesUtil;
+import org.apache.sling.commons.scheduler.Scheduler;
 import org.apache.sling.distribution.component.impl.DistributionComponentConstants;
 import org.apache.sling.distribution.queue.impl.DistributionQueueProvider;
 import org.apache.sling.distribution.queue.impl.DistributionQueueProviderFactory;
@@ -32,25 +36,41 @@
 
 import java.util.Map;
 
-@Component
+@Component(metatype = true,
+            label = "Apache Sling Resource Queue Provider Factory",
+            description = "OSGi configuration factory for Resource-backed queues",
+            configurationFactory = true,
+            policy = ConfigurationPolicy.REQUIRE)
 @Service(DistributionQueueProviderFactory.class)
-@Property(name = DistributionComponentConstants.PN_NAME, value = "resourceQueue")
+@Properties({
+        @Property(name = DistributionComponentConstants.PN_NAME, value = "resourceQueue"),
+        @Property(name = ResourceQueueProviderFactory.PN_IS_ACTIVE,
+                label = "Should the Resource-backed queue created with a Queue Processor (i.e., ACTIVE)",
+                boolValue = {false})
+})
 public class ResourceQueueProviderFactory implements DistributionQueueProviderFactory {
 
+    static final String PN_IS_ACTIVE = "queue.isActive";
+
     @Reference
     ResourceResolverFactory resourceResolverFactory;
+    @Reference
+    Scheduler scheduler;
 
     BundleContext context;
 
+    private boolean isActive;
+
     @Activate
     protected void activate(BundleContext context, Map<String, Object> config)
     {
+        this.isActive = PropertiesUtil.toBoolean(config.get(PN_IS_ACTIVE), false); // convert the configured value, not the property name
         this.context = context;
     }
 
     @Override
     public DistributionQueueProvider getProvider(String agentName, String serviceName) {
-        return new ResourceQueueProvider(context, resourceResolverFactory, serviceName, agentName);
+        return new ResourceQueueProvider(context, resourceResolverFactory, serviceName, agentName, scheduler, isActive);
     }
 
     @Override
@@ -59,4 +79,4 @@
             ((ResourceQueueProvider) queueProvider).close();
         }
     }
-}
\ No newline at end of file
+}
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessor.java b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessor.java
index 5aff79c..462895e 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessor.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessor.java
@@ -31,7 +31,7 @@
 /**
  * a simple scheduled {@link SimpleDistributionQueue}s processor
  */
-class SimpleDistributionQueueProcessor implements Runnable {
+public class SimpleDistributionQueueProcessor implements Runnable {
 
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final DistributionQueue queue;
diff --git a/src/test/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProcessingTest.java b/src/test/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProcessingTest.java
new file mode 100644
index 0000000..68d08fa
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProcessingTest.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.distribution.queue.impl.resource;
+
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.commons.scheduler.ScheduleOptions;
+import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.queue.DistributionQueueEntry;
+import org.apache.sling.distribution.queue.DistributionQueueItem;
+import org.apache.sling.distribution.queue.DistributionQueueState;
+import org.apache.sling.distribution.queue.impl.DistributionQueueProcessor;
+import org.apache.sling.distribution.queue.impl.DistributionQueueProvider;
+import org.apache.sling.distribution.queue.impl.DistributionQueueProviderFactory;
+import org.apache.sling.distribution.queue.spi.DistributionQueue;
+import org.apache.sling.testing.mock.osgi.MockOsgi;
+import org.apache.sling.testing.mock.sling.MockSling;
+import org.apache.sling.testing.mock.sling.ResourceResolverType;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.internal.util.reflection.Whitebox;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.osgi.framework.BundleContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class ResourceQueueProcessingTest {
+
+    public static final Logger log = LoggerFactory.getLogger(ResourceQueueProcessingTest.class);
+
+    protected static final String PACKAGE_ID = "testPackageId";
+    protected static BundleContext bundleContext = null;
+    protected static ResourceResolverFactory rrf = null;
+    protected static Scheduler scheduler = null;
+    protected static ScheduledExecutorService executorService = null;
+
+    @Test(timeout = 60000) // bounds the polling loop below: a stalled processor fails the test instead of hanging the build
+    public void testActiveResourceQueue() throws DistributionException, PersistenceException, LoginException, InterruptedException {
+        // obtain an active queue provider instance
+        final String QUEUE_NAME = "testActiveQueue";
+        final int MAX_ENTRIES = 32;
+
+        DistributionQueueProvider resourceQueueProvider = new ResourceQueueProvider(bundleContext,
+                rrf, "test", "testAgent", scheduler, true);
+        DistributionQueueProcessor mockResourceQueueProcessor = mock(DistributionQueueProcessor.class);
+
+        DistributionQueue resourceQueue = resourceQueueProvider.getQueue(QUEUE_NAME);
+
+        try {
+            populateDistributionQueue(resourceQueue, MAX_ENTRIES);
+
+            assertTrue("Resource Queue state is not RUNNING",
+                    resourceQueue.getStatus().getState().equals(DistributionQueueState.RUNNING));
+            assertEquals(MAX_ENTRIES, resourceQueue.getStatus().getItemsCount());
+
+            when(mockResourceQueueProcessor.process(eq(QUEUE_NAME), Matchers.any(DistributionQueueEntry.class)))
+                .thenReturn(true);
+
+            resourceQueueProvider.enableQueueProcessing(mockResourceQueueProcessor, QUEUE_NAME);
+            while (!resourceQueue.getStatus().getState().equals(DistributionQueueState.IDLE)) {
+                Thread.sleep(100); // poll rather than spin; sleeping also lets the JUnit timeout interrupt this thread
+                log.info("Processing Resource Queue. Items remaining: {}",
+                        resourceQueue.getStatus().getItemsCount());
+            }
+
+            assertEquals(0, resourceQueue.getStatus().getItemsCount());
+        } finally {
+            resourceQueueProvider.disableQueueProcessing();
+            resourceQueue.clear(Integer.MAX_VALUE);
+        }
+    }
+
+    @Test
+    public void testActiveResourceQueueWithoutEnablingProcessing() throws DistributionException {
+        final String QUEUE_NAME = "testActiveQueue_2";
+        final int MAX_ENTRIES = 2;
+        Scheduler tempScheduler = mock(Scheduler.class);
+        when(tempScheduler.unschedule(Matchers.anyString())).thenReturn(false);
+
+        DistributionQueueProvider resourceQueueProvider = new ResourceQueueProvider(bundleContext,
+                rrf, "test", "testAgent", tempScheduler, true);
+        DistributionQueue resourceQueue = resourceQueueProvider.getQueue(QUEUE_NAME);
+
+        try {
+            populateDistributionQueue(resourceQueue, MAX_ENTRIES);
+
+            assertTrue("Resource Queue state is not RUNNING",
+                    resourceQueue.getStatus().getState().equals(DistributionQueueState.RUNNING));
+            assertEquals(MAX_ENTRIES, resourceQueue.getStatus().getItemsCount());
+        } finally {
+            // should log a WARN for ResourceQueueProvider class
+            resourceQueueProvider.disableQueueProcessing();
+            resourceQueue.clear(Integer.MAX_VALUE);
+        }
+    }
+
+    @Test(expected = DistributionException.class)
+    public void testPassiveResourceQueueEnableProcessing() throws DistributionException {
+        final String QUEUE_NAME = "testPassiveQueue_1";
+        final int MAX_ENTRIES = 4;
+        DistributionQueueProviderFactory resQueueProviderFactory= new ResourceQueueProviderFactory();
+        Whitebox.setInternalState(resQueueProviderFactory, "isActive", false);
+        Whitebox.setInternalState(resQueueProviderFactory, "resourceResolverFactory", rrf);
+        Whitebox.setInternalState(resQueueProviderFactory, "scheduler", scheduler);
+        MockOsgi.activate(resQueueProviderFactory, bundleContext);
+
+        DistributionQueueProvider resourceQueueProvider = resQueueProviderFactory
+                .getProvider("testAgent", "test"); // getProvider(agentName, serviceName), matching the other tests
+
+        DistributionQueue resourceQueue = resourceQueueProvider.getQueue(QUEUE_NAME, null);
+
+        try {
+            populateDistributionQueue(resourceQueue, MAX_ENTRIES);
+
+            assertTrue("Resource Queue state is not PASSIVE",
+                    resourceQueue.getStatus().getState().equals(DistributionQueueState.PASSIVE));
+            assertEquals(MAX_ENTRIES, resourceQueue.getStatus().getItemsCount());
+
+            resourceQueueProvider.enableQueueProcessing(null, QUEUE_NAME); // expect exception
+        } finally {
+            resourceQueue.clear(Integer.MAX_VALUE);
+        }
+    }
+
+    @Test(expected = DistributionException.class)
+    public void testPassiveResourceQueueDisableProcessing() throws DistributionException {
+        final String QUEUE_NAME = "testPassiveQueue_2";
+        final int MAX_ENTRIES = 2;
+        DistributionQueueProvider resourceQueueProvider = new ResourceQueueProvider(bundleContext,
+                rrf, "test", "testAgent", null, false);
+
+        DistributionQueue resourceQueue = resourceQueueProvider.getQueue(QUEUE_NAME, null);
+
+        try {
+            populateDistributionQueue(resourceQueue, MAX_ENTRIES);
+
+            assertTrue("Resource Queue state is not PASSIVE",
+                    resourceQueue.getStatus().getState().equals(DistributionQueueState.PASSIVE));
+            assertEquals(MAX_ENTRIES, resourceQueue.getStatus().getItemsCount());
+            resourceQueueProvider.disableQueueProcessing(); // expect exception; not thrown from finally, where it would mask a failed assertion
+        } finally {
+            resourceQueue.clear(Integer.MAX_VALUE);
+        }
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testInvalidQueueProviderConstruction_1() {
+        constructIllegalResourceQueueProvider(IllegalQueueProviderType.MISSING_BUNDLE_CONTEXT);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testInvalidQueueProviderConstruction_2() {
+        constructIllegalResourceQueueProvider(IllegalQueueProviderType.MISSING_RESOLVER_FACTORY);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testInvalidQueueProviderConstruction_3() {
+        constructIllegalResourceQueueProvider(IllegalQueueProviderType.MISSING_SERVICENAME);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testInvalidQueueProviderConstruction_4() {
+        constructIllegalResourceQueueProvider(IllegalQueueProviderType.MISSING_AGENTNAME);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testInvalidQueueProviderConstruction_5() {
+        constructIllegalResourceQueueProvider(IllegalQueueProviderType.MISSING_SCHEDULER_WHEN_ACTIVE);
+    }
+
+    private void populateDistributionQueue(DistributionQueue queue, int maxEntries) {
+        for (int i = 0; i < maxEntries; i++) {
+            queue.add(new DistributionQueueItem(PACKAGE_ID, Collections.<String, Object>emptyMap()));
+        }
+    }
+
+    private ResourceQueueProvider constructIllegalResourceQueueProvider(IllegalQueueProviderType type) {
+        switch(type) {
+        case MISSING_BUNDLE_CONTEXT:
+            return new ResourceQueueProvider(null,
+                    rrf, "test", "testAgent", scheduler, true);
+        case MISSING_RESOLVER_FACTORY:
+            return new ResourceQueueProvider(bundleContext,
+                    null, "test", "testAgent", scheduler, true);
+        case MISSING_SERVICENAME:
+            return new ResourceQueueProvider(bundleContext,
+                    rrf, null, "testAgent", scheduler, true);
+        case MISSING_AGENTNAME:
+            return new ResourceQueueProvider(bundleContext,
+                    rrf, "test", null, scheduler, true);
+        case MISSING_SCHEDULER_WHEN_ACTIVE:
+        default:
+            return new ResourceQueueProvider(bundleContext,
+                    rrf, "test", "testAgent", null, true);
+        }
+    }
+
+    private enum IllegalQueueProviderType {
+        MISSING_BUNDLE_CONTEXT,
+        MISSING_RESOLVER_FACTORY,
+        MISSING_SERVICENAME,
+        MISSING_AGENTNAME,
+        MISSING_SCHEDULER_WHEN_ACTIVE,
+    }
+
+    @BeforeClass
+    public static void setUp() throws LoginException { // one-time wiring of the mock OSGi/Sling context and the Scheduler stub shared by all tests
+        bundleContext = MockOsgi.newBundleContext();
+        MockSling.setAdapterManagerBundleContext(bundleContext);
+        rrf = MockSling.newResourceResolverFactory(ResourceResolverType.JCR_OAK, bundleContext);
+        scheduler = mock(Scheduler.class);
+        ScheduleOptions mockScheduleOptions = mock(ScheduleOptions.class);
+        when(mockScheduleOptions.canRunConcurrently(Matchers.anyBoolean())).thenReturn(mockScheduleOptions); // builder-style calls return the same mock so they can be chained
+        when(mockScheduleOptions.onSingleInstanceOnly(Matchers.anyBoolean())).thenReturn(mockScheduleOptions);
+        when(mockScheduleOptions.name(Matchers.anyString())).thenReturn(mockScheduleOptions);
+        executorService = Executors.newSingleThreadScheduledExecutor(); // runs the tasks handed to the stubbed scheduler.schedule(..)
+        when(scheduler.NOW(Matchers.anyInt(), Matchers.anyLong())).thenReturn(mockScheduleOptions);
+        when(scheduler.schedule(Matchers.any(Runnable.class), Matchers.any(ScheduleOptions.class)))
+            .thenAnswer(new Answer<Boolean>() {
+                @Override
+                public Boolean answer(InvocationOnMock invocation) throws Throwable {
+                    Runnable task = (Runnable) invocation.getArguments()[0]; // first argument of schedule(..) is the job to run
+                    executorService.scheduleAtFixedRate(task, 0L, 1L, TimeUnit.SECONDS); // start immediately, repeat every second; the ScheduleOptions argument is ignored
+                    return true;
+                }
+            });
+        when(scheduler.unschedule(Matchers.anyString())).thenReturn(true); // the stub only reports success; nothing is removed from executorService
+    }
+
+    @AfterClass
+    public static void tearDown() {
+        MockSling.clearAdapterManagerBundleContext();
+        executorService.shutdownNow();
+    }
+}