SLING-8945 ActiveResourceQueue doesn't provide cluster-consistent processing-attempts view
* refactors the code a bit to contain status recording within queue instances themselves, freeing Queue Providers
  from the responsibility of maintaining them for the queues they create
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ActiveResourceQueue.java b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ActiveResourceQueue.java
index 6a1339c..1921c22 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ActiveResourceQueue.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ActiveResourceQueue.java
@@ -71,4 +71,20 @@
             DistributionUtils.safelyLogout(resourceResolver);
         }
     }
+
+    public void recordProcessingAttempt(@NotNull DistributionQueueEntry entry) {
+        ResourceResolver resourceResolver = null;
+        try {
+            resourceResolver = DistributionUtils.loginService(resolverFactory, serviceName);
+            Resource queueRoot = ResourceQueueUtils.getRootResource(resourceResolver, queueRootPath);
+            Resource queueItemResource = ResourceQueueUtils.getResourceById(queueRoot, entry.getId());
+            ResourceQueueUtils.incrementProcessingAttemptForQueueItem(queueItemResource);
+            resourceResolver.commit();
+            log.debug("incremented processing-attempt for queue entry with id: {}", entry.getId());
+        } catch (Exception e) {
+            log.warn("Couldn't increment processing-attempt for queue entry with id: {}", entry.getId());
+        } finally {
+            DistributionUtils.safelyLogout(resourceResolver);
+        }
+    }
 }
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProvider.java b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProvider.java
index 221501a..f35f2ce 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProvider.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProvider.java
@@ -23,7 +23,7 @@
 import org.apache.sling.commons.scheduler.ScheduleOptions;
 import org.apache.sling.commons.scheduler.Scheduler;
 import org.apache.sling.distribution.common.DistributionException;
-import org.apache.sling.distribution.queue.DistributionQueueItemStatus;
+import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.apache.sling.distribution.queue.DistributionQueueType;
 import org.apache.sling.distribution.queue.impl.DistributionQueueProcessor;
 import org.apache.sling.distribution.queue.impl.DistributionQueueProvider;
