Merge pull request #24 from actinium15/issue/SLING-8853

SLING-8853 Adds ActiveResourceQueue and makes it available as …
diff --git a/src/main/java/org/apache/sling/distribution/packaging/impl/DistributionPackageUtils.java b/src/main/java/org/apache/sling/distribution/packaging/impl/DistributionPackageUtils.java
index 72100e8..4b85fc7 100644
--- a/src/main/java/org/apache/sling/distribution/packaging/impl/DistributionPackageUtils.java
+++ b/src/main/java/org/apache/sling/distribution/packaging/impl/DistributionPackageUtils.java
@@ -20,6 +20,7 @@
 
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.IOUtils;
+import org.apache.jackrabbit.JcrConstants;
 import org.apache.jackrabbit.commons.JcrUtils;
 import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.api.resource.Resource;
@@ -37,7 +38,6 @@
 
 import javax.jcr.Binary;
 import javax.jcr.Node;
-import javax.jcr.Property;
 import javax.jcr.RepositoryException;
 import javax.jcr.nodetype.NodeType;
 import java.io.ByteArrayInputStream;
@@ -287,15 +287,16 @@
 
     public static InputStream getStream(Resource resource) throws RepositoryException {
         Node parent = resource.adaptTo(Node.class);
-        return parent.getProperty("bin/jcr:content/jcr:data").getBinary().getStream();
+        return parent.getProperty("bin/" + JcrConstants.JCR_CONTENT
+                + "/" + JcrConstants.JCR_DATA).getBinary().getStream();
     }
 
     public static void uploadStream(Resource resource, InputStream stream) throws RepositoryException {
         Node parent = resource.adaptTo(Node.class);
         Node file = JcrUtils.getOrAddNode(parent, "bin", NodeType.NT_FILE);
-        Node content = JcrUtils.getOrAddNode(file, Node.JCR_CONTENT, NodeType.NT_RESOURCE);
+        Node content = JcrUtils.getOrAddNode(file, JcrConstants.JCR_CONTENT, NodeType.NT_RESOURCE);
         Binary binary = parent.getSession().getValueFactory().createBinary(stream);
-        content.setProperty(Property.JCR_DATA, binary);
+        content.setProperty(JcrConstants.JCR_DATA, binary);
         JcrUtils.getOrAddNode(parent, "refs", NodeType.NT_UNSTRUCTURED);
     }
 
@@ -365,7 +366,7 @@
 
                 if (file.exists()) {
                     inputStream = getSafeObjectInputStream(new FileInputStream(file));
-                    @SuppressWarnings("unchecked") // type is known by sedign
+                    @SuppressWarnings("unchecked") // type is known by design
                     HashSet<String> fromStreamSet = (HashSet<String>) inputStream.readObject();
                     set = fromStreamSet;
                 } else {
diff --git a/src/main/java/org/apache/sling/distribution/packaging/impl/FileDistributionPackage.java b/src/main/java/org/apache/sling/distribution/packaging/impl/FileDistributionPackage.java
index bca2d1b..286629c 100644
--- a/src/main/java/org/apache/sling/distribution/packaging/impl/FileDistributionPackage.java
+++ b/src/main/java/org/apache/sling/distribution/packaging/impl/FileDistributionPackage.java
@@ -22,6 +22,8 @@
 import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.sling.distribution.DistributionRequestType;
@@ -43,11 +45,27 @@
     public FileDistributionPackage(@NotNull File file,
                                    @NotNull String type,
                                    @Nullable String digestAlgorithm,
-                                   @Nullable String digestMessage) {
+                                   @Nullable String digestMessage,
+                                   @Nullable Map<String, Object> baseInfoMap) {
         super(file.getName(), type, digestAlgorithm, digestMessage);
         this.file = file;
 
+        if (null == baseInfoMap) {
+            try (InputStream metaInfoIS = FileUtils.openInputStream(getMetaInfoFile())) {
+                DistributionPackageUtils.readInfo(metaInfoIS, this.getInfo());
+            } catch (IOException e) {
+                log.error("cannot read meta-info for the package at {}", file.getAbsoluteFile(), e);
+            }
+        }
         this.getInfo().put(DistributionPackageInfo.PROPERTY_REQUEST_TYPE, DistributionRequestType.ADD);
+        if (null != baseInfoMap) {
+            this.getInfo().putAll(baseInfoMap);
+            try (OutputStream metaInfoOS = FileUtils.openOutputStream(getMetaInfoFile(), false)) {
+                DistributionPackageUtils.writeInfo(metaInfoOS, baseInfoMap);
+            } catch (IOException e) {
+                log.error("cannot create meta-info for the package at {}", file.getAbsoluteFile(), e);
+            }
+        }
     }
 
     @NotNull
@@ -67,6 +85,7 @@
     public void delete() {
         FileUtils.deleteQuietly(file);
         FileUtils.deleteQuietly(getStatusFile());
+        FileUtils.deleteQuietly(getMetaInfoFile());
     }
 
     public File getFile() {
@@ -101,6 +120,11 @@
         return new File(statusFilePath);
     }
 
+    private File getMetaInfoFile() {
+        String metaInfoFilePath = file.getAbsolutePath() + ".metainfo";
+        return new File(metaInfoFilePath);
+    }
+
 
     public class PackageInputStream extends BufferedInputStream {
         private final File file;
diff --git a/src/main/java/org/apache/sling/distribution/packaging/impl/FileDistributionPackageBuilder.java b/src/main/java/org/apache/sling/distribution/packaging/impl/FileDistributionPackageBuilder.java
index a79a7a2..2359b03 100644
--- a/src/main/java/org/apache/sling/distribution/packaging/impl/FileDistributionPackageBuilder.java
+++ b/src/main/java/org/apache/sling/distribution/packaging/impl/FileDistributionPackageBuilder.java
@@ -37,6 +37,7 @@
 import org.apache.sling.distribution.common.DistributionException;
 import org.apache.sling.distribution.packaging.DistributionPackage;
 import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
+import org.apache.sling.distribution.packaging.DistributionPackageInfo;
 import org.apache.sling.distribution.serialization.DistributionContentSerializer;
 import org.apache.sling.distribution.serialization.DistributionExportFilter;
 import org.apache.sling.distribution.serialization.DistributionExportOptions;
@@ -104,7 +105,10 @@
             if (digestAlgorithm != null) {
                 digestMessage = readDigestMessage((DigestOutputStream) outputStream);
             }
-            distributionPackage = new FileDistributionPackage(file, getType(), digestAlgorithm, digestMessage);
+            DistributionPackageInfo info = new DistributionPackageInfo(getType());
+            DistributionPackageUtils.fillInfo(info, request);
+            distributionPackage = new FileDistributionPackage(file, getType(), digestAlgorithm, digestMessage,
+                    info);
         } catch (IOException e) {
             throw new DistributionException(e);
         } finally {
@@ -140,7 +144,7 @@
             outputStream.flush();
 
             String digestMessage = readDigestMessage(outputStream);
-            distributionPackage = new FileDistributionPackage(file, getType(), digestAlgorithm, digestMessage);
+            distributionPackage = new FileDistributionPackage(file, getType(), digestAlgorithm, digestMessage, null);
         } catch (Exception e) {
             throw new DistributionException(e);
         } finally {
@@ -168,6 +172,6 @@
             log.warn("file package does not exist", file.getAbsolutePath());
             return null;
         }
-        return new FileDistributionPackage(file, getType(), null, null);
+        return new FileDistributionPackage(file, getType(), null, null, null);
     }
 }
