SLING-8945 ActiveResourceQueue doesn't provide cluster-consistent processing-attempts view
* refactors the code a bit to contain status recording within queue instances themselves, freeing Queue Providers
from the responsibility of maintaining them for the queues they create
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ActiveResourceQueue.java b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ActiveResourceQueue.java
index 6a1339c..1921c22 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ActiveResourceQueue.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ActiveResourceQueue.java
@@ -71,4 +71,20 @@
DistributionUtils.safelyLogout(resourceResolver);
}
}
+
+ public void recordProcessingAttempt(@NotNull DistributionQueueEntry entry) {
+ ResourceResolver resourceResolver = null;
+ try {
+ resourceResolver = DistributionUtils.loginService(resolverFactory, serviceName);
+ Resource queueRoot = ResourceQueueUtils.getRootResource(resourceResolver, queueRootPath);
+ Resource queueItemResource = ResourceQueueUtils.getResourceById(queueRoot, entry.getId());
+ ResourceQueueUtils.incrementProcessingAttemptForQueueItem(queueItemResource);
+ resourceResolver.commit();
+ log.debug("incremented processing-attempt for queue entry with id: {}", entry.getId());
+ } catch (Exception e) {
+ log.warn("Couldn't increment processing-attempt for queue entry with id: {}", entry.getId());
+ } finally {
+ DistributionUtils.safelyLogout(resourceResolver);
+ }
+ }
}
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProvider.java b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProvider.java
index 221501a..f35f2ce 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProvider.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProvider.java
@@ -23,7 +23,7 @@
import org.apache.sling.commons.scheduler.ScheduleOptions;
import org.apache.sling.commons.scheduler.Scheduler;
import org.apache.sling.distribution.common.DistributionException;
-import org.apache.sling.distribution.queue.DistributionQueueItemStatus;
+import org.apache.sling.distribution.queue.DistributionQueueEntry;
import org.apache.sling.distribution.queue.DistributionQueueType;
import org.apache.sling.distribution.queue.impl.DistributionQueueProcessor;
import org.apache.sling.distribution.queue.impl.DistributionQueueProvider;
@@ -40,6 +40,7 @@
import java.util.Hashtable;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
public class ResourceQueueProvider implements DistributionQueueProvider {
public static final String TYPE = "resource";
@@ -57,9 +58,6 @@
private final Map<String, ResourceQueue> queueMap = new ConcurrentHashMap<>();
- private final Map<String, Map<String, DistributionQueueItemStatus>> statusMap = new ConcurrentHashMap<>();
-
-
private ServiceRegistration<Runnable> cleanupTask;
public ResourceQueueProvider(BundleContext context, ResourceResolverFactory resolverFactory,
@@ -81,13 +79,11 @@
@NotNull
@Override
public DistributionQueue getQueue(@NotNull String queueName) throws DistributionException {
- String key = getKey(queueName);
- return queueMap.computeIfAbsent(key, k -> {
- statusMap.put(key, new ConcurrentHashMap<>());
+ return queueMap.computeIfAbsent(queueName, name -> {
if (isActive) {
- return new ActiveResourceQueue(resolverFactory, serviceName, queueName, agentRootPath);
+ return new ActiveResourceQueue(resolverFactory, serviceName, name, agentRootPath);
} else {
- return new ResourceQueue(resolverFactory, serviceName, queueName, agentRootPath);
+ return new ResourceQueue(resolverFactory, serviceName, name, agentRootPath);
}
});
}
@@ -111,8 +107,13 @@
.canRunConcurrently(false)
.onSingleInstanceOnly(true)
.name(getJobName(queueName));
- scheduler.schedule(new SimpleDistributionQueueProcessor(getQueue(queueName), queueProcessor,
- statusMap.get(getKey(queueName))), options);
+ DistributionQueue queueImpl = getQueue(queueName);
+ Consumer<DistributionQueueEntry> processingAttemptRecorder = null;
+ if (isActive) {
+ processingAttemptRecorder = ((ActiveResourceQueue)queueImpl)::recordProcessingAttempt;
+ }
+ scheduler.schedule(new SimpleDistributionQueueProcessor(queueImpl, queueProcessor, processingAttemptRecorder),
+ options);
}
} else {
throw new DistributionException(new UnsupportedOperationException("enable Processing not supported for Passive Queues"));
@@ -150,10 +151,6 @@
cleanupTask = context.registerService(Runnable.class, cleanup, props);
}
- private String getKey(String queueName) {
- return agentName + "#" + queueName;
- }
-
public void close() {
if (cleanupTask != null) {
cleanupTask.unregister();
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueUtils.java b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueUtils.java
index a54767e..ccd76a2 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueUtils.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueUtils.java
@@ -20,6 +20,7 @@
package org.apache.sling.distribution.queue.impl.resource;
+import org.apache.sling.api.resource.ModifiableValueMap;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.Resource;
import org.apache.sling.api.resource.ResourceResolver;
@@ -70,6 +71,7 @@
private static final String DISTRIBUTION_PACKAGE_ID = DISTRIBUTION_PACKAGE_PREFIX + "item.id";
private static final String DISTRIBUTION_PACKAGE_SIZE = DISTRIBUTION_PACKAGE_PREFIX + "package.size";
private static final String ENTERED_DATE = "entered.date";
+ private static final String PROCESSING_ATTEMPTS = "processing.attempts";
private static final AtomicLong itemCounter = new AtomicLong(0);
@@ -145,8 +147,9 @@
ValueMap valueMap = resource.getValueMap();
DistributionQueueItem queueItem = deserializeItem(valueMap);
Calendar entered = valueMap.get(ENTERED_DATE, Calendar.getInstance());
+ int attempts = valueMap.get(PROCESSING_ATTEMPTS, 0);
DistributionQueueItemStatus queueItemStatus = new DistributionQueueItemStatus(entered,
- DistributionQueueItemState.QUEUED, 0, queueName);
+ DistributionQueueItemState.QUEUED, attempts, queueName);
String entryId = getIdFromPath(queueRoot.getPath(), resource.getPath());
@@ -420,4 +423,10 @@
return itemId.replace(ID_START, "").replace("--", "/");
}
+ public static void incrementProcessingAttemptForQueueItem(Resource queueItemResource) {
+ ValueMap vm = queueItemResource.adaptTo(ModifiableValueMap.class);
+ int attempts = vm.get(PROCESSING_ATTEMPTS, 0);
+ vm.put(PROCESSING_ATTEMPTS, attempts + 1);
+ }
+
}
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java
index 6b4a2c1..85a08b1 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java
@@ -27,6 +27,7 @@
import java.util.Map;
import java.util.Queue;
import java.util.Set;
+import java.util.WeakHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.sling.distribution.queue.spi.DistributionQueue;
@@ -67,13 +68,13 @@
private final Queue<DistributionQueueItem> queue;
- private final Map<String, DistributionQueueItemStatus> statusMap;
+ private final Map<DistributionQueueItem, DistributionQueueItemStatus> statusMap;
- public SimpleDistributionQueue(String agentName, String name, Map<String, DistributionQueueItemStatus> statusMap) {
+ public SimpleDistributionQueue(String agentName, String name) {
log.debug("starting a simple queue {} for agent {}", name, agentName);
this.name = name;
this.queue = new LinkedBlockingQueue<DistributionQueueItem>();
- this.statusMap = statusMap;
+ this.statusMap = new WeakHashMap<DistributionQueueItem, DistributionQueueItemStatus>(10);
}
@NotNull
@@ -84,19 +85,17 @@
public DistributionQueueEntry add(@NotNull DistributionQueueItem item) {
DistributionQueueItemState itemState = DistributionQueueItemState.ERROR;
boolean result = false;
- String entryId = item.getPackageId();
try {
result = queue.offer(item);
itemState = DistributionQueueItemState.QUEUED;
} catch (Exception e) {
log.error("cannot add an item to the queue", e);
} finally {
- statusMap.put(entryId, new DistributionQueueItemStatus(Calendar.getInstance(), itemState, 0, name));
+ statusMap.put(item, new DistributionQueueItemStatus(Calendar.getInstance(), itemState, 0, name));
}
if (result) {
-
- return new DistributionQueueEntry(item.getPackageId(), item, statusMap.get(entryId));
+ return new DistributionQueueEntry(item.getPackageId(), item, statusMap.get(item));
}
return null;
@@ -107,7 +106,7 @@
public DistributionQueueEntry getHead() {
DistributionQueueItem element = queue.peek();
if (element != null) {
- DistributionQueueItemStatus itemState = statusMap.get(element.getPackageId());
+ DistributionQueueItemStatus itemState = statusMap.get(element);
return new DistributionQueueEntry(element.getPackageId(), element, itemState);
}
@@ -117,7 +116,7 @@
@NotNull
private DistributionQueueState getState() {
DistributionQueueItem firstItem = queue.peek();
- DistributionQueueItemStatus firstItemStatus = firstItem != null ? statusMap.get(firstItem.getPackageId()) : null;
+ DistributionQueueItemStatus firstItemStatus = firstItem != null ? statusMap.get(firstItem) : null;
return DistributionQueueUtils.calculateState(firstItem, firstItemStatus);
}
@@ -144,7 +143,7 @@
List<DistributionQueueEntry> result = new ArrayList<DistributionQueueEntry>();
for (DistributionQueueItem item : queue) {
- result.add(new DistributionQueueEntry(item.getPackageId(), item, statusMap.get(item.getPackageId())));
+ result.add(new DistributionQueueEntry(item.getPackageId(), item, statusMap.get(item)));
}
return result;
}
@@ -153,7 +152,7 @@
public DistributionQueueEntry getEntry(@NotNull String id) {
for (DistributionQueueItem item : queue) {
if (id.equals(item.getPackageId())) {
- return new DistributionQueueEntry(id, item, statusMap.get(item.getPackageId()));
+ return new DistributionQueueEntry(id, item, statusMap.get(item));
}
}
@@ -180,7 +179,6 @@
boolean removed = false;
if (toRemove != null) {
removed = queue.remove(toRemove.getItem());
- statusMap.remove(id);
}
log.debug("item with id {} removed from the queue: {}", id, removed);
if (removed) {
@@ -210,4 +208,10 @@
return removedEntries;
}
+ public void recordProcessingAttempt(@NotNull DistributionQueueEntry entry) {
+ statusMap.computeIfPresent(entry.getItem(), (item, status) -> {
+ return new DistributionQueueItemStatus(status.getEntered(),
+ status.getItemState(), status.getAttempts() + 1, status.getQueueName());
+ });
+ }
}
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessor.java b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessor.java
index 462895e..3cf267b 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessor.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessor.java
@@ -20,10 +20,8 @@
import org.apache.sling.distribution.queue.spi.DistributionQueue;
-import java.util.Map;
-
+import java.util.function.Consumer;
import org.apache.sling.distribution.queue.DistributionQueueEntry;
-import org.apache.sling.distribution.queue.DistributionQueueItemStatus;
import org.apache.sling.distribution.queue.impl.DistributionQueueProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,34 +34,35 @@
private final Logger log = LoggerFactory.getLogger(getClass());
private final DistributionQueue queue;
private final DistributionQueueProcessor queueProcessor;
- private final Map<String, DistributionQueueItemStatus> statusMap;
+ private Consumer<DistributionQueueEntry> recordProcessingAttempt;
public SimpleDistributionQueueProcessor(DistributionQueue queue,
DistributionQueueProcessor queueProcessor,
- Map<String, DistributionQueueItemStatus> statusMap) {
+ Consumer<DistributionQueueEntry> processingAttemptRecorder) {
this.queue = queue;
this.queueProcessor = queueProcessor;
- this.statusMap = statusMap;
+ this.recordProcessingAttempt = (null != processingAttemptRecorder)?
+ processingAttemptRecorder:
+ (entry) -> {};
}
public void run() {
try {
DistributionQueueEntry entry;
while ((entry = queue.getHead()) != null) {
- DistributionQueueItemStatus itemStatus = entry.getStatus();
- statusMap.put(entry.getId(), new DistributionQueueItemStatus(itemStatus.getEntered(),
- itemStatus.getItemState(), itemStatus.getAttempts() + 1, queue.getName()));
- if (queueProcessor.process(queue.getName(), entry)) {
- if (queue.remove(entry.getId()) != null) {
- log.debug("item {} processed and removed from the queue", entry.getItem());
- }
+ boolean wasProcessed = queueProcessor.process(queue.getName(), entry);
+ boolean wasRemoved = wasProcessed?
+ (queue.remove(entry.getId()) != null):
+ false;
+ if (wasProcessed && wasRemoved) {
+ log.debug("item {} processed and removed from the queue", entry.getItem());
} else {
- log.warn("processing of item {} failed", entry.getId());
+ log.warn("processing and removal of item {} failed; will reattempt", entry.getId());
+ this.recordProcessingAttempt.accept(entry);
}
}
} catch (Exception e) {
log.error("error while processing queue {}", e);
}
-
}
}
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java
index bc3d3c7..9315ee0 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java
@@ -27,14 +27,15 @@
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
import org.apache.commons.io.IOUtils;
import org.apache.commons.io.LineIterator;
import org.apache.sling.commons.scheduler.ScheduleOptions;
import org.apache.sling.commons.scheduler.Scheduler;
import org.apache.sling.distribution.queue.spi.DistributionQueue;
+import org.apache.sling.distribution.queue.DistributionQueueEntry;
import org.apache.sling.distribution.queue.DistributionQueueItem;
-import org.apache.sling.distribution.queue.DistributionQueueItemStatus;
import org.apache.sling.distribution.queue.impl.DistributionQueueProcessor;
import org.apache.sling.distribution.queue.impl.DistributionQueueProvider;
import org.apache.sling.distribution.queue.DistributionQueueType;
@@ -56,8 +57,7 @@
private final String name;
private final Scheduler scheduler;
- private final Map<String, SimpleDistributionQueue> queueMap = new ConcurrentHashMap<>();
- private final Map<String, Map<String, DistributionQueueItemStatus>> statusMap = new ConcurrentHashMap<>();
+ private final Map<String, SimpleDistributionQueue> queueMap = new ConcurrentHashMap<String, SimpleDistributionQueue>();
private final boolean checkpoint;
private File checkpointDirectory;
@@ -92,11 +92,8 @@
SimpleDistributionQueue queue = queueMap.get(key);
if (queue == null) {
log.debug("creating a queue with key {}", key);
- Map<String, DistributionQueueItemStatus> queueStatusMap
- = new ConcurrentHashMap<>();
- queue = new SimpleDistributionQueue(name, queueName, queueStatusMap);
+ queue = new SimpleDistributionQueue(name, queueName);
queueMap.put(key, queue);
- statusMap.put(key, queueStatusMap);
log.debug("queue created {}", queue);
}
return queue;
@@ -155,9 +152,14 @@
// enable processing
for (String queueName : queueNames) {
- ScheduleOptions options = scheduler.NOW(-1, 1).canRunConcurrently(false).name(getJobName(queueName));
- scheduler.schedule(new SimpleDistributionQueueProcessor(getQueue(queueName), queueProcessor,
- statusMap.get(getKey(queueName))), options);
+ ScheduleOptions options = scheduler.NOW(-1, 1)
+ .canRunConcurrently(false)
+ .name(getJobName(queueName));
+ DistributionQueue queueImpl = getQueue(queueName);
+ Consumer<DistributionQueueEntry> processingAttemptRecorder =
+ ((SimpleDistributionQueue)queueImpl)::recordProcessingAttempt;
+ scheduler.schedule(new SimpleDistributionQueueProcessor(getQueue(queueName), queueProcessor, processingAttemptRecorder),
+ options);
}
}
diff --git a/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentTest.java b/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentTest.java
index 433d2ff..aae6f7b 100644
--- a/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentTest.java
+++ b/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentTest.java
@@ -21,7 +21,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.distribution.DistributionRequest;
@@ -125,7 +124,7 @@
when(queueProvider.getQueue(DistributionQueueDispatchingStrategy.DEFAULT_QUEUE_NAME)).thenReturn(
- new SimpleDistributionQueue(name, "name", new HashMap<String, DistributionQueueItemStatus>()));
+ new SimpleDistributionQueue(name, "name"));
DistributionResponse response = agent.execute(resourceResolver, request);
assertNotNull(response);
assertEquals("[ERROR]", response.getMessage());
@@ -165,7 +164,7 @@
}
}).when(packageExporter).exportPackages(any(ResourceResolver.class), any(DistributionRequest.class), any(DistributionPackageProcessor.class));
when(queueProvider.getQueue(DistributionQueueDispatchingStrategy.DEFAULT_QUEUE_NAME)).thenReturn(
- new SimpleDistributionQueue(name, "name", new HashMap<String, DistributionQueueItemStatus>()));
+ new SimpleDistributionQueue(name, "name"));
DistributionResponse response = agent.execute(resourceResolver, request);
assertNotNull(response);
assertEquals("[QUEUED]", response.getMessage());
@@ -205,7 +204,7 @@
return null;
}
}).when(packageExporter).exportPackages(any(ResourceResolver.class), any(DistributionRequest.class), any(DistributionPackageProcessor.class)); when(queueProvider.getQueue(DistributionQueueDispatchingStrategy.DEFAULT_QUEUE_NAME)).thenReturn(
- new SimpleDistributionQueue(name, "name", new HashMap<String, DistributionQueueItemStatus>()));
+ new SimpleDistributionQueue(name, "name"));
agent.execute(resourceResolver, request);
}
@@ -298,7 +297,7 @@
return null;
}
}).when(packageExporter).exportPackages(any(ResourceResolver.class), any(DistributionRequest.class), any(DistributionPackageProcessor.class)); when(queueProvider.getQueue(DistributionQueueDispatchingStrategy.DEFAULT_QUEUE_NAME)).thenReturn(
- new SimpleDistributionQueue(name, "name", new HashMap<String, DistributionQueueItemStatus>()));
+ new SimpleDistributionQueue(name, "name"));
DistributionResponse response = agent.execute(resourceResolver, request);
@@ -344,7 +343,7 @@
return null;
}
}).when(packageExporter).exportPackages(any(ResourceResolver.class), any(DistributionRequest.class), any(DistributionPackageProcessor.class)); when(queueProvider.getQueue(DistributionQueueDispatchingStrategy.DEFAULT_QUEUE_NAME)).thenReturn(
- new SimpleDistributionQueue(name, "name", new HashMap<String, DistributionQueueItemStatus>()));
+ new SimpleDistributionQueue(name, "name"));
DistributionResponse response = agent.execute(resourceResolver, request);
diff --git a/src/test/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProcessingTest.java b/src/test/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProcessingTest.java
index 68d08fa..6e6630f 100644
--- a/src/test/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProcessingTest.java
+++ b/src/test/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProcessingTest.java
@@ -87,7 +87,7 @@
assertEquals(MAX_ENTRIES, resourceQueue.getStatus().getItemsCount());
when(mockResourceQueueProcessor.process(eq(QUEUE_NAME), Matchers.any(DistributionQueueEntry.class)))
- .thenReturn(true);
+ .thenReturn(false, true);
resourceQueueProvider.enableQueueProcessing(mockResourceQueueProcessor, QUEUE_NAME);
while (!resourceQueue.getStatus().getState().equals(DistributionQueueState.IDLE)) {
diff --git a/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessorTest.java b/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessorTest.java
index b219f2d..ec7d5c5 100644
--- a/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessorTest.java
+++ b/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessorTest.java
@@ -21,6 +21,7 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
+
import org.apache.sling.distribution.queue.DistributionQueueItemState;
import org.apache.sling.distribution.queue.DistributionQueueItemStatus;
import org.apache.sling.distribution.queue.spi.DistributionQueue;
@@ -28,7 +29,6 @@
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.apache.sling.distribution.queue.impl.DistributionQueueProcessor;
import org.junit.Test;
-
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -42,7 +42,7 @@
DistributionQueue queue = mock(DistributionQueue.class);
DistributionQueueProcessor queueProcessor = mock(DistributionQueueProcessor.class);
SimpleDistributionQueueProcessor simpleDistributionQueueProcessor = new SimpleDistributionQueueProcessor(
- queue, queueProcessor, new HashMap<String, DistributionQueueItemStatus>());
+ queue, queueProcessor, null);
simpleDistributionQueueProcessor.run();
}
@@ -56,7 +56,7 @@
when(queueProvider.getQueues()).thenReturn(queues);
DistributionQueueProcessor queueProcessor = mock(DistributionQueueProcessor.class);
SimpleDistributionQueueProcessor simpleDistributionQueueProcessor = new SimpleDistributionQueueProcessor(
- queue, queueProcessor, new HashMap<String, DistributionQueueItemStatus>());
+ queue, queueProcessor, null);
simpleDistributionQueueProcessor.run();
}
@@ -73,7 +73,7 @@
when(queueProvider.getQueues()).thenReturn(queues);
DistributionQueueProcessor queueProcessor = mock(DistributionQueueProcessor.class);
SimpleDistributionQueueProcessor simpleDistributionQueueProcessor = new SimpleDistributionQueueProcessor(
- queue, queueProcessor, new HashMap<String, DistributionQueueItemStatus>());
+ queue, queueProcessor, queue::recordProcessingAttempt);
simpleDistributionQueueProcessor.run();
}
}
\ No newline at end of file
diff --git a/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueTest.java b/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueTest.java
index aadd3a4..169b871 100644
--- a/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueTest.java
+++ b/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueTest.java
@@ -19,6 +19,7 @@
package org.apache.sling.distribution.queue.impl.simple;
import java.util.HashMap;
+
import org.apache.sling.distribution.queue.spi.DistributionQueue;
import org.apache.sling.distribution.queue.DistributionQueueEntry;
import org.apache.sling.distribution.queue.DistributionQueueItem;
@@ -38,8 +39,7 @@
@Test
public void testPackageAddition() throws Exception {
- DistributionQueue queue = new SimpleDistributionQueue("agentName", "default",
- new HashMap<String, DistributionQueueItemStatus>());
+ DistributionQueue queue = new SimpleDistributionQueue("agentName", "default");
DistributionQueueItem pkg = new DistributionQueueItem("packageId", new HashMap<String, Object>());
assertNotNull(queue.add(pkg));
assertFalse(queue.getStatus().isEmpty());
@@ -47,8 +47,7 @@
@Test
public void testPackageAdditionAndRemoval() throws Exception {
- DistributionQueue queue = new SimpleDistributionQueue("agentName", "default",
- new HashMap<String, DistributionQueueItemStatus>());
+ DistributionQueue queue = new SimpleDistributionQueue("agentName", "default");
DistributionQueueItem pkg = new DistributionQueueItem("id", new HashMap<String, Object>());
assertNotNull(queue.add(pkg));
assertFalse(queue.getStatus().isEmpty());
@@ -60,18 +59,26 @@
@Test
public void testPackageAdditionRetrievalAndRemoval() throws Exception {
- DistributionQueue queue = new SimpleDistributionQueue("agentName", "default",
- new HashMap<String, DistributionQueueItemStatus>());
+ DistributionQueue queue = new SimpleDistributionQueue("agentName", "default");
DistributionQueueItem pkg = new DistributionQueueItem("id", new HashMap<String, Object>());
assertNotNull(queue.add(pkg));
assertFalse(queue.getStatus().isEmpty());
- assertEquals(pkg, queue.getHead().getItem());
- assertFalse(queue.getStatus().isEmpty());
+ DistributionQueueEntry entry = queue.getHead();
+
DistributionQueueItemStatus status = queue.getEntry(pkg.getPackageId()).getStatus();
+ assertNotNull(status);
+ assertEquals(0, status.getAttempts());
+
+ ((SimpleDistributionQueue)queue).recordProcessingAttempt(entry);
+
+ assertEquals(pkg, entry.getItem());
+ assertFalse(queue.getStatus().isEmpty());
+
+ status = queue.getEntry(pkg.getPackageId()).getStatus();
assertNotNull(queue.remove(pkg.getPackageId()));
assertTrue(queue.getStatus().isEmpty());
assertNotNull(status);
- assertEquals(0, status.getAttempts());
+ assertEquals(1, status.getAttempts());
}
}