SLING-8854 In-file and In-memory queue-providers for Forward Distribution report incorrect value for processing "Attempts" (#26)

diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java
index 62e9e49..6b4a2c1 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java
@@ -27,7 +27,6 @@
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
-import java.util.WeakHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
@@ -68,13 +67,13 @@
 
     private final Queue<DistributionQueueItem> queue;
 
-    private final Map<DistributionQueueItem, DistributionQueueItemStatus> statusMap;
+    private final Map<String, DistributionQueueItemStatus> statusMap;
 
-    public SimpleDistributionQueue(String agentName, String name) {
+    public SimpleDistributionQueue(String agentName, String name, Map<String, DistributionQueueItemStatus> statusMap) {
         log.debug("starting a simple queue {} for agent {}", name, agentName);
         this.name = name;
         this.queue = new LinkedBlockingQueue<DistributionQueueItem>();
-        this.statusMap = new WeakHashMap<DistributionQueueItem, DistributionQueueItemStatus>(10);
+        this.statusMap = statusMap;
     }
 
     @NotNull
@@ -85,17 +84,19 @@
     public DistributionQueueEntry add(@NotNull DistributionQueueItem item) {
         DistributionQueueItemState itemState = DistributionQueueItemState.ERROR;
         boolean result = false;
+        String entryId = item.getPackageId();
         try {
             result = queue.offer(item);
             itemState = DistributionQueueItemState.QUEUED;
         } catch (Exception e) {
             log.error("cannot add an item to the queue", e);
         } finally {
-            statusMap.put(item, new DistributionQueueItemStatus(Calendar.getInstance(), itemState, 0, name));
+            statusMap.put(entryId, new DistributionQueueItemStatus(Calendar.getInstance(), itemState, 0, name));
         }
 
         if (result) {
-            return new DistributionQueueEntry(item.getPackageId(), item, statusMap.get(item));
+            
+            return new DistributionQueueEntry(item.getPackageId(), item, statusMap.get(entryId));
         }
 
         return null;
@@ -106,10 +107,7 @@
     public DistributionQueueEntry getHead() {
         DistributionQueueItem element = queue.peek();
         if (element != null) {
-            DistributionQueueItemStatus itemState = statusMap.get(element);
-            statusMap.put(element, new DistributionQueueItemStatus(itemState.getEntered(),
-                    itemState.getItemState(),
-                    itemState.getAttempts() + 1, name));
+            DistributionQueueItemStatus itemState = statusMap.get(element.getPackageId());
 
             return new DistributionQueueEntry(element.getPackageId(), element, itemState);
         }
@@ -119,7 +117,7 @@
     @NotNull
     private DistributionQueueState getState() {
         DistributionQueueItem firstItem = queue.peek();
-        DistributionQueueItemStatus firstItemStatus = firstItem != null ? statusMap.get(firstItem) : null;
+        DistributionQueueItemStatus firstItemStatus = firstItem != null ? statusMap.get(firstItem.getPackageId()) : null;
         return DistributionQueueUtils.calculateState(firstItem, firstItemStatus);
     }
 
@@ -146,7 +144,7 @@
         List<DistributionQueueEntry> result = new ArrayList<DistributionQueueEntry>();
 
         for (DistributionQueueItem item : queue) {
-            result.add(new DistributionQueueEntry(item.getPackageId(), item, statusMap.get(item)));
+            result.add(new DistributionQueueEntry(item.getPackageId(), item, statusMap.get(item.getPackageId())));
         }
         return result;
     }
@@ -155,7 +153,7 @@
     public DistributionQueueEntry getEntry(@NotNull String id) {
         for (DistributionQueueItem item : queue) {
             if (id.equals(item.getPackageId())) {
-                return new DistributionQueueEntry(id, item, statusMap.get(item));
+                return new DistributionQueueEntry(id, item, statusMap.get(item.getPackageId()));
             }
         }
 
@@ -182,6 +180,7 @@
         boolean removed = false;
         if (toRemove != null) {
             removed = queue.remove(toRemove.getItem());
+            statusMap.remove(id);
         }
         log.debug("item with id {} removed from the queue: {}", id, removed);
         if (removed) {
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessor.java b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessor.java
index f4d3074..5aff79c 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessor.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessor.java
@@ -19,7 +19,11 @@
 package org.apache.sling.distribution.queue.impl.simple;
 
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
+
+import java.util.Map;
+
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
+import org.apache.sling.distribution.queue.DistributionQueueItemStatus;
 import org.apache.sling.distribution.queue.impl.DistributionQueueProcessor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,17 +36,23 @@
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final DistributionQueue queue;
     private final DistributionQueueProcessor queueProcessor;
+    private final Map<String, DistributionQueueItemStatus> statusMap;
 
     public SimpleDistributionQueueProcessor(DistributionQueue queue,
-                                            DistributionQueueProcessor queueProcessor) {
+                                            DistributionQueueProcessor queueProcessor,
+                                            Map<String, DistributionQueueItemStatus> statusMap) {
         this.queue = queue;
         this.queueProcessor = queueProcessor;
+        this.statusMap = statusMap;
     }
 
     public void run() {
         try {
             DistributionQueueEntry entry;
             while ((entry = queue.getHead()) != null) {
+                DistributionQueueItemStatus itemStatus = entry.getStatus();
+                statusMap.put(entry.getId(),  new DistributionQueueItemStatus(itemStatus.getEntered(),
+                        itemStatus.getItemState(), itemStatus.getAttempts() + 1, queue.getName()));
                 if (queueProcessor.process(queue.getName(), entry)) {
                     if (queue.remove(entry.getId()) != null) {
                         log.debug("item {} processed and removed from the queue", entry.getItem());
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java
index 8622fdc..91ca984 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java
@@ -26,6 +26,7 @@
 import java.io.FilenameFilter;
 import java.util.Collection;
 import java.util.Map;
+import java.util.WeakHashMap;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.io.IOUtils;
@@ -34,6 +35,7 @@
 import org.apache.sling.commons.scheduler.Scheduler;
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
+import org.apache.sling.distribution.queue.DistributionQueueItemStatus;
 import org.apache.sling.distribution.queue.impl.DistributionQueueProcessor;
 import org.apache.sling.distribution.queue.impl.DistributionQueueProvider;
 import org.apache.sling.distribution.queue.DistributionQueueType;
@@ -56,6 +58,8 @@
     private final Scheduler scheduler;
 
     private final Map<String, SimpleDistributionQueue> queueMap = new ConcurrentHashMap<String, SimpleDistributionQueue>();
+    private final Map<SimpleDistributionQueue, Map<String, DistributionQueueItemStatus>> statusMap
+            = new WeakHashMap<SimpleDistributionQueue, Map<String, DistributionQueueItemStatus>>();
     private final boolean checkpoint;
     private File checkpointDirectory;
 
@@ -75,7 +79,8 @@
             if (!checkpointDirectory.exists()) {
                 created = checkpointDirectory.mkdir();
             }
-            log.info("checkpoint directory created: {}, exists {}", created, checkpointDirectory.isDirectory() && checkpointDirectory.exists());
+            log.info("checkpoint directory created: {}, exists {}", created,
+                    checkpointDirectory.isDirectory() && checkpointDirectory.exists());
         }
 
         this.scheduler = scheduler;
@@ -89,8 +94,11 @@
         SimpleDistributionQueue queue = queueMap.get(key);
         if (queue == null) {
             log.debug("creating a queue with key {}", key);
-            queue = new SimpleDistributionQueue(name, queueName);
+            Map<String, DistributionQueueItemStatus> queueStatusMap
+                    = new ConcurrentHashMap<String, DistributionQueueItemStatus>();
+            queue = new SimpleDistributionQueue(name, queueName, queueStatusMap);
             queueMap.put(key, queue);
+            statusMap.put(queue, queueStatusMap);
             log.debug("queue created {}", queue);
         }
         return queue;
@@ -125,7 +133,7 @@
                     try {
                         LineIterator lineIterator = IOUtils.lineIterator(new FileReader(qf));
                         while (lineIterator.hasNext()) {
-                            String line  = lineIterator.nextLine();
+                            String line = lineIterator.nextLine();
                             DistributionQueueItem item = mapper.readQueueItem(line);
                             queue.add(item);
                         }
@@ -140,19 +148,18 @@
 
             // enable checkpointing
             for (String queueName : queueNames) {
-                ScheduleOptions options = scheduler.NOW(-1, 15)
-                        .canRunConcurrently(false)
+                ScheduleOptions options = scheduler.NOW(-1, 15).canRunConcurrently(false)
                         .name(getJobName(queueName + "-checkpoint"));
-                scheduler.schedule(new SimpleDistributionQueueCheckpoint(getQueue(queueName), checkpointDirectory), options);
+                scheduler.schedule(new SimpleDistributionQueueCheckpoint(getQueue(queueName), checkpointDirectory),
+                        options);
             }
         }
 
         // enable processing
         for (String queueName : queueNames) {
-            ScheduleOptions options = scheduler.NOW(-1, 1)
-                    .canRunConcurrently(false)
-                    .name(getJobName(queueName));
-            scheduler.schedule(new SimpleDistributionQueueProcessor(getQueue(queueName), queueProcessor), options);
+            ScheduleOptions options = scheduler.NOW(-1, 1).canRunConcurrently(false).name(getJobName(queueName));
+            scheduler.schedule(new SimpleDistributionQueueProcessor(getQueue(queueName), queueProcessor,
+                    statusMap.get(getQueue(queueName))), options);
         }
 
     }
diff --git a/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentTest.java b/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentTest.java
index e729c7c..433d2ff 100644
--- a/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentTest.java
+++ b/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentTest.java
@@ -21,7 +21,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-
+import java.util.HashMap;
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.api.resource.ResourceResolverFactory;
 import org.apache.sling.distribution.DistributionRequest;
@@ -125,7 +125,7 @@
 
 
         when(queueProvider.getQueue(DistributionQueueDispatchingStrategy.DEFAULT_QUEUE_NAME)).thenReturn(
-                new SimpleDistributionQueue(name, "name"));
+                new SimpleDistributionQueue(name, "name", new HashMap<String, DistributionQueueItemStatus>()));
         DistributionResponse response = agent.execute(resourceResolver, request);
         assertNotNull(response);
         assertEquals("[ERROR]", response.getMessage());
@@ -165,7 +165,7 @@
             }
         }).when(packageExporter).exportPackages(any(ResourceResolver.class), any(DistributionRequest.class), any(DistributionPackageProcessor.class));
         when(queueProvider.getQueue(DistributionQueueDispatchingStrategy.DEFAULT_QUEUE_NAME)).thenReturn(
-                new SimpleDistributionQueue(name, "name"));
+                new SimpleDistributionQueue(name, "name", new HashMap<String, DistributionQueueItemStatus>()));
         DistributionResponse response = agent.execute(resourceResolver, request);
         assertNotNull(response);
         assertEquals("[QUEUED]", response.getMessage());
@@ -205,7 +205,7 @@
                 return null;
             }
         }).when(packageExporter).exportPackages(any(ResourceResolver.class), any(DistributionRequest.class), any(DistributionPackageProcessor.class));        when(queueProvider.getQueue(DistributionQueueDispatchingStrategy.DEFAULT_QUEUE_NAME)).thenReturn(
-                new SimpleDistributionQueue(name, "name"));
+                new SimpleDistributionQueue(name, "name", new HashMap<String, DistributionQueueItemStatus>()));
 
         agent.execute(resourceResolver, request);
     }
