Merge pull request #24 from actinium15/issue/SLING-8853
SLING-8853 Adds ActiveResourceQueue and makes it available as …
diff --git a/pom.xml b/pom.xml
index 6e50d20..00733b3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -50,6 +50,7 @@
<properties>
<exam.version>4.11.0</exam.version>
+ <sling.java.version>8</sling.java.version>
</properties>
<!-- ======================================================================= -->
diff --git a/src/main/java/org/apache/sling/distribution/agent/impl/ForwardDistributionAgentFactory.java b/src/main/java/org/apache/sling/distribution/agent/impl/ForwardDistributionAgentFactory.java
index c57b421..977f652 100644
--- a/src/main/java/org/apache/sling/distribution/agent/impl/ForwardDistributionAgentFactory.java
+++ b/src/main/java/org/apache/sling/distribution/agent/impl/ForwardDistributionAgentFactory.java
@@ -59,6 +59,7 @@
import org.apache.sling.distribution.queue.impl.MultipleQueueDispatchingStrategy;
import org.apache.sling.distribution.queue.impl.PriorityQueueDispatchingStrategy;
import org.apache.sling.distribution.queue.impl.jobhandling.JobHandlingDistributionQueueProvider;
+import org.apache.sling.distribution.queue.impl.resource.ResourceQueueProvider;
import org.apache.sling.distribution.queue.impl.simple.SimpleDistributionQueueProvider;
import org.apache.sling.distribution.transport.DistributionTransportSecretProvider;
import org.apache.sling.distribution.transport.impl.HttpConfiguration;
@@ -170,6 +171,7 @@
@Property(options = {
@PropertyOption(name = JobHandlingDistributionQueueProvider.TYPE, value = "Sling Jobs"),
+ @PropertyOption(name = ResourceQueueProvider.TYPE, value = "Resource Backed"),
@PropertyOption(name = SimpleDistributionQueueProvider.TYPE, value = "In-memory"),
@PropertyOption(name = SimpleDistributionQueueProvider.TYPE_CHECKPOINT, value = "In-file")},
value = "jobs",
@@ -257,7 +259,10 @@
queueProvider = new JobHandlingDistributionQueueProvider(agentName, jobManager, context, configAdmin);
} else if (SimpleDistributionQueueProvider.TYPE.equals(queueProviderName)) {
queueProvider = new SimpleDistributionQueueProvider(scheduler, agentName, false);
- } else {
+ } else if (ResourceQueueProvider.TYPE.equals(queueProviderName)) {
+ queueProvider = new ResourceQueueProvider(context,
+ resourceResolverFactory, SimpleDistributionAgent.DEFAULT_AGENT_SERVICE, agentName, scheduler, true);
+ } else { // when SimpleDistributionQueueProvider.TYPE_CHECKPOINT is "queueProviderName"
queueProvider = new SimpleDistributionQueueProvider(scheduler, agentName, true);
}
queueProvider = new MonitoringDistributionQueueProvider(queueProvider, context);
diff --git a/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java b/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java
index 96c3d7f..19cb7a7 100644
--- a/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java
+++ b/src/main/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgent.java
@@ -59,7 +59,7 @@
* Basic implementation of a {@link DistributionAgent}
*/
public class SimpleDistributionAgent implements DistributionAgent {
- private final static String DEFAULT_AGENT_SERVICE = "defaultAgentService";
+ final static String DEFAULT_AGENT_SERVICE = "defaultAgentService";
private final DistributionQueueProvider queueProvider;
private final DistributionPackageImporter distributionPackageImporter;
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ActiveResourceQueue.java b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ActiveResourceQueue.java
new file mode 100644
index 0000000..6a1339c
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ActiveResourceQueue.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.distribution.queue.impl.resource;
+
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.distribution.queue.DistributionQueueEntry;
+import org.apache.sling.distribution.queue.DistributionQueueItem;
+import org.apache.sling.distribution.queue.DistributionQueueItemStatus;
+import org.apache.sling.distribution.queue.DistributionQueueStatus;
+import org.apache.sling.distribution.queue.impl.DistributionQueueUtils;
+import org.apache.sling.distribution.util.impl.DistributionUtils;
+import org.jetbrains.annotations.NotNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * ActiveResourceQueue, in conjunction with a queue-processor active only on a single Sling instance,
+ * provides an Active, JCR resource-backed queue.
+ *
+ * It maintains the dequeue attempts in JCR to reflect consistent dequeue attempt status across all cluster instances
+ */
+public class ActiveResourceQueue extends ResourceQueue {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    public ActiveResourceQueue(ResourceResolverFactory resolverFactory, String serviceName, String queueName, String rootPath) {
+        super(resolverFactory, serviceName, queueName, rootPath);
+    }
+
+    /**
+     * Reports the number of queued items together with the state calculated from the head entry.
+     * @throws RuntimeException wrapping the login or persistence failure when the queue cannot be read
+     */
+    @NotNull
+    @Override
+    public DistributionQueueStatus getStatus() {
+        ResourceResolver resourceResolver = null;
+        try {
+            resourceResolver = DistributionUtils.loginService(resolverFactory, serviceName);
+            Resource queueRoot = ResourceQueueUtils.getRootResource(resourceResolver, queueRootPath);
+            int count = ResourceQueueUtils.getResourceCount(queueRoot);
+            DistributionQueueEntry head = ResourceQueueUtils.getHead(queueRoot);
+            DistributionQueueItem firstItem = (head != null) ? head.getItem() : null;
+            DistributionQueueItemStatus firstItemStatus = (head != null) ? head.getStatus() : null;
+            log.debug("Queue has {} items, with following status for the head: {}", count, firstItemStatus);
+            return new DistributionQueueStatus(count, DistributionQueueUtils.calculateState(firstItem, firstItemStatus));
+        } catch (LoginException | PersistenceException e) {
+            throw new RuntimeException("could not get the status of queue " + queueName, e);
+        } finally {
+            DistributionUtils.safelyLogout(resourceResolver);
+        }
+    }
+}
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueue.java b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueue.java
index 9a04b4c..4896c65 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueue.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueue.java
@@ -55,17 +55,17 @@
private final Logger log = LoggerFactory.getLogger(getClass());
-
- private final ResourceResolverFactory resolverFactory;
- private String serviceName;
- private String queueName;
- private final String queueRootPath;
+ protected final ResourceResolverFactory resolverFactory;
+ protected final String queueRootPath;
+ protected String serviceName;
+ protected String queueName;
public ResourceQueue(ResourceResolverFactory resolverFactory, String serviceName, String queueName, String rootPath) {
this.resolverFactory = resolverFactory;
this.serviceName = serviceName;
this.queueName = queueName;
this.queueRootPath = rootPath + "/" + queueName;
+ log.debug("starting a Resource Queue {}", queueName);
}
@NotNull
@@ -145,7 +145,6 @@
} finally {
DistributionUtils.safelyLogout(resourceResolver);
}
-
}
@Nullable
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProvider.java b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProvider.java
index 5db42ea..d2df714 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProvider.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProvider.java
@@ -20,34 +20,56 @@
package org.apache.sling.distribution.queue.impl.resource;
import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.commons.scheduler.ScheduleOptions;
import org.apache.sling.commons.scheduler.Scheduler;
import org.apache.sling.distribution.common.DistributionException;
import org.apache.sling.distribution.queue.DistributionQueueType;
import org.apache.sling.distribution.queue.impl.DistributionQueueProcessor;
import org.apache.sling.distribution.queue.impl.DistributionQueueProvider;
+import org.apache.sling.distribution.queue.impl.simple.SimpleDistributionQueueProcessor;
import org.apache.sling.distribution.queue.spi.DistributionQueue;
import org.jetbrains.annotations.NotNull;
import org.osgi.framework.BundleContext;
import org.osgi.framework.ServiceRegistration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.Dictionary;
import java.util.Hashtable;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
public class ResourceQueueProvider implements DistributionQueueProvider {
+ public static final String TYPE = "resource";
private final static String QUEUES_ROOT = "/var/sling/distribution/queues/";
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
private ResourceResolverFactory resolverFactory;
private String serviceName;
private String agentRootPath;
+ private Scheduler scheduler;
+ private String agentName;
+ private boolean isActive;
+
+ private final Map<String, ResourceQueue> queueMap = new ConcurrentHashMap<>();
private ServiceRegistration<Runnable> cleanupTask;
-
- public ResourceQueueProvider(BundleContext context, ResourceResolverFactory resolverFactory, String serviceName, String agentName) {
+ public ResourceQueueProvider(BundleContext context, ResourceResolverFactory resolverFactory,
+ String serviceName, String agentName, Scheduler scheduler, boolean isActive) {
+ if (serviceName == null || (scheduler == null && isActive)
+ || context == null || resolverFactory == null || agentName == null) {
+ throw new IllegalArgumentException("all arguments are required");
+ }
this.resolverFactory = resolverFactory;
this.serviceName = serviceName;
+ this.agentName = agentName;
this.agentRootPath = QUEUES_ROOT + agentName;
+ this.scheduler = scheduler;
+ this.isActive = isActive;
register(context);
}
@@ -55,23 +77,61 @@
@NotNull
@Override
public DistributionQueue getQueue(@NotNull String queueName) throws DistributionException {
- return new ResourceQueue(resolverFactory, serviceName, queueName, agentRootPath);
+ return queueMap.computeIfAbsent(queueName, name -> {
+ if (isActive) {
+ return new ActiveResourceQueue(resolverFactory, serviceName, name, agentRootPath);
+ } else {
+ return new ResourceQueue(resolverFactory, serviceName, name, agentRootPath);
+ }
+ });
}
@NotNull
@Override
public DistributionQueue getQueue(@NotNull String queueName, @NotNull DistributionQueueType type) {
- return new ResourceQueue(resolverFactory, serviceName, queueName, agentRootPath);
+        try { // 'type' is not consulted, the queue is looked up by name only
+            return getQueue(queueName);
+        } catch (DistributionException e) {
+            throw new RuntimeException("could not get queue " + queueName, e);
+        }
}
@Override
public void enableQueueProcessing(@NotNull DistributionQueueProcessor queueProcessor, String... queueNames) throws DistributionException {
- // processing not supported
+        // enable processing only for active ResourceQueues
+        if (!isActive) {
+            throw new DistributionException(new UnsupportedOperationException("enable Processing not supported for Passive Queues"));
+        }
+        for (String queueName : queueNames) {
+            // one named job per queue: started now, repeated endlessly (-1) every second, never concurrently
+            ScheduleOptions options = scheduler.NOW(-1, 1).canRunConcurrently(false)
+                    .onSingleInstanceOnly(true).name(getJobName(queueName));
+            if (!scheduler.schedule(new SimpleDistributionQueueProcessor(getQueue(queueName), queueProcessor), options)) {
+                log.warn("could not enable queue processing on {}", queueName);
+            }
+        }
}
@Override
public void disableQueueProcessing() throws DistributionException {
- // processing not supported
+        // disable processing only for active ResourceQueues
+        if (!isActive) {
+            throw new DistributionException(new UnsupportedOperationException("disable Processing not supported for Passive Queues"));
+        }
+        // try to unschedule the processing job of every queue this provider has handed out
+        for (DistributionQueue queue : queueMap.values()) {
+            String jobName = getJobName(queue.getName());
+            boolean stopped = scheduler.unschedule(jobName);
+            if (stopped) {
+                log.debug("queue processing on {} stopped", queue);
+            } else {
+                log.warn("could not disable queue processing on {}", queue);
+            }
+        }
+    }
+
+ private String getJobName(String queueName) {
+ return "resource-queueProcessor-" + agentName + "-" + queueName;
}
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProviderFactory.java b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProviderFactory.java
index ac0bafd..76ae581 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProviderFactory.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProviderFactory.java
@@ -21,10 +21,14 @@
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.ConfigurationPolicy;
+import org.apache.felix.scr.annotations.Properties;
import org.apache.felix.scr.annotations.Property;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.Service;
import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.commons.osgi.PropertiesUtil;
+import org.apache.sling.commons.scheduler.Scheduler;
import org.apache.sling.distribution.component.impl.DistributionComponentConstants;
import org.apache.sling.distribution.queue.impl.DistributionQueueProvider;
import org.apache.sling.distribution.queue.impl.DistributionQueueProviderFactory;
@@ -32,25 +36,41 @@
import java.util.Map;
-@Component
+@Component(metatype = true,
+ label = "Apache Sling Resource Queue Provider Factory",
+ description = "OSGi configuration factory for Resource-backed queues",
+ configurationFactory = true,
+ policy = ConfigurationPolicy.REQUIRE)
@Service(DistributionQueueProviderFactory.class)
-@Property(name = DistributionComponentConstants.PN_NAME, value = "resourceQueue")
+@Properties({
+ @Property(name = DistributionComponentConstants.PN_NAME, value = "resourceQueue"),
+ @Property(name = ResourceQueueProviderFactory.PN_IS_ACTIVE,
+ label = "Should the Resource-backed queue created with a Queue Processor (i.e., ACTIVE)",
+ boolValue = {false})
+})
public class ResourceQueueProviderFactory implements DistributionQueueProviderFactory {
+ static final String PN_IS_ACTIVE = "queue.isActive";
+
@Reference
ResourceResolverFactory resourceResolverFactory;
+ @Reference
+ Scheduler scheduler;
BundleContext context;
+ private boolean isActive;
+
@Activate
protected void activate(BundleContext context, Map<String, Object> config)
{
+        this.isActive = PropertiesUtil.toBoolean(config.get(PN_IS_ACTIVE), false); // read the configured value, not the property name
this.context = context;
}
@Override
public DistributionQueueProvider getProvider(String agentName, String serviceName) {
- return new ResourceQueueProvider(context, resourceResolverFactory, serviceName, agentName);
+ return new ResourceQueueProvider(context, resourceResolverFactory, serviceName, agentName, scheduler, isActive);
}
@Override
@@ -59,4 +79,4 @@
((ResourceQueueProvider) queueProvider).close();
}
}
-}
\ No newline at end of file
+}
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessor.java b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessor.java
index 5aff79c..462895e 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessor.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessor.java
@@ -31,7 +31,7 @@
/**
* a simple scheduled {@link SimpleDistributionQueue}s processor
*/
-class SimpleDistributionQueueProcessor implements Runnable {
+public class SimpleDistributionQueueProcessor implements Runnable {
private final Logger log = LoggerFactory.getLogger(getClass());
private final DistributionQueue queue;
diff --git a/src/test/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProcessingTest.java b/src/test/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProcessingTest.java
new file mode 100644
index 0000000..68d08fa
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProcessingTest.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.distribution.queue.impl.resource;
+
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.commons.scheduler.ScheduleOptions;
+import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.queue.DistributionQueueEntry;
+import org.apache.sling.distribution.queue.DistributionQueueItem;
+import org.apache.sling.distribution.queue.DistributionQueueState;
+import org.apache.sling.distribution.queue.impl.DistributionQueueProcessor;
+import org.apache.sling.distribution.queue.impl.DistributionQueueProvider;
+import org.apache.sling.distribution.queue.impl.DistributionQueueProviderFactory;
+import org.apache.sling.distribution.queue.spi.DistributionQueue;
+import org.apache.sling.testing.mock.osgi.MockOsgi;
+import org.apache.sling.testing.mock.sling.MockSling;
+import org.apache.sling.testing.mock.sling.ResourceResolverType;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.internal.util.reflection.Whitebox;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.osgi.framework.BundleContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class ResourceQueueProcessingTest {
+
+ public static final Logger log = LoggerFactory.getLogger(ResourceQueueProcessingTest.class);
+
+ protected static final String PACKAGE_ID = "testPackageId";
+ protected static BundleContext bundleContext = null;
+ protected static ResourceResolverFactory rrf = null;
+ protected static Scheduler scheduler = null;
+ protected static ScheduledExecutorService executorService = null;
+
+    @Test
+    public void testActiveResourceQueue() throws DistributionException, PersistenceException, LoginException {
+        // obtain an active queue provider instance
+        final String QUEUE_NAME = "testActiveQueue";
+        final int MAX_ENTRIES = 32;
+
+        DistributionQueueProvider resourceQueueProvider = new ResourceQueueProvider(bundleContext,
+                rrf, "test", "testAgent", scheduler, true);
+        DistributionQueueProcessor mockResourceQueueProcessor = mock(DistributionQueueProcessor.class);
+
+        DistributionQueue resourceQueue = resourceQueueProvider.getQueue(QUEUE_NAME);
+
+        try {
+            populateDistributionQueue(resourceQueue, MAX_ENTRIES);
+
+            assertEquals("Resource Queue state is not RUNNING",
+                    DistributionQueueState.RUNNING, resourceQueue.getStatus().getState());
+            assertEquals(MAX_ENTRIES, resourceQueue.getStatus().getItemsCount());
+
+            when(mockResourceQueueProcessor.process(eq(QUEUE_NAME), Matchers.any(DistributionQueueEntry.class)))
+                    .thenReturn(true);
+
+            resourceQueueProvider.enableQueueProcessing(mockResourceQueueProcessor, QUEUE_NAME);
+            // poll until the queue is drained, but fail instead of hanging if processing never completes
+            long deadline = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(60);
+            while (!DistributionQueueState.IDLE.equals(resourceQueue.getStatus().getState())) {
+                assertTrue("Resource Queue was not processed in time", System.currentTimeMillis() < deadline);
+                log.info("Processing Resource Queue. Items remaining: {}", resourceQueue.getStatus().getItemsCount());
+            }
+            assertEquals(0, resourceQueue.getStatus().getItemsCount());
+        } finally {
+            resourceQueueProvider.disableQueueProcessing();
+            resourceQueue.clear(Integer.MAX_VALUE);
+        }
+    }
+
+    @Test
+    public void testActiveResourceQueueWithoutEnablingProcessing() throws DistributionException {
+        final String QUEUE_NAME = "testActiveQueue_2";
+        final int MAX_ENTRIES = 2;
+        Scheduler tempScheduler = mock(Scheduler.class);
+        when(tempScheduler.unschedule(Matchers.anyString())).thenReturn(false);
+
+        DistributionQueueProvider resourceQueueProvider = new ResourceQueueProvider(bundleContext,
+                rrf, "test", "testAgent", tempScheduler, true);
+        DistributionQueue resourceQueue = resourceQueueProvider.getQueue(QUEUE_NAME);
+
+        try {
+            populateDistributionQueue(resourceQueue, MAX_ENTRIES);
+
+            assertEquals("Resource Queue state is not RUNNING",
+                    DistributionQueueState.RUNNING, resourceQueue.getStatus().getState());
+            assertEquals(MAX_ENTRIES, resourceQueue.getStatus().getItemsCount());
+        } finally {
+            // the mocked unschedule() reports false, so the provider should log a WARN here
+            resourceQueueProvider.disableQueueProcessing();
+            resourceQueue.clear(Integer.MAX_VALUE);
+        }
+    }
+
+    @Test(expected = DistributionException.class)
+    public void testPassiveResourceQueueEnableProcessing() throws DistributionException {
+        final String QUEUE_NAME = "testPassiveQueue_1";
+        final int MAX_ENTRIES = 4;
+        DistributionQueueProviderFactory resQueueProviderFactory= new ResourceQueueProviderFactory();
+        Whitebox.setInternalState(resQueueProviderFactory, "isActive", false);
+        Whitebox.setInternalState(resQueueProviderFactory, "resourceResolverFactory", rrf);
+        Whitebox.setInternalState(resQueueProviderFactory, "scheduler", scheduler);
+        MockOsgi.activate(resQueueProviderFactory, bundleContext);
+
+        DistributionQueueProvider resourceQueueProvider = resQueueProviderFactory
+                .getProvider("testAgent", "test"); // arguments are (agentName, serviceName)
+
+        DistributionQueue resourceQueue = resourceQueueProvider.getQueue(QUEUE_NAME, null);
+
+        try {
+            populateDistributionQueue(resourceQueue, MAX_ENTRIES);
+
+            assertEquals("Resource Queue state is not PASSIVE",
+                    DistributionQueueState.PASSIVE, resourceQueue.getStatus().getState());
+            assertEquals(MAX_ENTRIES, resourceQueue.getStatus().getItemsCount());
+
+            resourceQueueProvider.enableQueueProcessing(null, QUEUE_NAME); // expect exception
+        } finally {
+            resourceQueue.clear(Integer.MAX_VALUE);
+        }
+    }
+
+    @Test(expected = DistributionException.class)
+    public void testPassiveResourceQueueDisableProcessing() throws DistributionException {
+        final String QUEUE_NAME = "testPassiveQueue_2";
+        final int MAX_ENTRIES = 2;
+        DistributionQueueProvider resourceQueueProvider = new ResourceQueueProvider(bundleContext,
+                rrf, "test", "testAgent", null, false);
+
+        DistributionQueue resourceQueue = resourceQueueProvider.getQueue(QUEUE_NAME, null);
+
+        try {
+            populateDistributionQueue(resourceQueue, MAX_ENTRIES);
+
+            assertEquals("Resource Queue state is not PASSIVE",
+                    DistributionQueueState.PASSIVE, resourceQueue.getStatus().getState());
+            assertEquals(MAX_ENTRIES, resourceQueue.getStatus().getItemsCount());
+            resourceQueueProvider.disableQueueProcessing(); // expect exception; thrown from try so cleanup runs and assertion failures are not masked
+        } finally {
+            resourceQueue.clear(Integer.MAX_VALUE);
+        }
+    }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testInvalidQueueProviderConstruction_1() {
+ constructIllegalResourceQueueProvider(IllegalQueueProviderType.MISSING_BUNDLE_CONTEXT);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testInvalidQueueProviderConstruction_2() {
+ constructIllegalResourceQueueProvider(IllegalQueueProviderType.MISSING_RESOLVER_FACTORY);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testInvalidQueueProviderConstruction_3() {
+ constructIllegalResourceQueueProvider(IllegalQueueProviderType.MISSING_SERVICENAME);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testInvalidQueueProviderConstruction_4() {
+ constructIllegalResourceQueueProvider(IllegalQueueProviderType.MISSING_AGENTNAME);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testInvalidQueueProviderConstruction_5() {
+ constructIllegalResourceQueueProvider(IllegalQueueProviderType.MISSING_SCHEDULER_WHEN_ACTIVE);
+ }
+
+ private void populateDistributionQueue(DistributionQueue queue, int maxEntries) {
+ for (int i = 0; i < maxEntries; i++) {
+ queue.add(new DistributionQueueItem(PACKAGE_ID, Collections.<String, Object>emptyMap()));
+ }
+ }
+
+ private ResourceQueueProvider constructIllegalResourceQueueProvider(IllegalQueueProviderType type) {
+ switch(type) {
+ case MISSING_BUNDLE_CONTEXT:
+ return new ResourceQueueProvider(null,
+ rrf, "test", "testAgent", scheduler, true);
+ case MISSING_RESOLVER_FACTORY:
+ return new ResourceQueueProvider(bundleContext,
+ null, "test", "testAgent", scheduler, true);
+ case MISSING_SERVICENAME:
+ return new ResourceQueueProvider(bundleContext,
+ rrf, null, "testAgent", scheduler, true);
+ case MISSING_AGENTNAME:
+ return new ResourceQueueProvider(bundleContext,
+ rrf, "test", null, scheduler, true);
+ case MISSING_SCHEDULER_WHEN_ACTIVE:
+ default:
+ return new ResourceQueueProvider(bundleContext,
+ rrf, "test", "testAgent", null, true);
+ }
+ }
+
+ private enum IllegalQueueProviderType {
+ MISSING_BUNDLE_CONTEXT,
+ MISSING_RESOLVER_FACTORY,
+ MISSING_SERVICENAME,
+ MISSING_AGENTNAME,
+ MISSING_SCHEDULER_WHEN_ACTIVE,
+ }
+
+ @BeforeClass
+ public static void setUp() throws LoginException {
+ bundleContext = MockOsgi.newBundleContext();
+ MockSling.setAdapterManagerBundleContext(bundleContext);
+ rrf = MockSling.newResourceResolverFactory(ResourceResolverType.JCR_OAK, bundleContext);
+ scheduler = mock(Scheduler.class);
+ ScheduleOptions mockScheduleOptions = mock(ScheduleOptions.class);
+ when(mockScheduleOptions.canRunConcurrently(Matchers.anyBoolean())).thenReturn(mockScheduleOptions);
+ when(mockScheduleOptions.onSingleInstanceOnly(Matchers.anyBoolean())).thenReturn(mockScheduleOptions);
+ when(mockScheduleOptions.name(Matchers.anyString())).thenReturn(mockScheduleOptions);
+ executorService = Executors.newSingleThreadScheduledExecutor();
+ when(scheduler.NOW(Matchers.anyInt(), Matchers.anyLong())).thenReturn(mockScheduleOptions);
+ when(scheduler.schedule(Matchers.any(Runnable.class), Matchers.any(ScheduleOptions.class)))
+ .thenAnswer(new Answer<Boolean>() {
+ @Override
+ public Boolean answer(InvocationOnMock invocation) throws Throwable {
+ Runnable task = (Runnable) invocation.getArguments()[0];
+ executorService.scheduleAtFixedRate(task, 0L, 1L, TimeUnit.SECONDS);
+ return true;
+ }
+ });
+ when(scheduler.unschedule(Matchers.anyString())).thenReturn(true);
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ MockSling.clearAdapterManagerBundleContext();
+ executorService.shutdownNow();
+ }
+}