diff --git a/src/main/java/org/apache/sling/distribution/packaging/impl/InMemoryDistributionPackage.java b/src/main/java/org/apache/sling/distribution/packaging/impl/InMemoryDistributionPackage.java
index e3a7aed..21ac0ad 100644
--- a/src/main/java/org/apache/sling/distribution/packaging/impl/InMemoryDistributionPackage.java
+++ b/src/main/java/org/apache/sling/distribution/packaging/impl/InMemoryDistributionPackage.java
@@ -21,6 +21,7 @@
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Map;
 
 import org.apache.sling.distribution.packaging.DistributionPackage;
 import org.apache.sling.distribution.packaging.DistributionPackageInfo;
@@ -38,12 +39,16 @@
 
     private final DistributionPackageInfo info;
 
-    public InMemoryDistributionPackage(String id, String type, byte[] data) {
+    public InMemoryDistributionPackage(String id, String type, byte[] data, Map<String, Object> baseInfoMap) {
         this.id = id;
         this.type = type;
         this.data = data;
         this.size = data.length;
         this.info = new DistributionPackageInfo(type);
+
+        if (null != baseInfoMap) {
+            this.info.putAll(baseInfoMap);
+        }
     }
 
     @NotNull
diff --git a/src/main/java/org/apache/sling/distribution/packaging/impl/InMemoryDistributionPackageBuilder.java b/src/main/java/org/apache/sling/distribution/packaging/impl/InMemoryDistributionPackageBuilder.java
index d55b77e..b74b164 100644
--- a/src/main/java/org/apache/sling/distribution/packaging/impl/InMemoryDistributionPackageBuilder.java
+++ b/src/main/java/org/apache/sling/distribution/packaging/impl/InMemoryDistributionPackageBuilder.java
@@ -33,6 +33,7 @@
 import org.apache.sling.distribution.DistributionRequest;
 import org.apache.sling.distribution.common.DistributionException;
 import org.apache.sling.distribution.packaging.DistributionPackage;
+import org.apache.sling.distribution.packaging.DistributionPackageInfo;
 import org.apache.sling.distribution.serialization.DistributionContentSerializer;
 import org.apache.sling.distribution.serialization.DistributionExportFilter;
 import org.apache.sling.distribution.serialization.DistributionExportOptions;
@@ -75,7 +76,9 @@
 
         String packageId = "dstrpck-" + System.currentTimeMillis() + "-" + UUID.randomUUID().toString();
 
-        return new InMemoryDistributionPackage(packageId, getType(), baos.toByteArray());
+        DistributionPackageInfo info = new DistributionPackageInfo(getType());
+        DistributionPackageUtils.fillInfo(info, request);
+        return new InMemoryDistributionPackage(packageId, getType(), baos.toByteArray(), info);
     }
 
     @Override
@@ -102,7 +105,7 @@
             baos.flush();
 
             byte[] data = baos.toByteArray();
-            return new InMemoryDistributionPackage(packageId, getType(), data);
+            return new InMemoryDistributionPackage(packageId, getType(), data, info);
         } catch (IOException e) {
             throw new DistributionException(e);
         }
diff --git a/src/main/java/org/apache/sling/distribution/packaging/impl/ResourceDistributionPackage.java b/src/main/java/org/apache/sling/distribution/packaging/impl/ResourceDistributionPackage.java
index d94ec7b..fbd804a 100644
--- a/src/main/java/org/apache/sling/distribution/packaging/impl/ResourceDistributionPackage.java
+++ b/src/main/java/org/apache/sling/distribution/packaging/impl/ResourceDistributionPackage.java
@@ -22,6 +22,7 @@
 import java.io.BufferedInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Map;
 
 import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.api.resource.Resource;
@@ -50,14 +51,26 @@
                                 String type,
                                 ResourceResolver resourceResolver,
                                 @Nullable String digestAlgorithm,
-                                @Nullable String digestMessage) {
+                                @Nullable String digestMessage,
+                                @Nullable Map<String, Object> baseInfoMap) {
         super(resource.getName(), type, digestAlgorithm, digestMessage);
         this.resourceResolver = resourceResolver;
         ValueMap valueMap = resource.getValueMap();
         assert type.equals(valueMap.get("type")) : "wrong resource type";
         this.resource = resource;
         Object sizeProperty = resource.getValueMap().get("size");
+        Object paths = resource.getValueMap().get(DistributionPackageInfo.PROPERTY_REQUEST_PATHS);
+        Object deepPaths = resource.getValueMap().get(DistributionPackageInfo.PROPERTY_REQUEST_DEEP_PATHS);
         this.size = sizeProperty == null ? -1 : Long.parseLong(sizeProperty.toString());
+        if (null != baseInfoMap) {
+            this.getInfo().putAll(baseInfoMap);
+        }
+        if (paths instanceof String[]) {
+            this.getInfo().put(DistributionPackageInfo.PROPERTY_REQUEST_PATHS, (String[])paths);
+        }
+        if (deepPaths instanceof String[]) {
+            this.getInfo().put(DistributionPackageInfo.PROPERTY_REQUEST_DEEP_PATHS, (String[])deepPaths);
+        }
 
         this.getInfo().put(DistributionPackageInfo.PROPERTY_REQUEST_TYPE, DistributionRequestType.ADD);
     }