@@ -40,6 +40,7 @@
 import java.util.Hashtable;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
 
 public class ResourceQueueProvider implements DistributionQueueProvider {
     public static final String TYPE = "resource";
@@ -57,9 +58,6 @@
 
     private final Map<String, ResourceQueue> queueMap = new ConcurrentHashMap<>();
 
-    private final Map<String, Map<String, DistributionQueueItemStatus>> statusMap = new ConcurrentHashMap<>();
-
-
     private ServiceRegistration<Runnable> cleanupTask;
 
     public ResourceQueueProvider(BundleContext context, ResourceResolverFactory resolverFactory,
@@ -81,13 +79,11 @@
     @NotNull
     @Override
     public DistributionQueue getQueue(@NotNull String queueName) throws DistributionException {
-        String key = getKey(queueName);
-        return queueMap.computeIfAbsent(key, k -> {
-            statusMap.put(key, new ConcurrentHashMap<>());
+        return queueMap.computeIfAbsent(queueName, name -> {
             if (isActive) {
-                return new ActiveResourceQueue(resolverFactory, serviceName, queueName, agentRootPath);
+                return new ActiveResourceQueue(resolverFactory, serviceName, name, agentRootPath);
             } else {
-                return new ResourceQueue(resolverFactory, serviceName, queueName, agentRootPath);
+                return new ResourceQueue(resolverFactory, serviceName, name, agentRootPath);
             }
         });
     }
@@ -111,8 +107,13 @@
                         .canRunConcurrently(false)
                         .onSingleInstanceOnly(true)
                         .name(getJobName(queueName));
-                scheduler.schedule(new SimpleDistributionQueueProcessor(getQueue(queueName), queueProcessor,
-                        statusMap.get(getKey(queueName))), options);
+                DistributionQueue queueImpl = getQueue(queueName);
+                Consumer<DistributionQueueEntry> processingAttemptRecorder = null;
+                if (isActive) {
+                    processingAttemptRecorder = ((ActiveResourceQueue)queueImpl)::recordProcessingAttempt;
+                }
+                scheduler.schedule(new SimpleDistributionQueueProcessor(queueImpl, queueProcessor, processingAttemptRecorder),
+                        options);
             }
         } else {
             throw new DistributionException(new UnsupportedOperationException("enable Processing not supported for Passive Queues"));
@@ -150,10 +151,6 @@
         cleanupTask = context.registerService(Runnable.class, cleanup, props);
     }
 
-    private String getKey(String queueName) {
-        return agentName + "#" + queueName;
-    }
-
     public void close() {
         if (cleanupTask != null) {
             cleanupTask.unregister();
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueUtils.java b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueUtils.java
index a54767e..ccd76a2 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueUtils.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueUtils.java
@@ -20,6 +20,7 @@
 package org.apache.sling.distribution.queue.impl.resource;
 
 
+import org.apache.sling.api.resource.ModifiableValueMap;
 import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.api.resource.Resource;
 import org.apache.sling.api.resource.ResourceResolver;
@@ -70,6 +71,7 @@
     private static final String DISTRIBUTION_PACKAGE_ID = DISTRIBUTION_PACKAGE_PREFIX + "item.id";
     private static final String DISTRIBUTION_PACKAGE_SIZE = DISTRIBUTION_PACKAGE_PREFIX + "package.size";
     private static final String ENTERED_DATE = "entered.date";
+    private static final String PROCESSING_ATTEMPTS = "processing.attempts";
 
 
     private static final AtomicLong itemCounter = new AtomicLong(0);
@@ -145,8 +147,9 @@
         ValueMap valueMap = resource.getValueMap();
         DistributionQueueItem queueItem = deserializeItem(valueMap);
         Calendar entered = valueMap.get(ENTERED_DATE, Calendar.getInstance());
+        int attempts = valueMap.get(PROCESSING_ATTEMPTS, 0);
         DistributionQueueItemStatus queueItemStatus = new DistributionQueueItemStatus(entered,
-                DistributionQueueItemState.QUEUED, 0, queueName);
+                DistributionQueueItemState.QUEUED, attempts, queueName);
 
         String entryId = getIdFromPath(queueRoot.getPath(), resource.getPath());
 
@@ -420,4 +423,10 @@
         return itemId.replace(ID_START, "").replace("--", "/");
     }
 
+    public static void incrementProcessingAttemptForQueueItem(Resource queueItemResource) {
+        ValueMap vm = queueItemResource.adaptTo(ModifiableValueMap.class);
+        int attempts = vm.get(PROCESSING_ATTEMPTS, 0);
+        vm.put(PROCESSING_ATTEMPTS, attempts + 1);
+    }
+
 }
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java
index 6b4a2c1..85a08b1 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java
@@ -27,6 +27,7 @@
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
+import java.util.WeakHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
@@ -67,13 +68,13 @@
 
     private final Queue<DistributionQueueItem> queue;
 
-    private final Map<String, DistributionQueueItemStatus> statusMap;
+    private final Map<DistributionQueueItem, DistributionQueueItemStatus> statusMap;
 
-    public SimpleDistributionQueue(String agentName, String name, Map<String, DistributionQueueItemStatus> statusMap) {
+    public SimpleDistributionQueue(String agentName, String name) {
         log.debug("starting a simple queue {} for agent {}", name, agentName);
         this.name = name;
         this.queue = new LinkedBlockingQueue<DistributionQueueItem>();
-        this.statusMap = statusMap;
+        this.statusMap = new WeakHashMap<DistributionQueueItem, DistributionQueueItemStatus>(10);
     }
 
     @NotNull
@@ -84,19 +85,17 @@
     public DistributionQueueEntry add(@NotNull DistributionQueueItem item) {
         DistributionQueueItemState itemState = DistributionQueueItemState.ERROR;
         boolean result = false;
-        String entryId = item.getPackageId();
         try {
             result = queue.offer(item);
             itemState = DistributionQueueItemState.QUEUED;
         } catch (Exception e) {
             log.error("cannot add an item to the queue", e);
         } finally {
-            statusMap.put(entryId, new DistributionQueueItemStatus(Calendar.getInstance(), itemState, 0, name));
+            statusMap.put(item, new DistributionQueueItemStatus(Calendar.getInstance(), itemState, 0, name));
         }
 
         if (result) {
-            
-            return new DistributionQueueEntry(item.getPackageId(), item, statusMap.get(entryId));
+            return new DistributionQueueEntry(item.getPackageId(), item, statusMap.get(item));
         }
 
         return null;
@@ -107,7 +106,7 @@
     public DistributionQueueEntry getHead() {
         DistributionQueueItem element = queue.peek();
         if (element != null) {
-            DistributionQueueItemStatus itemState = statusMap.get(element.getPackageId());
+            DistributionQueueItemStatus itemState = statusMap.get(element);
 
             return new DistributionQueueEntry(element.getPackageId(), element, itemState);
         }
@@ -117,7 +116,7 @@
     @NotNull
     private DistributionQueueState getState() {
         DistributionQueueItem firstItem = queue.peek();
-        DistributionQueueItemStatus firstItemStatus = firstItem != null ? statusMap.get(firstItem.getPackageId()) : null;
+        DistributionQueueItemStatus firstItemStatus = firstItem != null ? statusMap.get(firstItem) : null;
         return DistributionQueueUtils.calculateState(firstItem, firstItemStatus);
     }
 
@@ -144,7 +143,7 @@
         List<DistributionQueueEntry> result = new ArrayList<DistributionQueueEntry>();
 
         for (DistributionQueueItem item : queue) {
-            result.add(new DistributionQueueEntry(item.getPackageId(), item, statusMap.get(item.getPackageId())));
+            result.add(new DistributionQueueEntry(item.getPackageId(), item, statusMap.get(item)));
         }
         return result;
     }
@@ -153,7 +152,7 @@
     public DistributionQueueEntry getEntry(@NotNull String id) {
         for (DistributionQueueItem item : queue) {
             if (id.equals(item.getPackageId())) {
-                return new DistributionQueueEntry(id, item, statusMap.get(item.getPackageId()));
+                return new DistributionQueueEntry(id, item, statusMap.get(item));
             }
         }
 
@@ -180,7 +179,6 @@
         boolean removed = false;
         if (toRemove != null) {
             removed = queue.remove(toRemove.getItem());
-            statusMap.remove(id);
         }
         log.debug("item with id {} removed from the queue: {}", id, removed);
         if (removed) {
@@ -210,4 +208,10 @@
         return removedEntries;
     }
 
+    public void recordProcessingAttempt(@NotNull DistributionQueueEntry entry) {
+        statusMap.computeIfPresent(entry.getItem(), (item, status) -> {
+            return new DistributionQueueItemStatus(status.getEntered(),
+                    status.getItemState(), status.getAttempts() + 1, status.getQueueName());
+        });
+    }
 }
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessor.java b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessor.java
index 462895e..3cf267b 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessor.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessor.java
@@ -20,10 +20,8 @@
 
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
 
-import java.util.Map;
-
+import java.util.function.Consumer;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
-import org.apache.sling.distribution.queue.DistributionQueueItemStatus;
 import org.apache.sling.distribution.queue.impl.DistributionQueueProcessor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -36,34 +34,35 @@
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final DistributionQueue queue;
     private final DistributionQueueProcessor queueProcessor;
-    private final Map<String, DistributionQueueItemStatus> statusMap;
+    private Consumer<DistributionQueueEntry> recordProcessingAttempt;
 
     public SimpleDistributionQueueProcessor(DistributionQueue queue,
                                             DistributionQueueProcessor queueProcessor,
-                                            Map<String, DistributionQueueItemStatus> statusMap) {
+                                            Consumer<DistributionQueueEntry> processingAttemptRecorder) {
         this.queue = queue;
         this.queueProcessor = queueProcessor;
-        this.statusMap = statusMap;
+        this.recordProcessingAttempt = (null != processingAttemptRecorder)?
+                processingAttemptRecorder:
+                (entry) -> {};
     }
 
     public void run() {
         try {
             DistributionQueueEntry entry;
             while ((entry = queue.getHead()) != null) {
-                DistributionQueueItemStatus itemStatus = entry.getStatus();
-                statusMap.put(entry.getId(),  new DistributionQueueItemStatus(itemStatus.getEntered(),
-                        itemStatus.getItemState(), itemStatus.getAttempts() + 1, queue.getName()));
-                if (queueProcessor.process(queue.getName(), entry)) {
-                    if (queue.remove(entry.getId()) != null) {
-                        log.debug("item {} processed and removed from the queue", entry.getItem());
-                    }
+                boolean wasProcessed = queueProcessor.process(queue.getName(), entry);
+                boolean wasRemoved = wasProcessed?
+                        (queue.remove(entry.getId()) != null):
+                            false;
+                if (wasProcessed && wasRemoved) {
+                    log.debug("item {} processed and removed from the queue", entry.getItem());
                 } else {
-                    log.warn("processing of item {} failed", entry.getId());
+                    log.warn("processing and removal of item {} failed; will reattempt", entry.getId());
+                    this.recordProcessingAttempt.accept(entry);
                 }
             }
         } catch (Exception e) {
             log.error("error while processing queue {}", e);
         }
-
     }
 }
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java
index bc3d3c7..9315ee0 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java
@@ -27,14 +27,15 @@
 import java.util.Collection;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.io.LineIterator;
 import org.apache.sling.commons.scheduler.ScheduleOptions;
 import org.apache.sling.commons.scheduler.Scheduler;
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
+import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
-import org.apache.sling.distribution.queue.DistributionQueueItemStatus;
 import org.apache.sling.distribution.queue.impl.DistributionQueueProcessor;
 import org.apache.sling.distribution.queue.impl.DistributionQueueProvider;
 import org.apache.sling.distribution.queue.DistributionQueueType;
@@ -56,8 +57,7 @@
     private final String name;
     private final Scheduler scheduler;
 
-    private final Map<String, SimpleDistributionQueue> queueMap = new ConcurrentHashMap<>();
-    private final Map<String, Map<String, DistributionQueueItemStatus>> statusMap = new ConcurrentHashMap<>();
+    private final Map<String, SimpleDistributionQueue> queueMap = new ConcurrentHashMap<String, SimpleDistributionQueue>();
     private final boolean checkpoint;
     private File checkpointDirectory;
 
@@ -92,11 +92,8 @@
         SimpleDistributionQueue queue = queueMap.get(key);
         if (queue == null) {
             log.debug("creating a queue with key {}", key);
-            Map<String, DistributionQueueItemStatus> queueStatusMap
-                    = new ConcurrentHashMap<>();
-            queue = new SimpleDistributionQueue(name, queueName, queueStatusMap);
+            queue = new SimpleDistributionQueue(name, queueName);
             queueMap.put(key, queue);
-            statusMap.put(key, queueStatusMap);
             log.debug("queue created {}", queue);
         }
         return queue;
@@ -155,9 +152,14 @@
 
         // enable processing
         for (String queueName : queueNames) {
-            ScheduleOptions options = scheduler.NOW(-1, 1).canRunConcurrently(false).name(getJobName(queueName));
-            scheduler.schedule(new SimpleDistributionQueueProcessor(getQueue(queueName), queueProcessor,
-                    statusMap.get(getKey(queueName))), options);
+            ScheduleOptions options = scheduler.NOW(-1, 1)
+                    .canRunConcurrently(false)
+                    .name(getJobName(queueName));
+            DistributionQueue queueImpl = getQueue(queueName);
+            Consumer<DistributionQueueEntry> processingAttemptRecorder =
+                    ((SimpleDistributionQueue)queueImpl)::recordProcessingAttempt;
+            scheduler.schedule(new SimpleDistributionQueueProcessor(getQueue(queueName), queueProcessor, processingAttemptRecorder),
+                    options);
         }
 
     }
diff --git a/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentTest.java b/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentTest.java
index 433d2ff..aae6f7b 100644
--- a/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentTest.java
+++ b/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentTest.java
@@ -21,7 +21,6 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.api.resource.ResourceResolverFactory;
 import org.apache.sling.distribution.DistributionRequest;
@@ -125,7 +124,7 @@
 
 
         when(queueProvider.getQueue(DistributionQueueDispatchingStrategy.DEFAULT_QUEUE_NAME)).thenReturn(
-                new SimpleDistributionQueue(name, "name", new HashMap<String, DistributionQueueItemStatus>()));
+                new SimpleDistributionQueue(name, "name"));
         DistributionResponse response = agent.execute(resourceResolver, request);
         assertNotNull(response);
         assertEquals("[ERROR]", response.getMessage());
@@ -165,7 +164,7 @@
             }
         }).when(packageExporter).exportPackages(any(ResourceResolver.class), any(DistributionRequest.class), any(DistributionPackageProcessor.class));
         when(queueProvider.getQueue(DistributionQueueDispatchingStrategy.DEFAULT_QUEUE_NAME)).thenReturn(
-                new SimpleDistributionQueue(name, "name", new HashMap<String, DistributionQueueItemStatus>()));
+                new SimpleDistributionQueue(name, "name"));
         DistributionResponse response = agent.execute(resourceResolver, request);
         assertNotNull(response);
         assertEquals("[QUEUED]", response.getMessage());
