SLING-8853 Adds test for ActiveResourceQueue
diff --git a/src/test/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProcessingTest.java b/src/test/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProcessingTest.java
new file mode 100644
index 0000000..e477747
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProcessingTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.distribution.queue.impl.resource;
+
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.commons.scheduler.ScheduleOptions;
+import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.queue.DistributionQueueEntry;
+import org.apache.sling.distribution.queue.DistributionQueueItem;
+import org.apache.sling.distribution.queue.DistributionQueueState;
+import org.apache.sling.distribution.queue.impl.DistributionQueueProcessor;
+import org.apache.sling.distribution.queue.impl.DistributionQueueProvider;
+import org.apache.sling.distribution.queue.spi.DistributionQueue;
+import org.apache.sling.testing.mock.osgi.MockOsgi;
+import org.apache.sling.testing.mock.sling.MockSling;
+import org.apache.sling.testing.mock.sling.ResourceResolverType;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.osgi.framework.BundleContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+
+public class ResourceQueueProcessingTest {
+
+ public static final Logger log = LoggerFactory.getLogger(ResourceQueueProcessingTest.class);
+
+ protected static BundleContext bundleContext = null;
+ protected static ResourceResolverFactory rrf = null;
+ protected static Scheduler scheduler = null;
+ protected static ScheduledExecutorService executorService = null;
+
+ @Test
+ public void testActiveResourceQueue() throws DistributionException, PersistenceException, LoginException {
+ // obtain an active queue provider instance
+ final String QUEUE_NAME = "testQueue";
+ final String PACKAGE_ID = "testPackageId";
+ final int MAX_ENTRIES = 32;
+
+ DistributionQueueProvider resourceQueueProvider = new ResourceQueueProvider(bundleContext,
+ rrf, "test", "testAgent", scheduler, true);
+ DistributionQueueProcessor mockResourceQueueProcessor = mock(DistributionQueueProcessor.class);
+
+ DistributionQueue resourceQueue = resourceQueueProvider.getQueue(QUEUE_NAME);
+
+ try {
+ for (int i = 0; i < MAX_ENTRIES; i++) {
+ resourceQueue.add(new DistributionQueueItem(PACKAGE_ID, Collections.<String, Object>emptyMap()));
+ }
+
+ assertTrue("Resource Queue state is not RUNNING",
+ resourceQueue.getStatus().getState().equals(DistributionQueueState.RUNNING));
+ assertEquals(MAX_ENTRIES, resourceQueue.getStatus().getItemsCount());
+
+ when(mockResourceQueueProcessor.process(eq(QUEUE_NAME), Matchers.any(DistributionQueueEntry.class)))
+ .thenReturn(true);
+
+ resourceQueueProvider.enableQueueProcessing(mockResourceQueueProcessor, QUEUE_NAME);
+ while (!resourceQueue.getStatus().getState().equals(DistributionQueueState.IDLE)) {
+ // do nothing, wait for processing to finish
+ log.info("Processing Resource Queue. Items remaining: {}",
+ resourceQueue.getStatus().getItemsCount());
+ }
+
+ assertEquals(0, resourceQueue.getStatus().getItemsCount());
+ } finally {
+ resourceQueue.clear(Integer.MAX_VALUE);
+ }
+ }
+
+ @BeforeClass
+ public static void setUp() throws LoginException {
+ bundleContext = MockOsgi.newBundleContext();
+ MockSling.setAdapterManagerBundleContext(bundleContext);
+ rrf = MockSling.newResourceResolverFactory(ResourceResolverType.JCR_OAK, bundleContext);
+ scheduler = mock(Scheduler.class);
+ ScheduleOptions mockScheduleOptions = mock(ScheduleOptions.class);
+ when(mockScheduleOptions.canRunConcurrently(Matchers.anyBoolean())).thenReturn(mockScheduleOptions);
+ when(mockScheduleOptions.onSingleInstanceOnly(Matchers.anyBoolean())).thenReturn(mockScheduleOptions);
+ when(mockScheduleOptions.name(Matchers.anyString())).thenReturn(mockScheduleOptions);
+ executorService = Executors.newSingleThreadScheduledExecutor();
+ when(scheduler.NOW(Matchers.anyInt(), Matchers.anyLong())).thenReturn(mockScheduleOptions);
+ when(scheduler.schedule(Matchers.any(Runnable.class), Matchers.any(ScheduleOptions.class)))
+ .thenAnswer(new Answer<Boolean>() {
+ @Override
+ public Boolean answer(InvocationOnMock invocation) throws Throwable {
+ Runnable task = (Runnable) invocation.getArguments()[0];
+ executorService.scheduleAtFixedRate(task, 0L, 1L, TimeUnit.SECONDS);
+ return true;
+ }
+ });
+ }
+
+ @AfterClass
+ public static void tearDown() {
+ MockSling.clearAdapterManagerBundleContext();
+ executorService.shutdownNow();
+ }
+}