@@ -298,7 +298,7 @@
                 return null;
             }
         }).when(packageExporter).exportPackages(any(ResourceResolver.class), any(DistributionRequest.class), any(DistributionPackageProcessor.class));        when(queueProvider.getQueue(DistributionQueueDispatchingStrategy.DEFAULT_QUEUE_NAME)).thenReturn(
-                new SimpleDistributionQueue(name, "name"));
+                new SimpleDistributionQueue(name, "name", new HashMap<String, DistributionQueueItemStatus>()));
 
         DistributionResponse response = agent.execute(resourceResolver, request);
 
@@ -344,7 +344,7 @@
                 return null;
             }
         }).when(packageExporter).exportPackages(any(ResourceResolver.class), any(DistributionRequest.class), any(DistributionPackageProcessor.class));        when(queueProvider.getQueue(DistributionQueueDispatchingStrategy.DEFAULT_QUEUE_NAME)).thenReturn(
-                new SimpleDistributionQueue(name, "name"));
+                new SimpleDistributionQueue(name, "name", new HashMap<String, DistributionQueueItemStatus>()));
 
         DistributionResponse response = agent.execute(resourceResolver, request);
 
diff --git a/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessorTest.java b/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessorTest.java
index 42128c8..b219f2d 100644
--- a/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessorTest.java
+++ b/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessorTest.java
@@ -21,7 +21,6 @@
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.LinkedList;
-
 import org.apache.sling.distribution.queue.DistributionQueueItemState;
 import org.apache.sling.distribution.queue.DistributionQueueItemStatus;
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
@@ -43,7 +42,7 @@
         DistributionQueue queue = mock(DistributionQueue.class);
         DistributionQueueProcessor queueProcessor = mock(DistributionQueueProcessor.class);
         SimpleDistributionQueueProcessor simpleDistributionQueueProcessor = new SimpleDistributionQueueProcessor(
-                queue, queueProcessor);
+                queue, queueProcessor, new HashMap<String, DistributionQueueItemStatus>());
         simpleDistributionQueueProcessor.run();
     }
 