@@ -205,7 +204,7 @@
                 return null;
             }
         }).when(packageExporter).exportPackages(any(ResourceResolver.class), any(DistributionRequest.class), any(DistributionPackageProcessor.class));        when(queueProvider.getQueue(DistributionQueueDispatchingStrategy.DEFAULT_QUEUE_NAME)).thenReturn(
-                new SimpleDistributionQueue(name, "name", new HashMap<String, DistributionQueueItemStatus>()));
+                new SimpleDistributionQueue(name, "name"));
 
         agent.execute(resourceResolver, request);
     }
@@ -298,7 +297,7 @@
                 return null;
             }
         }).when(packageExporter).exportPackages(any(ResourceResolver.class), any(DistributionRequest.class), any(DistributionPackageProcessor.class));        when(queueProvider.getQueue(DistributionQueueDispatchingStrategy.DEFAULT_QUEUE_NAME)).thenReturn(
-                new SimpleDistributionQueue(name, "name", new HashMap<String, DistributionQueueItemStatus>()));
+                new SimpleDistributionQueue(name, "name"));
 
         DistributionResponse response = agent.execute(resourceResolver, request);
 
@@ -344,7 +343,7 @@
                 return null;
             }
         }).when(packageExporter).exportPackages(any(ResourceResolver.class), any(DistributionRequest.class), any(DistributionPackageProcessor.class));        when(queueProvider.getQueue(DistributionQueueDispatchingStrategy.DEFAULT_QUEUE_NAME)).thenReturn(
-                new SimpleDistributionQueue(name, "name", new HashMap<String, DistributionQueueItemStatus>()));
+                new SimpleDistributionQueue(name, "name"));
 
         DistributionResponse response = agent.execute(resourceResolver, request);
 