diff --git a/src/main/java/org/apache/sling/distribution/packaging/impl/ResourceDistributionPackageBuilder.java b/src/main/java/org/apache/sling/distribution/packaging/impl/ResourceDistributionPackageBuilder.java
index 919d3d9..8c412aa 100644
--- a/src/main/java/org/apache/sling/distribution/packaging/impl/ResourceDistributionPackageBuilder.java
+++ b/src/main/java/org/apache/sling/distribution/packaging/impl/ResourceDistributionPackageBuilder.java
@@ -44,6 +44,7 @@
 import org.apache.sling.distribution.DistributionRequest;
 import org.apache.sling.distribution.common.DistributionException;
 import org.apache.sling.distribution.packaging.DistributionPackage;
+import org.apache.sling.distribution.packaging.DistributionPackageInfo;
 import org.apache.sling.distribution.serialization.DistributionContentSerializer;
 import org.apache.sling.distribution.serialization.DistributionExportFilter;
 import org.apache.sling.distribution.serialization.DistributionExportOptions;
@@ -124,13 +125,15 @@
             try {
                 inputStream = outputStream.openWrittenDataInputStream();
 
-                packageResource = uploadStream(resourceResolver, packagesRoot, inputStream, outputStream.size());
+                packageResource = uploadStream(resourceResolver, packagesRoot, inputStream, outputStream.size(),
+                        request);
             } finally {
                 IOUtils.closeQuietly(inputStream);
                 outputStream.clean();
             }
 
-            distributionPackage = new ResourceDistributionPackage(packageResource, getType(), resourceResolver, digestAlgorithm, digestMessage);
+            distributionPackage = new ResourceDistributionPackage(packageResource, getType(), resourceResolver,
+                    digestAlgorithm, digestMessage, null);
         } catch (IOException e) {
             throw new DistributionException(e);
         }
@@ -150,8 +153,8 @@
         try {
             Resource packagesRoot = DistributionPackageUtils.getPackagesRoot(resourceResolver, packagesPath);
 
-            Resource packageResource = uploadStream(resourceResolver, packagesRoot, inputStream, -1);
-            return new ResourceDistributionPackage(packageResource, getType(), resourceResolver, null, null);
+            Resource packageResource = uploadStream(resourceResolver, packagesRoot, inputStream, -1, null);
+            return new ResourceDistributionPackage(packageResource, getType(), resourceResolver, null, null, null);
         } catch (PersistenceException e) {
             throw new DistributionException(e);
         }
@@ -176,7 +179,7 @@
             if (packageResource == null) {
                 return null;
             } else {
-                return new ResourceDistributionPackage(packageResource, getType(), resourceResolver, null, null);
+                return new ResourceDistributionPackage(packageResource, getType(), resourceResolver, null, null, null);
             }
         } catch (PersistenceException e) {
             return null;
@@ -184,7 +187,8 @@
     }
 
 
