SLING-8854 In-file and In-memory queue-providers for Forward Distribution report incorrect value for processing "Attempts" (#26)
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java
index 62e9e49..6b4a2c1 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java
@@ -27,7 +27,6 @@
import java.util.Map;
import java.util.Queue;
import java.util.Set;
-import java.util.WeakHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.sling.distribution.queue.spi.DistributionQueue;
@@ -68,13 +67,13 @@
private final Queue<DistributionQueueItem> queue;
- private final Map<DistributionQueueItem, DistributionQueueItemStatus> statusMap;
+ private final Map<String, DistributionQueueItemStatus> statusMap;
- public SimpleDistributionQueue(String agentName, String name) {
+ public SimpleDistributionQueue(String agentName, String name, Map<String, DistributionQueueItemStatus> statusMap) {
log.debug("starting a simple queue {} for agent {}", name, agentName);
this.name = name;
this.queue = new LinkedBlockingQueue<DistributionQueueItem>();
- this.statusMap = new WeakHashMap<DistributionQueueItem, DistributionQueueItemStatus>(10);
+ this.statusMap = statusMap;
}
@NotNull
@@ -85,17 +84,19 @@
public DistributionQueueEntry add(@NotNull DistributionQueueItem item) {
DistributionQueueItemState itemState = DistributionQueueItemState.ERROR;
boolean result = false;
+ String entryId = item.getPackageId();
try {
result = queue.offer(item);
itemState = DistributionQueueItemState.QUEUED;
} catch (Exception e) {
log.error("cannot add an item to the queue", e);
} finally {
- statusMap.put(item, new DistributionQueueItemStatus(Calendar.getInstance(), itemState, 0, name));
+ statusMap.put(entryId, new DistributionQueueItemStatus(Calendar.getInstance(), itemState, 0, name));
}
if (result) {
- return new DistributionQueueEntry(item.getPackageId(), item, statusMap.get(item));
+
+ return new DistributionQueueEntry(item.getPackageId(), item, statusMap.get(entryId));
}
return null;
@@ -106,10 +107,7 @@
public DistributionQueueEntry getHead() {
DistributionQueueItem element = queue.peek();
if (element != null) {
- DistributionQueueItemStatus itemState = statusMap.get(element);
- statusMap.put(element, new DistributionQueueItemStatus(itemState.getEntered(),
- itemState.getItemState(),
- itemState.getAttempts() + 1, name));
+ DistributionQueueItemStatus itemState = statusMap.get(element.getPackageId());
return new DistributionQueueEntry(element.getPackageId(), element, itemState);
}
@@ -119,7 +117,7 @@
@NotNull
private DistributionQueueState getState() {
DistributionQueueItem firstItem = queue.peek();
- DistributionQueueItemStatus firstItemStatus = firstItem != null ? statusMap.get(firstItem) : null;
+ DistributionQueueItemStatus firstItemStatus = firstItem != null ? statusMap.get(firstItem.getPackageId()) : null;
return DistributionQueueUtils.calculateState(firstItem, firstItemStatus);
}
@@ -146,7 +144,7 @@
List<DistributionQueueEntry> result = new ArrayList<DistributionQueueEntry>();
for (DistributionQueueItem item : queue) {
- result.add(new DistributionQueueEntry(item.getPackageId(), item, statusMap.get(item)));
+ result.add(new DistributionQueueEntry(item.getPackageId(), item, statusMap.get(item.getPackageId())));
}
return result;
}
@@ -155,7 +153,7 @@
public DistributionQueueEntry getEntry(@NotNull String id) {
for (DistributionQueueItem item : queue) {
if (id.equals(item.getPackageId())) {
- return new DistributionQueueEntry(id, item, statusMap.get(item));
+ return new DistributionQueueEntry(id, item, statusMap.get(item.getPackageId()));
}
}
@@ -182,6 +180,7 @@
boolean removed = false;
if (toRemove != null) {
removed = queue.remove(toRemove.getItem());
+ statusMap.remove(id);
}
log.debug("item with id {} removed from the queue: {}", id, removed);
if (removed) {
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessor.java b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessor.java
index f4d3074..5aff79c 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessor.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessor.java
@@ -19,7 +19,11 @@
package org.apache.sling.distribution.queue.impl.simple;
import org.apache.sling.distribution.queue.spi.DistributionQueue;
+
+import java.util.Map;
+
import org.apache.sling.distribution.queue.DistributionQueueEntry;
+import org.apache.sling.distribution.queue.DistributionQueueItemStatus;
import org.apache.sling.distribution.queue.impl.DistributionQueueProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,17 +36,23 @@
private final Logger log = LoggerFactory.getLogger(getClass());
private final DistributionQueue queue;
private final DistributionQueueProcessor queueProcessor;
+ private final Map<String, DistributionQueueItemStatus> statusMap;
public SimpleDistributionQueueProcessor(DistributionQueue queue,
- DistributionQueueProcessor queueProcessor) {
+ DistributionQueueProcessor queueProcessor,
+ Map<String, DistributionQueueItemStatus> statusMap) {
this.queue = queue;
this.queueProcessor = queueProcessor;
+ this.statusMap = statusMap;
}
public void run() {
try {
DistributionQueueEntry entry;
while ((entry = queue.getHead()) != null) {
+ DistributionQueueItemStatus itemStatus = entry.getStatus();
+ statusMap.put(entry.getId(), new DistributionQueueItemStatus(itemStatus.getEntered(),
+ itemStatus.getItemState(), itemStatus.getAttempts() + 1, queue.getName()));
if (queueProcessor.process(queue.getName(), entry)) {
if (queue.remove(entry.getId()) != null) {
log.debug("item {} processed and removed from the queue", entry.getItem());
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java
index 8622fdc..91ca984 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java
@@ -26,6 +26,7 @@
import java.io.FilenameFilter;
import java.util.Collection;
import java.util.Map;
+import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.io.IOUtils;
@@ -34,6 +35,7 @@
import org.apache.sling.commons.scheduler.Scheduler;
import org.apache.sling.distribution.queue.spi.DistributionQueue;
import org.apache.sling.distribution.queue.DistributionQueueItem;
+import org.apache.sling.distribution.queue.DistributionQueueItemStatus;
import org.apache.sling.distribution.queue.impl.DistributionQueueProcessor;
import org.apache.sling.distribution.queue.impl.DistributionQueueProvider;
import org.apache.sling.distribution.queue.DistributionQueueType;
@@ -56,6 +58,8 @@
private final Scheduler scheduler;
private final Map<String, SimpleDistributionQueue> queueMap = new ConcurrentHashMap<String, SimpleDistributionQueue>();
+ private final Map<SimpleDistributionQueue, Map<String, DistributionQueueItemStatus>> statusMap
+ = new WeakHashMap<SimpleDistributionQueue, Map<String, DistributionQueueItemStatus>>();
private final boolean checkpoint;
private File checkpointDirectory;
@@ -75,7 +79,8 @@
if (!checkpointDirectory.exists()) {
created = checkpointDirectory.mkdir();
}
- log.info("checkpoint directory created: {}, exists {}", created, checkpointDirectory.isDirectory() && checkpointDirectory.exists());
+ log.info("checkpoint directory created: {}, exists {}", created,
+ checkpointDirectory.isDirectory() && checkpointDirectory.exists());
}
this.scheduler = scheduler;
@@ -89,8 +94,11 @@
SimpleDistributionQueue queue = queueMap.get(key);
if (queue == null) {
log.debug("creating a queue with key {}", key);
- queue = new SimpleDistributionQueue(name, queueName);
+ Map<String, DistributionQueueItemStatus> queueStatusMap
+ = new ConcurrentHashMap<String, DistributionQueueItemStatus>();
+ queue = new SimpleDistributionQueue(name, queueName, queueStatusMap);
queueMap.put(key, queue);
+ statusMap.put(queue, queueStatusMap);
log.debug("queue created {}", queue);
}
return queue;
@@ -125,7 +133,7 @@
try {
LineIterator lineIterator = IOUtils.lineIterator(new FileReader(qf));
while (lineIterator.hasNext()) {
- String line = lineIterator.nextLine();
+ String line = lineIterator.nextLine();
DistributionQueueItem item = mapper.readQueueItem(line);
queue.add(item);
}
@@ -140,19 +148,18 @@
// enable checkpointing
for (String queueName : queueNames) {
- ScheduleOptions options = scheduler.NOW(-1, 15)
- .canRunConcurrently(false)
+ ScheduleOptions options = scheduler.NOW(-1, 15).canRunConcurrently(false)
.name(getJobName(queueName + "-checkpoint"));
- scheduler.schedule(new SimpleDistributionQueueCheckpoint(getQueue(queueName), checkpointDirectory), options);
+ scheduler.schedule(new SimpleDistributionQueueCheckpoint(getQueue(queueName), checkpointDirectory),
+ options);
}
}
// enable processing
for (String queueName : queueNames) {
- ScheduleOptions options = scheduler.NOW(-1, 1)
- .canRunConcurrently(false)
- .name(getJobName(queueName));
- scheduler.schedule(new SimpleDistributionQueueProcessor(getQueue(queueName), queueProcessor), options);
+ ScheduleOptions options = scheduler.NOW(-1, 1).canRunConcurrently(false).name(getJobName(queueName));
+ scheduler.schedule(new SimpleDistributionQueueProcessor(getQueue(queueName), queueProcessor,
+ statusMap.get(getQueue(queueName))), options);
}
}
diff --git a/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentTest.java b/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentTest.java
index e729c7c..433d2ff 100644
--- a/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentTest.java
+++ b/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentTest.java
@@ -21,7 +21,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-
+import java.util.HashMap;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.distribution.DistributionRequest;
@@ -125,7 +125,7 @@
when(queueProvider.getQueue(DistributionQueueDispatchingStrategy.DEFAULT_QUEUE_NAME)).thenReturn(
- new SimpleDistributionQueue(name, "name"));
+ new SimpleDistributionQueue(name, "name", new HashMap<String, DistributionQueueItemStatus>()));
DistributionResponse response = agent.execute(resourceResolver, request);
assertNotNull(response);
assertEquals("[ERROR]", response.getMessage());
@@ -165,7 +165,7 @@
}
}).when(packageExporter).exportPackages(any(ResourceResolver.class), any(DistributionRequest.class), any(DistributionPackageProcessor.class));
when(queueProvider.getQueue(DistributionQueueDispatchingStrategy.DEFAULT_QUEUE_NAME)).thenReturn(
- new SimpleDistributionQueue(name, "name"));
+ new SimpleDistributionQueue(name, "name", new HashMap<String, DistributionQueueItemStatus>()));
DistributionResponse response = agent.execute(resourceResolver, request);
assertNotNull(response);
assertEquals("[QUEUED]", response.getMessage());
@@ -205,7 +205,7 @@
return null;
}
}).when(packageExporter).exportPackages(any(ResourceResolver.class), any(DistributionRequest.class), any(DistributionPackageProcessor.class)); when(queueProvider.getQueue(DistributionQueueDispatchingStrategy.DEFAULT_QUEUE_NAME)).thenReturn(
- new SimpleDistributionQueue(name, "name"));
+ new SimpleDistributionQueue(name, "name", new HashMap<String, DistributionQueueItemStatus>()));
agent.execute(resourceResolver, request);
}
@@ -298,7 +298,7 @@
return null;
}
}).when(packageExporter).exportPackages(any(ResourceResolver.class), any(DistributionRequest.class), any(DistributionPackageProcessor.class)); when(queueProvider.getQueue(DistributionQueueDispatchingStrategy.DEFAULT_QUEUE_NAME)).thenReturn(
- new SimpleDistributionQueue(name, "name"));
+ new SimpleDistributionQueue(name, "name", new HashMap<String, DistributionQueueItemStatus>()));
DistributionResponse response = agent.execute(resourceResolver, request);
@@ -344,7 +344,7 @@
return null;
}
}).when(packageExporter).exportPackages(any(ResourceResolver.class), any(DistributionRequest.class), any(DistributionPackageProcessor.class)); when(queueProvider.getQueue(DistributionQueueDispatchingStrategy.DEFAULT_QUEUE_NAME)).thenReturn(
- new SimpleDistributionQueue(name, "name"));
+ new SimpleDistributionQueue(name, "name", new HashMap<String, DistributionQueueItemStatus>()));
DistributionResponse response = agent.execute(resourceResolver, request);
diff --git a/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessorTest.java b/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessorTest.java
index 42128c8..b219f2d 100644
--- a/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessorTest.java
+++ b/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessorTest.java
@@ -21,7 +21,6 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
-
import org.apache.sling.distribution.queue.DistributionQueueItemState;
import org.apache.sling.distribution.queue.DistributionQueueItemStatus;
import org.apache.sling.distribution.queue.spi.DistributionQueue;
@@ -43,7 +42,7 @@
DistributionQueue queue = mock(DistributionQueue.class);
DistributionQueueProcessor queueProcessor = mock(DistributionQueueProcessor.class);
SimpleDistributionQueueProcessor simpleDistributionQueueProcessor = new SimpleDistributionQueueProcessor(
- queue, queueProcessor);
+ queue, queueProcessor, new HashMap<String, DistributionQueueItemStatus>());
simpleDistributionQueueProcessor.run();
}
@@ -57,7 +56,7 @@
when(queueProvider.getQueues()).thenReturn(queues);
DistributionQueueProcessor queueProcessor = mock(DistributionQueueProcessor.class);
SimpleDistributionQueueProcessor simpleDistributionQueueProcessor = new SimpleDistributionQueueProcessor(
- queue, queueProcessor);
+ queue, queueProcessor, new HashMap<String, DistributionQueueItemStatus>());
simpleDistributionQueueProcessor.run();
}
@@ -74,7 +73,7 @@
when(queueProvider.getQueues()).thenReturn(queues);
DistributionQueueProcessor queueProcessor = mock(DistributionQueueProcessor.class);
SimpleDistributionQueueProcessor simpleDistributionQueueProcessor = new SimpleDistributionQueueProcessor(
- queue, queueProcessor);
+ queue, queueProcessor, new HashMap<String, DistributionQueueItemStatus>());
simpleDistributionQueueProcessor.run();
}
}
\ No newline at end of file
diff --git a/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueTest.java b/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueTest.java
index b43ef88..aadd3a4 100644
--- a/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueTest.java
+++ b/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueTest.java
@@ -19,7 +19,6 @@
package org.apache.sling.distribution.queue.impl.simple;
import java.util.HashMap;
-
import org.apache.sling.distribution.queue.spi.DistributionQueue;
import org.apache.sling.distribution.queue.DistributionQueueEntry;
import org.apache.sling.distribution.queue.DistributionQueueItem;
@@ -39,7 +38,8 @@
@Test
public void testPackageAddition() throws Exception {
- DistributionQueue queue = new SimpleDistributionQueue("agentName", "default");
+ DistributionQueue queue = new SimpleDistributionQueue("agentName", "default",
+ new HashMap<String, DistributionQueueItemStatus>());
DistributionQueueItem pkg = new DistributionQueueItem("packageId", new HashMap<String, Object>());
assertNotNull(queue.add(pkg));
assertFalse(queue.getStatus().isEmpty());
@@ -47,7 +47,8 @@
@Test
public void testPackageAdditionAndRemoval() throws Exception {
- DistributionQueue queue = new SimpleDistributionQueue("agentName", "default");
+ DistributionQueue queue = new SimpleDistributionQueue("agentName", "default",
+ new HashMap<String, DistributionQueueItemStatus>());
DistributionQueueItem pkg = new DistributionQueueItem("id", new HashMap<String, Object>());
assertNotNull(queue.add(pkg));
assertFalse(queue.getStatus().isEmpty());
@@ -59,7 +60,8 @@
@Test
public void testPackageAdditionRetrievalAndRemoval() throws Exception {
- DistributionQueue queue = new SimpleDistributionQueue("agentName", "default");
+ DistributionQueue queue = new SimpleDistributionQueue("agentName", "default",
+ new HashMap<String, DistributionQueueItemStatus>());
DistributionQueueItem pkg = new DistributionQueueItem("id", new HashMap<String, Object>());
assertNotNull(queue.add(pkg));
assertFalse(queue.getStatus().isEmpty());
@@ -69,7 +71,7 @@
assertNotNull(queue.remove(pkg.getPackageId()));
assertTrue(queue.getStatus().isEmpty());
assertNotNull(status);
- assertEquals(1, status.getAttempts());
+ assertEquals(0, status.getAttempts());
}
}