diff --git a/src/test/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProcessingTest.java b/src/test/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProcessingTest.java
index 68d08fa..6e6630f 100644
--- a/src/test/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProcessingTest.java
+++ b/src/test/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProcessingTest.java
@@ -87,7 +87,7 @@
             assertEquals(MAX_ENTRIES, resourceQueue.getStatus().getItemsCount());
 
             when(mockResourceQueueProcessor.process(eq(QUEUE_NAME), Matchers.any(DistributionQueueEntry.class)))
-                .thenReturn(true);
+                .thenReturn(false, true);
 
             resourceQueueProvider.enableQueueProcessing(mockResourceQueueProcessor, QUEUE_NAME);
             while (!resourceQueue.getStatus().getState().equals(DistributionQueueState.IDLE)) {
diff --git a/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessorTest.java b/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessorTest.java
index b219f2d..ec7d5c5 100644
--- a/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessorTest.java
+++ b/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessorTest.java
@@ -21,6 +21,7 @@
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.LinkedList;
+
 import org.apache.sling.distribution.queue.DistributionQueueItemState;
 import org.apache.sling.distribution.queue.DistributionQueueItemStatus;
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
@@ -28,7 +29,6 @@
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.apache.sling.distribution.queue.impl.DistributionQueueProcessor;
 import org.junit.Test;
-
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -42,7 +42,7 @@
         DistributionQueue queue = mock(DistributionQueue.class);
         DistributionQueueProcessor queueProcessor = mock(DistributionQueueProcessor.class);
         SimpleDistributionQueueProcessor simpleDistributionQueueProcessor = new SimpleDistributionQueueProcessor(
-                queue, queueProcessor, new HashMap<String, DistributionQueueItemStatus>());
+                queue, queueProcessor, null);
         simpleDistributionQueueProcessor.run();
     }
 