@@ -57,7 +56,7 @@
         when(queueProvider.getQueues()).thenReturn(queues);
         DistributionQueueProcessor queueProcessor = mock(DistributionQueueProcessor.class);
         SimpleDistributionQueueProcessor simpleDistributionQueueProcessor = new SimpleDistributionQueueProcessor(
-                queue, queueProcessor);
+                queue, queueProcessor, new HashMap<String, DistributionQueueItemStatus>());
         simpleDistributionQueueProcessor.run();
     }
 
@@ -74,7 +73,7 @@
         when(queueProvider.getQueues()).thenReturn(queues);
         DistributionQueueProcessor queueProcessor = mock(DistributionQueueProcessor.class);
         SimpleDistributionQueueProcessor simpleDistributionQueueProcessor = new SimpleDistributionQueueProcessor(
-                queue, queueProcessor);
+                queue, queueProcessor, new HashMap<String, DistributionQueueItemStatus>());
         simpleDistributionQueueProcessor.run();
     }
 }
\ No newline at end of file
diff --git a/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueTest.java b/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueTest.java
index b43ef88..aadd3a4 100644
--- a/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueTest.java
+++ b/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueTest.java
@@ -19,7 +19,6 @@
 package org.apache.sling.distribution.queue.impl.simple;
 
 import java.util.HashMap;