-    private Resource uploadStream(ResourceResolver resourceResolver, Resource parent, InputStream stream, long size) throws PersistenceException {
+    private Resource uploadStream(ResourceResolver resourceResolver, Resource parent, InputStream stream,
+            long size, DistributionRequest request) throws PersistenceException {
 
         String name;
         log.debug("uploading stream");
@@ -211,6 +215,13 @@
         Map<String, Object> props = new HashMap<String, Object>();
         props.put(ResourceResolver.PROPERTY_RESOURCE_TYPE, "sling:Folder");
         props.put("type", getType());
+        if (null != request) {
+            DistributionPackageInfo info = new DistributionPackageInfo(getType());
+            DistributionPackageUtils.fillInfo(info, request);
+            props.put(DistributionPackageInfo.PROPERTY_REQUEST_PATHS, info.getPaths());
+            props.put(DistributionPackageInfo.PROPERTY_REQUEST_DEEP_PATHS,
+                    info.get(DistributionPackageInfo.PROPERTY_REQUEST_DEEP_PATHS));
+        }
 
         if (size != -1) {
             props.put("size", size);
@@ -272,7 +283,7 @@
         @Override
         public ResourceDistributionPackage next() {
             Resource packageResource = packages.next();
-            return new ResourceDistributionPackage(packageResource, type, resourceResolver, null, null);
+            return new ResourceDistributionPackage(packageResource, type, resourceResolver, null, null, null);
         }
 
         @Override
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java
index 62e9e49..6b4a2c1 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java
@@ -27,7 +27,6 @@
 import java.util.Map;
 import java.util.Queue;
 import java.util.Set;
-import java.util.WeakHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
@@ -68,13 +67,13 @@
 
     private final Queue<DistributionQueueItem> queue;
 
-    private final Map<DistributionQueueItem, DistributionQueueItemStatus> statusMap;
+    private final Map<String, DistributionQueueItemStatus> statusMap;
 
-    public SimpleDistributionQueue(String agentName, String name) {
+    public SimpleDistributionQueue(String agentName, String name, Map<String, DistributionQueueItemStatus> statusMap) {
         log.debug("starting a simple queue {} for agent {}", name, agentName);
         this.name = name;
         this.queue = new LinkedBlockingQueue<DistributionQueueItem>();
-        this.statusMap = new WeakHashMap<DistributionQueueItem, DistributionQueueItemStatus>(10);
+        this.statusMap = statusMap;
     }
 
     @NotNull
@@ -85,17 +84,19 @@
     public DistributionQueueEntry add(@NotNull DistributionQueueItem item) {
         DistributionQueueItemState itemState = DistributionQueueItemState.ERROR;
         boolean result = false;
+        String entryId = item.getPackageId();
         try {
             result = queue.offer(item);
             itemState = DistributionQueueItemState.QUEUED;
         } catch (Exception e) {
             log.error("cannot add an item to the queue", e);
         } finally {
-            statusMap.put(item, new DistributionQueueItemStatus(Calendar.getInstance(), itemState, 0, name));
+            statusMap.put(entryId, new DistributionQueueItemStatus(Calendar.getInstance(), itemState, 0, name));
         }
 
         if (result) {
-            return new DistributionQueueEntry(item.getPackageId(), item, statusMap.get(item));
+            
+            return new DistributionQueueEntry(item.getPackageId(), item, statusMap.get(entryId));
         }
 
         return null;
@@ -106,10 +107,7 @@
     public DistributionQueueEntry getHead() {
         DistributionQueueItem element = queue.peek();
         if (element != null) {
-            DistributionQueueItemStatus itemState = statusMap.get(element);
-            statusMap.put(element, new DistributionQueueItemStatus(itemState.getEntered(),
-                    itemState.getItemState(),
-                    itemState.getAttempts() + 1, name));
+            DistributionQueueItemStatus itemState = statusMap.get(element.getPackageId());
 
             return new DistributionQueueEntry(element.getPackageId(), element, itemState);
         }
@@ -119,7 +117,7 @@
     @NotNull
     private DistributionQueueState getState() {
         DistributionQueueItem firstItem = queue.peek();
-        DistributionQueueItemStatus firstItemStatus = firstItem != null ? statusMap.get(firstItem) : null;
+        DistributionQueueItemStatus firstItemStatus = firstItem != null ? statusMap.get(firstItem.getPackageId()) : null;
         return DistributionQueueUtils.calculateState(firstItem, firstItemStatus);
     }
 
@@ -146,7 +144,7 @@
         List<DistributionQueueEntry> result = new ArrayList<DistributionQueueEntry>();
 
         for (DistributionQueueItem item : queue) {
-            result.add(new DistributionQueueEntry(item.getPackageId(), item, statusMap.get(item)));
+            result.add(new DistributionQueueEntry(item.getPackageId(), item, statusMap.get(item.getPackageId())));
         }
         return result;
     }
@@ -155,7 +153,7 @@
     public DistributionQueueEntry getEntry(@NotNull String id) {
         for (DistributionQueueItem item : queue) {
             if (id.equals(item.getPackageId())) {
-                return new DistributionQueueEntry(id, item, statusMap.get(item));
+                return new DistributionQueueEntry(id, item, statusMap.get(item.getPackageId()));
             }
         }
 
@@ -182,6 +180,7 @@
         boolean removed = false;
         if (toRemove != null) {
             removed = queue.remove(toRemove.getItem());
+            statusMap.remove(id);
         }
         log.debug("item with id {} removed from the queue: {}", id, removed);
         if (removed) {
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessor.java b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessor.java
index 54c37eb..462895e 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessor.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessor.java
@@ -19,7 +19,11 @@
 package org.apache.sling.distribution.queue.impl.simple;
 
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
+
+import java.util.Map;
+
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
+import org.apache.sling.distribution.queue.DistributionQueueItemStatus;
 import org.apache.sling.distribution.queue.impl.DistributionQueueProcessor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,17 +36,23 @@
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final DistributionQueue queue;
     private final DistributionQueueProcessor queueProcessor;
+    private final Map<String, DistributionQueueItemStatus> statusMap;
 
     public SimpleDistributionQueueProcessor(DistributionQueue queue,
-                                            DistributionQueueProcessor queueProcessor) {
+                                            DistributionQueueProcessor queueProcessor,
+                                            Map<String, DistributionQueueItemStatus> statusMap) {
         this.queue = queue;
         this.queueProcessor = queueProcessor;
+        this.statusMap = statusMap;
     }
 
     public void run() {
         try {
             DistributionQueueEntry entry;
             while ((entry = queue.getHead()) != null) {
+                DistributionQueueItemStatus itemStatus = entry.getStatus();
+                statusMap.put(entry.getId(),  new DistributionQueueItemStatus(itemStatus.getEntered(),
+                        itemStatus.getItemState(), itemStatus.getAttempts() + 1, queue.getName()));
                 if (queueProcessor.process(queue.getName(), entry)) {
                     if (queue.remove(entry.getId()) != null) {
                         log.debug("item {} processed and removed from the queue", entry.getItem());
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java
index 8622fdc..91ca984 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java
@@ -26,6 +26,7 @@
 import java.io.FilenameFilter;
 import java.util.Collection;
 import java.util.Map;
+import java.util.WeakHashMap;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.io.IOUtils;
@@ -34,6 +35,7 @@
 import org.apache.sling.commons.scheduler.Scheduler;
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
+import org.apache.sling.distribution.queue.DistributionQueueItemStatus;
 import org.apache.sling.distribution.queue.impl.DistributionQueueProcessor;
 import org.apache.sling.distribution.queue.impl.DistributionQueueProvider;
 import org.apache.sling.distribution.queue.DistributionQueueType;
@@ -56,6 +58,8 @@
     private final Scheduler scheduler;
 
     private final Map<String, SimpleDistributionQueue> queueMap = new ConcurrentHashMap<String, SimpleDistributionQueue>();
+    private final Map<SimpleDistributionQueue, Map<String, DistributionQueueItemStatus>> statusMap
+            = new WeakHashMap<SimpleDistributionQueue, Map<String, DistributionQueueItemStatus>>();
     private final boolean checkpoint;
     private File checkpointDirectory;
 
@@ -75,7 +79,8 @@
             if (!checkpointDirectory.exists()) {
                 created = checkpointDirectory.mkdir();
             }
-            log.info("checkpoint directory created: {}, exists {}", created, checkpointDirectory.isDirectory() && checkpointDirectory.exists());
+            log.info("checkpoint directory created: {}, exists {}", created,
+                    checkpointDirectory.isDirectory() && checkpointDirectory.exists());
         }
 
         this.scheduler = scheduler;
@@ -89,8 +94,11 @@
         SimpleDistributionQueue queue = queueMap.get(key);
         if (queue == null) {
             log.debug("creating a queue with key {}", key);
-            queue = new SimpleDistributionQueue(name, queueName);
+            Map<String, DistributionQueueItemStatus> queueStatusMap
+                    = new ConcurrentHashMap<String, DistributionQueueItemStatus>();
+            queue = new SimpleDistributionQueue(name, queueName, queueStatusMap);
             queueMap.put(key, queue);
+            statusMap.put(queue, queueStatusMap);
             log.debug("queue created {}", queue);
         }
         return queue;
@@ -125,7 +133,7 @@
                     try {
                         LineIterator lineIterator = IOUtils.lineIterator(new FileReader(qf));
                         while (lineIterator.hasNext()) {
-                            String line  = lineIterator.nextLine();
+                            String line = lineIterator.nextLine();
                             DistributionQueueItem item = mapper.readQueueItem(line);
                             queue.add(item);
                         }
@@ -140,19 +148,18 @@
 
             // enable checkpointing
             for (String queueName : queueNames) {
-                ScheduleOptions options = scheduler.NOW(-1, 15)
-                        .canRunConcurrently(false)
+                ScheduleOptions options = scheduler.NOW(-1, 15).canRunConcurrently(false)
                         .name(getJobName(queueName + "-checkpoint"));
-                scheduler.schedule(new SimpleDistributionQueueCheckpoint(getQueue(queueName), checkpointDirectory), options);
+                scheduler.schedule(new SimpleDistributionQueueCheckpoint(getQueue(queueName), checkpointDirectory),
+                        options);
             }
         }
 
         // enable processing
         for (String queueName : queueNames) {
-            ScheduleOptions options = scheduler.NOW(-1, 1)
-                    .canRunConcurrently(false)
-                    .name(getJobName(queueName));
-            scheduler.schedule(new SimpleDistributionQueueProcessor(getQueue(queueName), queueProcessor), options);
+            ScheduleOptions options = scheduler.NOW(-1, 1).canRunConcurrently(false).name(getJobName(queueName));
+            scheduler.schedule(new SimpleDistributionQueueProcessor(getQueue(queueName), queueProcessor,
+                    statusMap.get(getQueue(queueName))), options);
         }
 
     }
diff --git a/src/main/java/org/apache/sling/distribution/transport/impl/SimpleHttpDistributionTransport.java b/src/main/java/org/apache/sling/distribution/transport/impl/SimpleHttpDistributionTransport.java
index 3f7bd51..0e0cb92 100644
--- a/src/main/java/org/apache/sling/distribution/transport/impl/SimpleHttpDistributionTransport.java
+++ b/src/main/java/org/apache/sling/distribution/transport/impl/SimpleHttpDistributionTransport.java
@@ -20,7 +20,6 @@
 
 import java.io.InputStream;
 import java.net.URI;
-import java.util.Collections;
 import java.util.Map;
 import java.util.UUID;
 import org.apache.commons.io.IOUtils;
@@ -32,8 +31,6 @@
 import org.apache.http.client.fluent.Response;
 import org.apache.http.conn.HttpHostConnectException;
 import org.apache.http.entity.ContentType;
-import org.apache.http.impl.client.HttpClientBuilder;
-import org.apache.http.impl.client.HttpClients;
 import org.apache.http.message.BasicHeader;
 import org.apache.http.protocol.HTTP;
 import org.apache.sling.api.resource.ResourceResolver;
@@ -128,6 +125,11 @@
                         .addHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_CLOSE)
                         .useExpectContinue();
 
+                String authorizationHeader = getAuthSecret();
+                if (null != authorizationHeader) {
+                    req.addHeader(new BasicHeader(HttpHeaders.AUTHORIZATION, authorizationHeader));
+                }
+
                 // add the message body digest, see https://tools.ietf.org/html/rfc3230#section-4.3.2
                 if (distributionPackage instanceof AbstractDistributionPackage) {
                     AbstractDistributionPackage adb = (AbstractDistributionPackage) distributionPackage;
@@ -222,27 +224,32 @@
         return executor;
     }
 
-    private Executor buildAuthExecutor(String authorizationHeader) {
-        HttpClientBuilder builder = HttpClients.custom();
-        builder.setDefaultHeaders(Collections.singletonList(new BasicHeader(HttpHeaders.AUTHORIZATION, authorizationHeader)));
-        Executor executor = Executor.newInstance(builder.build());
-        log.debug("set Authorization header, endpoint={}", distributionEndpoint.getUri());
-        return executor;
-    }
-
-    private Executor buildAuthExecutor(@NotNull Map<String, String> credentialsMap) {
-        return (credentialsMap.containsKey(AUTHORIZATION))
-                ? buildAuthExecutor(credentialsMap.get(AUTHORIZATION))
-                : buildAuthExecutor(credentialsMap.get(USERNAME), credentialsMap.get(PASSWORD));
-    }
-
-    private Executor buildExecutor() {
-        DistributionTransportSecret secret = secretProvider.getSecret(distributionEndpoint.getUri());
-        Map<String, String> credentialsMap = secret.asCredentialsMap();
-        return (credentialsMap != null)
-                ? buildAuthExecutor(credentialsMap)
+    private Executor buildAuthExecutor(Map<String, String> credentialsMap) { // credentialsMap may be null
+        return (null != credentialsMap && !credentialsMap.containsKey(AUTHORIZATION)) // no map, or an AUTHORIZATION entry: plain executor
+                ? buildAuthExecutor(credentialsMap.get(USERNAME), credentialsMap.get(PASSWORD))
                 : Executor.newInstance();
     }
 
+    /** Builds the executor from whatever credentials map the secret provider currently yields. */
+    private Executor buildExecutor() {
+        return buildAuthExecutor(getCredentialsMap());
+    }
+
+    /** Returns the AUTHORIZATION entry of the credentials map, or null when there is no map or no entry. */
+    private String getAuthSecret() {
+        final Map<String, String> credentials = getCredentialsMap();
+        final boolean hasHeader = credentials != null && credentials.containsKey(AUTHORIZATION);
+        // the value is handed back verbatim; the caller uses it as the Authorization header
+        return hasHeader ? credentials.get(AUTHORIZATION) : null;
+    }
+    
+    /**
+     * Returns the credentials map of the secret for the endpoint URI, or null when there is no secret.
+     */
+    private Map<String, String> getCredentialsMap() {
+        final DistributionTransportSecret endpointSecret = secretProvider.getSecret(distributionEndpoint.getUri());
+        // callers (buildAuthExecutor, getAuthSecret) null-check the returned map
+        return (endpointSecret == null) ? null : endpointSecret.asCredentialsMap();
+    }
 
 }
diff --git a/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentTest.java b/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentTest.java
index e729c7c..433d2ff 100644
--- a/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentTest.java
+++ b/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentTest.java
@@ -21,7 +21,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-
+import java.util.HashMap;
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.api.resource.ResourceResolverFactory;
 import org.apache.sling.distribution.DistributionRequest;
@@ -125,7 +125,7 @@
 
 
         when(queueProvider.getQueue(DistributionQueueDispatchingStrategy.DEFAULT_QUEUE_NAME)).thenReturn(
-                new SimpleDistributionQueue(name, "name"));
+                new SimpleDistributionQueue(name, "name", new HashMap<String, DistributionQueueItemStatus>()));
         DistributionResponse response = agent.execute(resourceResolver, request);
         assertNotNull(response);
         assertEquals("[ERROR]", response.getMessage());
@@ -165,7 +165,7 @@
             }
         }).when(packageExporter).exportPackages(any(ResourceResolver.class), any(DistributionRequest.class), any(DistributionPackageProcessor.class));
         when(queueProvider.getQueue(DistributionQueueDispatchingStrategy.DEFAULT_QUEUE_NAME)).thenReturn(
-                new SimpleDistributionQueue(name, "name"));
+                new SimpleDistributionQueue(name, "name", new HashMap<String, DistributionQueueItemStatus>()));
         DistributionResponse response = agent.execute(resourceResolver, request);
         assertNotNull(response);
         assertEquals("[QUEUED]", response.getMessage());
@@ -205,7 +205,7 @@
                 return null;
             }
         }).when(packageExporter).exportPackages(any(ResourceResolver.class), any(DistributionRequest.class), any(DistributionPackageProcessor.class));        when(queueProvider.getQueue(DistributionQueueDispatchingStrategy.DEFAULT_QUEUE_NAME)).thenReturn(
-                new SimpleDistributionQueue(name, "name"));
+                new SimpleDistributionQueue(name, "name", new HashMap<String, DistributionQueueItemStatus>()));
 
         agent.execute(resourceResolver, request);
     }