@@ -56,7 +56,7 @@
         when(queueProvider.getQueues()).thenReturn(queues);
         DistributionQueueProcessor queueProcessor = mock(DistributionQueueProcessor.class);
         SimpleDistributionQueueProcessor simpleDistributionQueueProcessor = new SimpleDistributionQueueProcessor(
-                queue, queueProcessor, new HashMap<String, DistributionQueueItemStatus>());
+                queue, queueProcessor, null);
         simpleDistributionQueueProcessor.run();
     }
 
@@ -73,7 +73,7 @@
         when(queueProvider.getQueues()).thenReturn(queues);
         DistributionQueueProcessor queueProcessor = mock(DistributionQueueProcessor.class);
         SimpleDistributionQueueProcessor simpleDistributionQueueProcessor = new SimpleDistributionQueueProcessor(
-                queue, queueProcessor, new HashMap<String, DistributionQueueItemStatus>());
+                queue, queueProcessor, queue::recordProcessingAttempt);
         simpleDistributionQueueProcessor.run();
     }
 }
\ No newline at end of file
diff --git a/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueTest.java b/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueTest.java
index aadd3a4..169b871 100644
--- a/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueTest.java
+++ b/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueTest.java
@@ -19,6 +19,7 @@
 package org.apache.sling.distribution.queue.impl.simple;
 
 import java.util.HashMap;