-
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
@@ -39,7 +38,8 @@
 
     @Test
     public void testPackageAddition() throws Exception {
-        DistributionQueue queue = new SimpleDistributionQueue("agentName", "default");
+        DistributionQueue queue = new SimpleDistributionQueue("agentName", "default",
+                new HashMap<String, DistributionQueueItemStatus>());
         DistributionQueueItem pkg = new DistributionQueueItem("packageId", new HashMap<String, Object>());
         assertNotNull(queue.add(pkg));
         assertFalse(queue.getStatus().isEmpty());
@@ -47,7 +47,8 @@
 
     @Test
     public void testPackageAdditionAndRemoval() throws Exception {
-        DistributionQueue queue = new SimpleDistributionQueue("agentName", "default");
+        DistributionQueue queue = new SimpleDistributionQueue("agentName", "default",
+                new HashMap<String, DistributionQueueItemStatus>());
         DistributionQueueItem pkg = new DistributionQueueItem("id", new HashMap<String, Object>());
         assertNotNull(queue.add(pkg));
         assertFalse(queue.getStatus().isEmpty());
@@ -59,7 +60,8 @@
 
     @Test
     public void testPackageAdditionRetrievalAndRemoval() throws Exception {
-        DistributionQueue queue = new SimpleDistributionQueue("agentName", "default");
+        DistributionQueue queue = new SimpleDistributionQueue("agentName", "default",
+                new HashMap<String, DistributionQueueItemStatus>());
         DistributionQueueItem pkg = new DistributionQueueItem("id", new HashMap<String, Object>());
         assertNotNull(queue.add(pkg));
         assertFalse(queue.getStatus().isEmpty());
@@ -69,7 +71,7 @@
         assertNotNull(queue.remove(pkg.getPackageId()));
         assertTrue(queue.getStatus().isEmpty());
         assertNotNull(status);
-        assertEquals(1, status.getAttempts());
+        assertEquals(0, status.getAttempts());
     }
 
 }