@@ -298,7 +298,7 @@
                 return null;
             }
         }).when(packageExporter).exportPackages(any(ResourceResolver.class), any(DistributionRequest.class), any(DistributionPackageProcessor.class));        when(queueProvider.getQueue(DistributionQueueDispatchingStrategy.DEFAULT_QUEUE_NAME)).thenReturn(
-                new SimpleDistributionQueue(name, "name"));
+                new SimpleDistributionQueue(name, "name", new HashMap<String, DistributionQueueItemStatus>()));
 
         DistributionResponse response = agent.execute(resourceResolver, request);
 
@@ -344,7 +344,7 @@
                 return null;
             }
         }).when(packageExporter).exportPackages(any(ResourceResolver.class), any(DistributionRequest.class), any(DistributionPackageProcessor.class));        when(queueProvider.getQueue(DistributionQueueDispatchingStrategy.DEFAULT_QUEUE_NAME)).thenReturn(
-                new SimpleDistributionQueue(name, "name"));
+                new SimpleDistributionQueue(name, "name", new HashMap<String, DistributionQueueItemStatus>()));
 
         DistributionResponse response = agent.execute(resourceResolver, request);
 
diff --git a/src/test/java/org/apache/sling/distribution/packaging/impl/FileDistributionPackageBuilderTest.java b/src/test/java/org/apache/sling/distribution/packaging/impl/FileDistributionPackageBuilderTest.java
index ea6072c..5948b09 100644
--- a/src/test/java/org/apache/sling/distribution/packaging/impl/FileDistributionPackageBuilderTest.java
+++ b/src/test/java/org/apache/sling/distribution/packaging/impl/FileDistributionPackageBuilderTest.java
@@ -20,16 +20,20 @@
 package org.apache.sling.distribution.packaging.impl;
 
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.Arrays;
 
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.distribution.DistributionRequest;
 import org.apache.sling.distribution.common.DistributionException;
 import org.apache.sling.distribution.packaging.DistributionPackage;