+
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
@@ -38,8 +39,7 @@
 
     @Test
     public void testPackageAddition() throws Exception {
-        DistributionQueue queue = new SimpleDistributionQueue("agentName", "default",
-                new HashMap<String, DistributionQueueItemStatus>());
+        DistributionQueue queue = new SimpleDistributionQueue("agentName", "default");
         DistributionQueueItem pkg = new DistributionQueueItem("packageId", new HashMap<String, Object>());
         assertNotNull(queue.add(pkg));
         assertFalse(queue.getStatus().isEmpty());
@@ -47,8 +47,7 @@
 
     @Test
     public void testPackageAdditionAndRemoval() throws Exception {
-        DistributionQueue queue = new SimpleDistributionQueue("agentName", "default",
-                new HashMap<String, DistributionQueueItemStatus>());
+        DistributionQueue queue = new SimpleDistributionQueue("agentName", "default");
         DistributionQueueItem pkg = new DistributionQueueItem("id", new HashMap<String, Object>());
         assertNotNull(queue.add(pkg));
         assertFalse(queue.getStatus().isEmpty());
@@ -60,18 +59,26 @@
 
     @Test
     public void testPackageAdditionRetrievalAndRemoval() throws Exception {
-        DistributionQueue queue = new SimpleDistributionQueue("agentName", "default",
-                new HashMap<String, DistributionQueueItemStatus>());
+        DistributionQueue queue = new SimpleDistributionQueue("agentName", "default");
         DistributionQueueItem pkg = new DistributionQueueItem("id", new HashMap<String, Object>());
         assertNotNull(queue.add(pkg));
         assertFalse(queue.getStatus().isEmpty());
-        assertEquals(pkg, queue.getHead().getItem());
-        assertFalse(queue.getStatus().isEmpty());
+        DistributionQueueEntry entry = queue.getHead();
+
         DistributionQueueItemStatus status = queue.getEntry(pkg.getPackageId()).getStatus();
+        assertNotNull(status);
+        assertEquals(0, status.getAttempts());
+
+        ((SimpleDistributionQueue)queue).recordProcessingAttempt(entry);
+
+        assertEquals(pkg, entry.getItem());
+        assertFalse(queue.getStatus().isEmpty());
+
+        status = queue.getEntry(pkg.getPackageId()).getStatus();
         assertNotNull(queue.remove(pkg.getPackageId()));
         assertTrue(queue.getStatus().isEmpty());
         assertNotNull(status);
-        assertEquals(0, status.getAttempts());
+        assertEquals(1, status.getAttempts());
     }
 
 }