+import org.apache.sling.distribution.packaging.DistributionPackageInfo;
 import org.apache.sling.distribution.serialization.DistributionContentSerializer;
 import org.apache.sling.distribution.serialization.DistributionExportOptions;
 import org.junit.Test;
@@ -38,14 +42,37 @@
 
     @Test
     public void testDefaultTempDirectory() throws DistributionException, IOException {
+        final String testPath = "/a/test/path";
+        final String testDeepPath = "/a/deep/test/path";
+        final String[] requestPaths = {testPath, testDeepPath};
+        DistributionRequest mockRequest = mock(DistributionRequest.class);
+        when(mockRequest.getPaths()).thenReturn(requestPaths);
+        when(mockRequest.isDeep(testDeepPath)).thenReturn(true);
+        when(mockRequest.isDeep(testPath)).thenReturn(false);
+
         FileDistributionPackageBuilder builder = new FileDistributionPackageBuilder("test", new TestSerializer(), null, null, new String[0],
                 new String[0]);
-        DistributionPackage createdPackage = builder.createPackageForAdd(mock(ResourceResolver.class), mock(DistributionRequest.class));
+
+        DistributionPackage createdPackage = builder.createPackageForAdd(mock(ResourceResolver.class), mockRequest);
 
         try {
             assertNotNull(createdPackage.createInputStream());
             DistributionPackage gotPackage = builder.getPackageInternal(mock(ResourceResolver.class), createdPackage.getId());
             assertNotNull(gotPackage.createInputStream()); // this will throw an exception when the file doesn't exist
+            final String[] createdPackagePaths = createdPackage.getInfo().getPaths();
+            final String[] gotPackagePaths = gotPackage.getInfo().getPaths();
+            final String[] createdPackageDeepPaths = (String[]) createdPackage.getInfo()
+                    .get(DistributionPackageInfo.PROPERTY_REQUEST_DEEP_PATHS);
+            final String[] gotPackageDeepPaths = (String[]) gotPackage.getInfo()
+                    .get(DistributionPackageInfo.PROPERTY_REQUEST_DEEP_PATHS);
+            assertTrue("packaged Paths at createPackage and getPackage not consistent. "
+                    + "expected " + Arrays.toString(createdPackagePaths)
+                    + ", found " + Arrays.toString(gotPackagePaths),
+                    Arrays.equals(createdPackagePaths, gotPackagePaths));
+            assertTrue("packaged deep Paths at createPackage and getPackage not consistent. "
+                    + "expected " + Arrays.toString(createdPackageDeepPaths)
+                    + ", found " + Arrays.toString(gotPackageDeepPaths),
+                    Arrays.equals(createdPackageDeepPaths, gotPackageDeepPaths));
         } finally {
             createdPackage.delete();
         }
diff --git a/src/test/java/org/apache/sling/distribution/packaging/impl/InMemoryDistributionPackageBuilderTest.java b/src/test/java/org/apache/sling/distribution/packaging/impl/InMemoryDistributionPackageBuilderTest.java
index f20dc66..463cbd9 100644
--- a/src/test/java/org/apache/sling/distribution/packaging/impl/InMemoryDistributionPackageBuilderTest.java
+++ b/src/test/java/org/apache/sling/distribution/packaging/impl/InMemoryDistributionPackageBuilderTest.java
@@ -21,7 +21,6 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
-
 import org.apache.commons.io.IOUtils;
 import org.apache.sling.api.resource.ResourceResolver;
 import org.apache.sling.distribution.DistributionRequestType;
@@ -40,8 +39,8 @@
     @Test
     public void testCreatePackage() throws Exception {
         InMemoryDistributionPackageBuilder builder = new InMemoryDistributionPackageBuilder("name", new InMemDistributionContentSerializer(), new String[0], new String[0]);
-        DistributionPackage pkg = builder.createPackageForAdd(mock(ResourceResolver.class), new SimpleDistributionRequest(DistributionRequestType.ADD, false, "/test"));
-        assertNotNull(pkg.createInputStream());
+        DistributionPackage createdPkg = builder.createPackageForAdd(mock(ResourceResolver.class), new SimpleDistributionRequest(DistributionRequestType.ADD, false, "/test"));
+        assertNotNull(createdPkg.createInputStream()); // the built package must expose a content stream
     }
 
     private final class InMemDistributionContentSerializer implements DistributionContentSerializer {
diff --git a/src/test/java/org/apache/sling/distribution/packaging/impl/InMemoryDistributionPackageTest.java b/src/test/java/org/apache/sling/distribution/packaging/impl/InMemoryDistributionPackageTest.java
index 1737d63..9d778e8 100644
--- a/src/test/java/org/apache/sling/distribution/packaging/impl/InMemoryDistributionPackageTest.java
+++ b/src/test/java/org/apache/sling/distribution/packaging/impl/InMemoryDistributionPackageTest.java
@@ -18,30 +18,51 @@
  */
 package org.apache.sling.distribution.packaging.impl;
 
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Random;
 
 import org.apache.sling.distribution.packaging.DistributionPackage;
+import org.apache.sling.distribution.packaging.DistributionPackageInfo;
 import org.junit.Assert;
 import org.junit.Test;
 
 public class InMemoryDistributionPackageTest {
 
+    /** Checks that id, type, size and the info entries passed to the constructor are reported back. */
     @Test
     public void testGetInfo() throws Exception {
         int size = 1000;
         byte[] data = new byte[size];
+        final String testPath = "/a/test/path";
+        final String testDeepPath = "/a/test/deepPath";
         new Random().nextBytes(data);
-        DistributionPackage pkg = new InMemoryDistributionPackage("id", "type", data);
+        // info entries the package is expected to hand back through getInfo()
+        Map<String, Object> baseInfoMap = new HashMap<String, Object>();
+        baseInfoMap.put(DistributionPackageInfo.PROPERTY_REQUEST_PATHS, new String[] {testPath, testDeepPath});
+        baseInfoMap.put(DistributionPackageInfo.PROPERTY_REQUEST_DEEP_PATHS, new String[] {testDeepPath});
+        DistributionPackage pkg = new InMemoryDistributionPackage("id", "type", data, baseInfoMap);
         Assert.assertEquals("type", pkg.getType());
         Assert.assertEquals("id", pkg.getId());
         Assert.assertEquals(size, pkg.getSize());
+        Assert.assertTrue("DistributionRequest provided Paths and those retrieved from "
+                + "DistributionPackage.getInfo() don't match",
+                Arrays.equals(new String[] {testPath, testDeepPath},
+                        (String[]) pkg.getInfo().get(DistributionPackageInfo.PROPERTY_REQUEST_PATHS)));
+        Assert.assertTrue("DistributionRequest deep Paths and those retrieved from "
+                + "DistributionPackage.getInfo() don't match",
+                Arrays.equals(new String[] {testDeepPath},
+                        (String[]) pkg.getInfo().get(DistributionPackageInfo.PROPERTY_REQUEST_DEEP_PATHS)));
     }
 
+    /** Checks that two createInputStream() calls on one package do not return equal stream objects. */
     @Test
     public void testCreateInputStream() throws Exception {
         byte[] data = new byte[1000];
         new Random().nextBytes(data);
-        DistributionPackage pkg = new InMemoryDistributionPackage("id", "type", data);
+        // no extra package info is supplied for this check
+        DistributionPackage pkg = new InMemoryDistributionPackage("id", "type", data, null);
         Assert.assertNotEquals(pkg.createInputStream(), pkg.createInputStream());
     }
 }
\ No newline at end of file
diff --git a/src/test/java/org/apache/sling/distribution/packaging/impl/ResourceDistributionPackageBuilderTest.java b/src/test/java/org/apache/sling/distribution/packaging/impl/ResourceDistributionPackageBuilderTest.java
new file mode 100644
index 0000000..3850ae5
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/packaging/impl/ResourceDistributionPackageBuilderTest.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.distribution.packaging.impl;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.distribution.DistributionRequest;
+import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.packaging.DistributionPackage;
+import org.apache.sling.distribution.packaging.DistributionPackageInfo;
+import org.apache.sling.distribution.serialization.DistributionContentSerializer;
+import org.apache.sling.distribution.serialization.DistributionExportOptions;
+import org.apache.sling.distribution.util.impl.FileBackedMemoryOutputStream.MemoryUnit;
+import org.apache.sling.testing.mock.osgi.MockOsgi;
+import org.apache.sling.testing.mock.sling.MockSling;
+import org.apache.sling.testing.mock.sling.ResourceResolverType;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.osgi.framework.BundleContext;
+
+/**
+ * Exercises {@link ResourceDistributionPackageBuilder} against a JCR_MOCK resource resolver.
+ */
+public class ResourceDistributionPackageBuilderTest {
+    private BundleContext bundleContext = null;
+    private ResourceResolver resolver = null;
+
+    /**
+     * Creates a package for a request with one shallow and one deep path, checks that its content
+     * stream can be read back, and that the package fetched by id reports the same paths and deep
+     * paths as the package that was created.
+     */
+    @Test
+    public void testResourceDistributionBuilder() throws DistributionException, IOException {
+        final String testPath = "/a/test/path";
+        final String testDeepPath = "/a/deep/test/path";
+        final String[] requestPaths = {testPath, testDeepPath};
+
+        DistributionRequest mockRequest = mock(DistributionRequest.class);
+        when(mockRequest.getPaths()).thenReturn(requestPaths);
+        when(mockRequest.isDeep(testDeepPath)).thenReturn(true);
+        when(mockRequest.isDeep(testPath)).thenReturn(false);
+
+        ResourceDistributionPackageBuilder builder = new ResourceDistributionPackageBuilder("test",
+                new TestSerializer(), null, 0, MemoryUnit.MEGA_BYTES, false, null,
+                new String[0], new String[0]);
+
+        DistributionPackage createdPackage = builder.createPackageForAdd(resolver, mockRequest);
+
+        // Everything after creation runs inside the try so that the package is deleted
+        // even when one of the assertions fails.
+        try {
+            InputStream createdPackageContentIS = createdPackage.createInputStream();
+            assertNotNull("Couldn't create stream from DistributionPackage", createdPackageContentIS);
+            try {
+                // create a new package from the stream of created-package
+                assertNotNull("Couldn't read stream from DistributionPackage",
+                        builder.readPackage(resolver, createdPackageContentIS));
+            } finally {
+                createdPackageContentIS.close();
+            }
+
+            DistributionPackage gotPackage = builder.getPackageInternal(resolver, createdPackage.getId());
+            final String[] createdPackagePaths = createdPackage.getInfo().getPaths();
+            final String[] gotPackagePaths = gotPackage.getInfo().getPaths();
+            final String[] createdPackageDeepPaths = (String[]) createdPackage.getInfo()
+                    .get(DistributionPackageInfo.PROPERTY_REQUEST_DEEP_PATHS);
+            final String[] gotPackageDeepPaths = (String[]) gotPackage.getInfo()
+                    .get(DistributionPackageInfo.PROPERTY_REQUEST_DEEP_PATHS);
+            assertTrue("packaged Paths at createPackage and getPackage not consistent. "
+                    + "expected " + Arrays.toString(createdPackagePaths)
+                    + ", found " + Arrays.toString(gotPackagePaths),
+                    Arrays.equals(createdPackagePaths, gotPackagePaths));
+            assertTrue("packaged deep Paths at createPackage and getPackage not consistent. "
+                    + "expected " + Arrays.toString(createdPackageDeepPaths)
+                    + ", found " + Arrays.toString(gotPackageDeepPaths),
+                    Arrays.equals(createdPackageDeepPaths, gotPackageDeepPaths));
+        } finally {
+            createdPackage.delete();
+        }
+    }
+
+    /** Minimal serializer: exports the bytes of "test", rejects imports, reports request filtering. */
+    static class TestSerializer implements DistributionContentSerializer {
+
+        @Override public void exportToStream(ResourceResolver resourceResolver, DistributionExportOptions exportOptions,
+                OutputStream outputStream) throws DistributionException {
+            try {
+                // explicit charset keeps the payload independent of the platform default
+                outputStream.write("test".getBytes("UTF-8"));
+            } catch (IOException ex) {
+                throw new DistributionException(ex);
+            }
+        }
+
+        @Override public void importFromStream(ResourceResolver resourceResolver, InputStream inputStream) throws DistributionException {
+            throw new DistributionException("unsupported");
+        }
+
+        @Override public String getName() {
+            return "test";
+        }
+
+        @Override public boolean isRequestFiltering() {
+            return true;
+        }
+    }
+
+    /** Creates a mock bundle context and a JCR_MOCK resource resolver before each test. */
+    @Before
+    public void setUp() {
+        bundleContext = MockOsgi.newBundleContext();
+        MockSling.setAdapterManagerBundleContext(bundleContext);
+        resolver = MockSling.newResourceResolver(ResourceResolverType.JCR_MOCK, bundleContext);
+    }
+
+    /** Closes the resolver (if setUp got that far) and always clears the adapter manager context. */
+    @After
+    public void tearDown() {
+        try {
+            if (resolver != null) {
+                resolver.close();
+            }
+        } finally {
+            MockSling.clearAdapterManagerBundleContext();
+        }
+    }
+}
diff --git a/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessorTest.java b/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessorTest.java
index 42128c8..b219f2d 100644
--- a/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessorTest.java
+++ b/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessorTest.java
@@ -21,7 +21,6 @@
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.LinkedList;
-
 import org.apache.sling.distribution.queue.DistributionQueueItemState;
 import org.apache.sling.distribution.queue.DistributionQueueItemStatus;
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
@@ -43,7 +42,7 @@
         DistributionQueue queue = mock(DistributionQueue.class);
         DistributionQueueProcessor queueProcessor = mock(DistributionQueueProcessor.class);
         SimpleDistributionQueueProcessor simpleDistributionQueueProcessor = new SimpleDistributionQueueProcessor(
-                queue, queueProcessor);
+                queue, queueProcessor, new HashMap<String, DistributionQueueItemStatus>());
         simpleDistributionQueueProcessor.run();
     }
 
@@ -57,7 +56,7 @@
         when(queueProvider.getQueues()).thenReturn(queues);
         DistributionQueueProcessor queueProcessor = mock(DistributionQueueProcessor.class);
         SimpleDistributionQueueProcessor simpleDistributionQueueProcessor = new SimpleDistributionQueueProcessor(
-                queue, queueProcessor);
+                queue, queueProcessor, new HashMap<String, DistributionQueueItemStatus>());
         simpleDistributionQueueProcessor.run();
     }
 
@@ -74,7 +73,7 @@
         when(queueProvider.getQueues()).thenReturn(queues);
         DistributionQueueProcessor queueProcessor = mock(DistributionQueueProcessor.class);
         SimpleDistributionQueueProcessor simpleDistributionQueueProcessor = new SimpleDistributionQueueProcessor(
-                queue, queueProcessor);
+                queue, queueProcessor, new HashMap<String, DistributionQueueItemStatus>());
         simpleDistributionQueueProcessor.run();
     }
 }
\ No newline at end of file
diff --git a/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueTest.java b/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueTest.java
index b43ef88..aadd3a4 100644
--- a/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueTest.java
+++ b/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueTest.java
@@ -19,7 +19,6 @@
 package org.apache.sling.distribution.queue.impl.simple;
 
 import java.util.HashMap;
-
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
@@ -39,7 +38,8 @@
 
     @Test
     public void testPackageAddition() throws Exception {
-        DistributionQueue queue = new SimpleDistributionQueue("agentName", "default");
+        DistributionQueue queue = new SimpleDistributionQueue("agentName", "default",
+                new HashMap<String, DistributionQueueItemStatus>());
         DistributionQueueItem pkg = new DistributionQueueItem("packageId", new HashMap<String, Object>());
         assertNotNull(queue.add(pkg));
         assertFalse(queue.getStatus().isEmpty());
@@ -47,7 +47,8 @@
 
     @Test
     public void testPackageAdditionAndRemoval() throws Exception {
-        DistributionQueue queue = new SimpleDistributionQueue("agentName", "default");
+        DistributionQueue queue = new SimpleDistributionQueue("agentName", "default",
+                new HashMap<String, DistributionQueueItemStatus>());
         DistributionQueueItem pkg = new DistributionQueueItem("id", new HashMap<String, Object>());
         assertNotNull(queue.add(pkg));
         assertFalse(queue.getStatus().isEmpty());
@@ -59,7 +60,8 @@
 
     @Test
     public void testPackageAdditionRetrievalAndRemoval() throws Exception {
-        DistributionQueue queue = new SimpleDistributionQueue("agentName", "default");
+        DistributionQueue queue = new SimpleDistributionQueue("agentName", "default",
+                new HashMap<String, DistributionQueueItemStatus>());
         DistributionQueueItem pkg = new DistributionQueueItem("id", new HashMap<String, Object>());
         assertNotNull(queue.add(pkg));
         assertFalse(queue.getStatus().isEmpty());
@@ -69,7 +71,7 @@
         assertNotNull(queue.remove(pkg.getPackageId()));
         assertTrue(queue.getStatus().isEmpty());
         assertNotNull(status);
-        assertEquals(1, status.getAttempts());
+        assertEquals(0, status.getAttempts());
     }
 
 }