SLING-7754: implementation of Resource Based Queues
diff --git a/pom.xml b/pom.xml
index 6bd547f..8f62adf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -48,6 +48,10 @@
       <tag>HEAD</tag>
   </scm>
 
+    <properties>
+        <exam.version>4.11.0</exam.version>
+    </properties>
+
     <!-- ======================================================================= -->
     <!-- B U I L D                                                               -->
     <!-- ======================================================================= -->
@@ -94,6 +98,9 @@
                 <artifactId>maven-surefire-plugin</artifactId>
                 <configuration>
                     <redirectTestOutputToFile>true</redirectTestOutputToFile>
+                    <excludes>
+                        <exclude>**/*IT.java</exclude>
+                    </excludes>
                 </configuration>
             </plugin>
             <plugin>
@@ -112,6 +119,35 @@
                     </excludes>
                 </configuration>
             </plugin>
+
+            <plugin>
+                <groupId>org.apache.servicemix.tooling</groupId>
+                <artifactId>depends-maven-plugin</artifactId>
+                <version>1.4.0</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>generate-depends-file</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-failsafe-plugin</artifactId>
+                <configuration>
+                    <redirectTestOutputToFile>true</redirectTestOutputToFile>
+                    <systemProperties>
+                        <property>
+                            <name>bundle.filename</name>
+                            <value>${basedir}/target/${project.build.finalName}.jar</value>
+                        </property>
+                    </systemProperties>
+                    <includes>
+                        <include>**/*IT.java</include>
+                    </includes>
+                </configuration>
+            </plugin>
         </plugins>
     </build>
 
@@ -310,6 +346,45 @@
             <scope>provided</scope>
         </dependency>
 
+
+        <!-- Pax Exam -->
+        <dependency>
+            <groupId>org.apache.sling</groupId>
+            <artifactId>org.apache.sling.testing.paxexam</artifactId>
+            <version>2.0.0</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.ops4j.pax.exam</groupId>
+            <artifactId>pax-exam</artifactId>
+            <version>${exam.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.ops4j.pax.exam</groupId>
+            <artifactId>pax-exam-cm</artifactId>
+            <version>${exam.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.ops4j.pax.exam</groupId>
+            <artifactId>pax-exam-container-forked</artifactId>
+            <version>${exam.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.ops4j.pax.exam</groupId>
+            <artifactId>pax-exam-junit4</artifactId>
+            <version>${exam.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.ops4j.pax.exam</groupId>
+            <artifactId>pax-exam-link-mvn</artifactId>
+            <version>${exam.version}</version>
+            <scope>test</scope>
+        </dependency>
+
     </dependencies>
     <reporting>
         <plugins>
diff --git a/src/main/java/org/apache/sling/distribution/agent/impl/QueueDistributionAgentFactory.java b/src/main/java/org/apache/sling/distribution/agent/impl/QueueDistributionAgentFactory.java
index 7f3c507..3d0ad64 100644
--- a/src/main/java/org/apache/sling/distribution/agent/impl/QueueDistributionAgentFactory.java
+++ b/src/main/java/org/apache/sling/distribution/agent/impl/QueueDistributionAgentFactory.java
@@ -46,9 +46,10 @@
 import org.apache.sling.distribution.packaging.impl.exporter.LocalDistributionPackageExporter;
 import org.apache.sling.distribution.queue.impl.DistributionQueueProvider;
 import org.apache.sling.distribution.queue.impl.DistributionQueueDispatchingStrategy;
+import org.apache.sling.distribution.queue.impl.DistributionQueueProviderFactory;
 import org.apache.sling.distribution.queue.impl.PriorityQueueDispatchingStrategy;
 import org.apache.sling.distribution.queue.impl.SingleQueueDispatchingStrategy;
-import org.apache.sling.distribution.queue.impl.jobhandling.JobHandlingDistributionQueueProvider;
+import org.apache.sling.distribution.queue.impl.resource.ResourceQueueProvider;
 import org.apache.sling.distribution.trigger.DistributionTrigger;
 import org.apache.sling.event.jobs.JobManager;
 import org.apache.sling.jcr.api.SlingRepository;
@@ -111,6 +112,12 @@
     private DistributionRequestAuthorizationStrategy requestAuthorizationStrategy;
 
 
+    @Property(name = "queueProviderFactory.target", label = "Queue Provider Factory", description = "The target reference for the DistributionQueueProviderFactory used to build queues," +
+            "e.g. use target=(name=...) to bind to services by name.", value = "(name=jobQueue)")
+    @Reference(name = "queueProviderFactory")
+    private DistributionQueueProviderFactory queueProviderFactory;
+
+
     @Property(name = "packageBuilder.target", label = "Package Builder", description = "The target reference for the DistributionPackageBuilder used to create distribution packages, " +
             "e.g. use target=(name=...) to bind to services by name.", value = SettingsUtils.COMPONENT_NAME_DEFAULT)
     @Reference(name = "packageBuilder")
@@ -135,14 +142,13 @@
     private SlingSettingsService settingsService;
 
     @Reference
-    private JobManager jobManager;
-
-    @Reference
     private ResourceResolverFactory resourceResolverFactory;
 
     @Reference
     private SlingRepository slingRepository;
 
+    DistributionQueueProvider queueProvider;
+
     public QueueDistributionAgentFactory() {
         super(QueueDistributionAgentMBean.class);
     }
@@ -164,6 +170,8 @@
     @Deactivate
     protected void deactivate(BundleContext context) {
         super.deactivate(context);
+
+        queueProviderFactory.releaseProvider(queueProvider);
     }
 
     @Override
@@ -177,8 +185,9 @@
         Map<String, String> priorityQueues = PropertiesUtil.toMap(config.get(PRIORITY_QUEUES), new String[0]);
         priorityQueues = SettingsUtils.removeEmptyEntries(priorityQueues);
 
+        queueProvider = queueProviderFactory.getProvider(agentName, serviceName);
 
-        DistributionQueueProvider queueProvider = new MonitoringDistributionQueueProvider(new JobHandlingDistributionQueueProvider(agentName, jobManager, context), context);
+        MonitoringDistributionQueueProvider monitoringQueueProvider = new MonitoringDistributionQueueProvider(queueProvider, context);
 
         DistributionQueueDispatchingStrategy exportQueueStrategy = null;
 
@@ -194,7 +203,7 @@
 
         return new SimpleDistributionAgent(agentName, false, null,
                 serviceName, null, packageExporter, requestAuthorizationStrategy,
-                queueProvider, exportQueueStrategy, null, distributionEventFactory, resourceResolverFactory, slingRepository,
+                monitoringQueueProvider, exportQueueStrategy, null, distributionEventFactory, resourceResolverFactory, slingRepository,
                 distributionLog, allowedRequests, allowedRoots, 0);
     }
 
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/DistributionQueueProviderFactory.java b/src/main/java/org/apache/sling/distribution/queue/impl/DistributionQueueProviderFactory.java
new file mode 100644
index 0000000..ee84d5c
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/DistributionQueueProviderFactory.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.distribution.queue.impl;
+
+public interface DistributionQueueProviderFactory {
+
+    DistributionQueueProvider getProvider(String agentName, String serviceName);
+
+    void releaseProvider(DistributionQueueProvider queueProvider);
+
+}
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingQueueDistributionProviderFactory.java b/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingQueueDistributionProviderFactory.java
new file mode 100644
index 0000000..d6a450d
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingQueueDistributionProviderFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.distribution.queue.impl.jobhandling;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.distribution.component.impl.DistributionComponentConstants;
+import org.apache.sling.distribution.queue.impl.DistributionQueueProvider;
+import org.apache.sling.distribution.queue.impl.DistributionQueueProviderFactory;
+import org.apache.sling.event.jobs.JobManager;
+import org.osgi.framework.BundleContext;
+
+import java.util.Map;
+
+@Component
+@Service(DistributionQueueProviderFactory.class)
+@Property(name = DistributionComponentConstants.PN_NAME, value = "jobQueue")
+public class JobHandlingQueueDistributionProviderFactory implements DistributionQueueProviderFactory {
+
+    @Reference
+    JobManager jobManager;
+
+    BundleContext context;
+
+
+    @Activate
+    protected void activate(BundleContext context, Map<String, Object> config)
+    {
+        this.context = context;
+    }
+
+    @Override
+    public DistributionQueueProvider getProvider(String agentName, String serviceName) {
+        return new JobHandlingDistributionQueueProvider(agentName, jobManager, context);
+    }
+
+    @Override
+    public void releaseProvider(DistributionQueueProvider queueProvider) {
+
+    }
+}
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceIterator.java b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceIterator.java
new file mode 100644
index 0000000..4236d56
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceIterator.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.distribution.queue.impl.resource;
+
+
+import org.apache.sling.api.resource.Resource;
+
+import java.util.Iterator;
+import java.util.Stack;
+
+public class ResourceIterator implements Iterator<Resource> {
+
+    private final String folderResourceType;
+    private final boolean includeFolders;
+    private final boolean includeLeafs;
+    private final boolean includeRoot = false;
+    private final String rootPath;
+
+    private Stack<Iterator<Resource>> folderIterators = new Stack<Iterator<Resource>>();
+
+    private Resource currentFolder;
+    private Iterator<Resource> currentIterator;
+
+    private Resource next;
+
+
+    public ResourceIterator(Resource root, String folderResourceType, boolean includeFolders, boolean includeLeafs) {
+        this.folderResourceType = folderResourceType;
+        this.includeFolders = includeFolders;
+        this.includeLeafs = includeLeafs;
+        this.rootPath = root.getPath();
+
+        currentFolder = root;
+        currentIterator = currentFolder.listChildren();
+        next = seek();
+    }
+
+    @Override
+    public boolean hasNext() {
+        return next != null;
+    }
+
+    @Override
+    public Resource next() {
+        Resource result = next;
+        next = seek();
+        return result;
+    }
+
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException();
+    }
+
+    Resource seek() {
+        Resource res;
+        while ((res = seekAll()) != null) {
+
+            if (rootPath.equals(res.getPath())) {
+                if (includeRoot) {
+                    return res;
+                } else {
+                    continue;
+                }
+            }
+
+            if (includeFolders && res.isResourceType(folderResourceType)) {
+                return res;
+            }
+
+            if (includeLeafs && !res.isResourceType(folderResourceType)) {
+                return res;
+            }
+        }
+
+        return null;
+    }
+
+    // depth first, post order (children before parents)
+    Resource seekAll() {
+
+        while (currentIterator != null) {
+            if (currentIterator.hasNext()) {
+                Resource res = currentIterator.next();
+
+                if (res.isResourceType(folderResourceType)) {
+                    folderIterators.push(currentIterator);
+
+                    currentFolder = res;
+                    currentIterator = currentFolder.listChildren();
+                } else {
+                    return res;
+                }
+            } else {
+                Resource folder = currentFolder;
+
+                currentFolder  = currentFolder.getParent();
+                currentIterator = folderIterators.empty() ? null  : folderIterators.pop();
+
+                return folder;
+            }
+        }
+
+        return null;
+    }
+}
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueue.java b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueue.java
new file mode 100644
index 0000000..28162c3
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueue.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.distribution.queue.impl.resource;
+
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.distribution.queue.DistributionQueueEntry;
+import org.apache.sling.distribution.queue.DistributionQueueItem;
+import org.apache.sling.distribution.queue.DistributionQueueState;
+import org.apache.sling.distribution.queue.DistributionQueueStatus;
+import org.apache.sling.distribution.queue.DistributionQueueType;
+import org.apache.sling.distribution.queue.spi.DistributionQueue;
+import org.apache.sling.distribution.util.impl.DistributionUtils;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+
+public class ResourceQueue implements DistributionQueue {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+
+    private final ResourceResolverFactory resolverFactory;
+    private String serviceName;
+    private String queueName;
+    private final String queueRootPath;
+
+    public ResourceQueue(ResourceResolverFactory resolverFactory, String serviceName, String queueName, String rootPath) {
+        this.resolverFactory = resolverFactory;
+        this.serviceName = serviceName;
+        this.queueName = queueName;
+        this.queueRootPath = rootPath + "/" + queueName;
+    }
+
+    @NotNull
+    @Override
+    public String getName() {
+        return queueName;
+    }
+
+    @Nullable
+    @Override
+    public DistributionQueueEntry add(@NotNull DistributionQueueItem item) {
+
+        ResourceResolver resourceResolver = null;
+        try {
+            resourceResolver = DistributionUtils.loginService(resolverFactory, serviceName);
+
+            Resource queueRoot = ResourceQueueUtils.getRootResource(resourceResolver, queueRootPath);
+
+            Resource resource = ResourceQueueUtils.createResource(queueRoot, item);
+
+            DistributionQueueEntry entry = ResourceQueueUtils.readEntry(queueRoot, resource);
+
+            logEntry(entry, "add");
+
+            return entry;
+
+        } catch (LoginException e) {
+            throw new RuntimeException(e);
+        } catch (PersistenceException e) {
+            throw new RuntimeException(e);
+        } finally {
+            DistributionUtils.safelyLogout(resourceResolver);
+        }
+    }
+
+
+    @Override
+    public DistributionQueueEntry getHead() {
+        ResourceResolver resourceResolver = null;
+        try {
+            resourceResolver = DistributionUtils.loginService(resolverFactory, serviceName);
+            Resource queueRoot = ResourceQueueUtils.getRootResource(resourceResolver, queueRootPath);
+
+
+            DistributionQueueEntry head =  ResourceQueueUtils.getHead(queueRoot);
+
+            logEntry(head, "getHead");
+
+            return head;
+
+        } catch (LoginException e) {
+            throw new RuntimeException(e);
+        } catch (PersistenceException e) {
+            throw new RuntimeException(e);
+        } finally {
+            DistributionUtils.safelyLogout(resourceResolver);
+        }
+    }
+
+    @NotNull
+    @Override
+    public Iterable<DistributionQueueEntry> getItems(int skip, int limit) {
+        ResourceResolver resourceResolver = null;
+        try {
+            resourceResolver = DistributionUtils.loginService(resolverFactory, serviceName);
+            Resource queueRoot = ResourceQueueUtils.getRootResource(resourceResolver, queueRootPath);
+
+            List<DistributionQueueEntry> entries =  ResourceQueueUtils.getEntries(queueRoot, skip, limit);
+
+            log.debug("queue[{}] getItems entries={}", new Object[] { queueName, entries.size() });
+
+            return entries;
+        } catch (LoginException e) {
+            throw new RuntimeException(e);
+        } catch (PersistenceException e) {
+            throw new RuntimeException(e);
+        } finally {
+            DistributionUtils.safelyLogout(resourceResolver);
+        }
+
+    }
+
+    @Nullable
+    @Override
+    public DistributionQueueEntry getItem(@NotNull String itemId) {
+        ResourceResolver resourceResolver = null;
+        try {
+            resourceResolver = DistributionUtils.loginService(resolverFactory, serviceName);
+            Resource queueRoot = ResourceQueueUtils.getRootResource(resourceResolver, queueRootPath);
+
+            Resource itemResource =  ResourceQueueUtils.getResourceById(queueRoot, itemId);
+
+            DistributionQueueEntry entry = ResourceQueueUtils.readEntry(queueRoot, itemResource);
+
+            logEntry(entry, "getItem");
+
+            return entry;
+
+        } catch (LoginException e) {
+            throw new RuntimeException(e);
+        } catch (PersistenceException e) {
+            throw new RuntimeException(e);
+        } finally {
+            DistributionUtils.safelyLogout(resourceResolver);
+        }
+    }
+
+    @Nullable
+    @Override
+    public DistributionQueueEntry remove(@NotNull String itemId) {
+        ResourceResolver resourceResolver = null;
+        try {
+            resourceResolver = DistributionUtils.loginService(resolverFactory, serviceName);
+            Resource queueRoot = ResourceQueueUtils.getRootResource(resourceResolver, queueRootPath);
+
+            Resource itemResource = ResourceQueueUtils.getResourceById(queueRoot, itemId);
+
+            DistributionQueueEntry entry = ResourceQueueUtils.readEntry(queueRoot, itemResource);
+
+            ResourceQueueUtils.deleteResource(itemResource);
+
+            logEntry(entry, "remove");
+
+            return entry;
+
+        } catch (LoginException e) {
+            throw new RuntimeException(e);
+        } catch (PersistenceException e) {
+            throw new RuntimeException(e);
+        } finally {
+            DistributionUtils.safelyLogout(resourceResolver);
+        }
+    }
+
+    @Nullable
+    @Override
+    public DistributionQueueStatus getStatus() {
+        ResourceResolver resourceResolver = null;
+        try {
+            resourceResolver = DistributionUtils.loginService(resolverFactory, serviceName);
+            Resource queueRoot = ResourceQueueUtils.getRootResource(resourceResolver, queueRootPath);
+
+            int count = ResourceQueueUtils.getResourceCount(queueRoot);
+
+            return new DistributionQueueStatus(count, DistributionQueueState.PASSIVE);
+        } catch (LoginException e) {
+            throw new RuntimeException(e);
+        } catch (PersistenceException e) {
+            throw new RuntimeException(e);
+        } finally {
+            DistributionUtils.safelyLogout(resourceResolver);
+        }
+    }
+
+    @NotNull
+    @Override
+    public DistributionQueueType getType() {
+        return DistributionQueueType.ORDERED;
+    }
+
+    void logEntry(DistributionQueueEntry entry, String scope) {
+        if (entry == null) {
+            log.debug("queue[{}] {} null entry", new Object[] { queueName, scope });
+            return;
+        }
+
+        if (entry.getItem() == null) {
+            log.debug("queue[{}] {} null item (should not happen)", new Object[] { queueName, scope });
+            return;
+        }
+
+        String entryId = entry.getId();
+        DistributionQueueItem item = entry.getItem();
+        log.debug("queue[{}] {} entryId={} packageId={}", new Object[] { queueName, scope, entryId, item.getPackageId() });
+    }
+}
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueCleanupTask.java b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueCleanupTask.java
new file mode 100644
index 0000000..f41b16e
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueCleanupTask.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.distribution.queue.impl.resource;
+
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.distribution.util.impl.DistributionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Calendar;
+import java.util.Iterator;
+
+
+public class ResourceQueueCleanupTask implements Runnable {
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+
+    private final ResourceResolverFactory resolverFactory;
+    private final String serviceName;
+    private final String rootPath;
+
+    public ResourceQueueCleanupTask(ResourceResolverFactory resolverFactory, String serviceName, String rootPath) {
+
+        this.resolverFactory = resolverFactory;
+        this.serviceName = serviceName;
+        this.rootPath = rootPath;
+    }
+
+    @Override
+    public void run() {
+        log.debug("Cleaning up resource queues at {}", rootPath);
+
+        ResourceResolver resourceResolver = null;
+        try {
+            resourceResolver = DistributionUtils.loginService(resolverFactory, serviceName);
+            Resource root = ResourceQueueUtils.getRootResource(resourceResolver, rootPath);
+
+            Iterator<Resource> it = root.listChildren();
+
+            while (it.hasNext()) {
+                Resource queueRoot = it.next();
+
+                log.debug("Starting cleaning up queue at {}", queueRoot.getPath());
+
+                removeEmptyFolders(queueRoot);
+
+                log.debug("Finished cleaning up queue at {}", queueRoot.getPath());
+            }
+        } catch (Throwable e) {
+            log.error("Error cleaning up resource queues", e);
+        } finally {
+            DistributionUtils.safelyLogout(resourceResolver);
+        }
+    }
+
+
+    public void removeEmptyFolders(Resource root) throws PersistenceException {
+        Calendar now = Calendar.getInstance();
+        now.add(Calendar.MINUTE, -5);
+
+        ResourceResolver resolver = root.getResourceResolver();
+        ResourceIterator it = new ResourceIterator(root, ResourceQueueUtils.RESOURCE_FOLDER, true, false);
+
+        String nowPath =  ResourceQueueUtils.getTimePath(now);
+
+        while (it.hasNext()) {
+            Resource res = it.next();
+
+            String resPath = res.getPath().substring(root.getPath().length()+1);
+
+            if (!res.isResourceType(ResourceQueueUtils.RESOURCE_FOLDER)) {
+                continue;
+            }
+
+            // now 2018/03/15/03/48
+            // res 2018/02/15
+            if (!ResourceQueueUtils.isSafeToDelete(nowPath, resPath)) {
+                continue;
+            }
+
+            if (res.hasChildren()) {
+                continue;
+            }
+
+            log.debug("removing empty folder {}", res.getPath());
+
+            resolver.delete(res);
+            resolver.commit();
+        }
+
+    }
+}
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProvider.java b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProvider.java
new file mode 100644
index 0000000..9c3c393
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProvider.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.distribution.queue.impl.resource;
+
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.queue.DistributionQueueType;
+import org.apache.sling.distribution.queue.impl.DistributionQueueProcessor;
+import org.apache.sling.distribution.queue.impl.DistributionQueueProvider;
+import org.apache.sling.distribution.queue.spi.DistributionQueue;
+
+import org.jetbrains.annotations.NotNull;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
+
+import java.util.Dictionary;
+import java.util.Hashtable;
+
+public class ResourceQueueProvider implements DistributionQueueProvider {
+
+    private final static String QUEUES_ROOT = "/var/sling/distribution/queues/";
+    private ResourceResolverFactory resolverFactory;
+    private String serviceName;
+    private String agentRootPath;
+
+    private ServiceRegistration cleanupTask;
+
+
+    public ResourceQueueProvider(BundleContext context, ResourceResolverFactory resolverFactory, String serviceName, String agentName) {
+        this.resolverFactory = resolverFactory;
+        this.serviceName = serviceName;
+        this.agentRootPath = QUEUES_ROOT + agentName;
+
+        register(context);
+    }
+
+    @NotNull
+    @Override
+    public DistributionQueue getQueue(@NotNull String queueName) throws DistributionException {
+        return new ResourceQueue(resolverFactory, serviceName, queueName, agentRootPath);
+    }
+
+    @NotNull
+    @Override
+    public DistributionQueue getQueue(@NotNull String queueName, @NotNull DistributionQueueType type) {
+        return new ResourceQueue(resolverFactory, serviceName, queueName, agentRootPath);
+    }
+
+    @Override
+    public void enableQueueProcessing(@NotNull DistributionQueueProcessor queueProcessor, String... queueNames) throws DistributionException {
+        // processing not supported
+    }
+
+    @Override
+    public void disableQueueProcessing() throws DistributionException {
+        // processing not supported
+    }
+
+
+    private void register(BundleContext context) {
+        Runnable cleanup = new ResourceQueueCleanupTask(resolverFactory, serviceName, agentRootPath);
+        Dictionary<String, Object> props = new Hashtable<String, Object>();
+        props.put(Scheduler.PROPERTY_SCHEDULER_CONCURRENT, false);
+        props.put(Scheduler.PROPERTY_SCHEDULER_PERIOD, 300L);
+        cleanupTask = context.registerService(Runnable.class.getName(), cleanup, props);
+    }
+
+    public void close() {
+        if (cleanupTask != null) {
+            cleanupTask.unregister();
+            cleanupTask = null;
+        }
+    }
+}
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProviderFactory.java b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProviderFactory.java
new file mode 100644
index 0000000..ac0bafd
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProviderFactory.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.distribution.queue.impl.resource;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.distribution.component.impl.DistributionComponentConstants;
+import org.apache.sling.distribution.queue.impl.DistributionQueueProvider;
+import org.apache.sling.distribution.queue.impl.DistributionQueueProviderFactory;
+import org.osgi.framework.BundleContext;
+
+import java.util.Map;
+
+@Component
+@Service(DistributionQueueProviderFactory.class)
+@Property(name = DistributionComponentConstants.PN_NAME, value = "resourceQueue")
+public class ResourceQueueProviderFactory implements DistributionQueueProviderFactory {
+
+    @Reference
+    ResourceResolverFactory resourceResolverFactory;
+
+    BundleContext context;
+
+    @Activate
+    protected void activate(BundleContext context, Map<String, Object> config)
+    {
+        this.context = context;
+    }
+
+    @Override
+    public DistributionQueueProvider getProvider(String agentName, String serviceName) {
+        return new ResourceQueueProvider(context, resourceResolverFactory, serviceName, agentName);
+    }
+
+    @Override
+    public void releaseProvider(DistributionQueueProvider queueProvider) {
+        if (queueProvider instanceof ResourceQueueProvider) {
+            ((ResourceQueueProvider) queueProvider).close();
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueUtils.java b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueUtils.java
new file mode 100644
index 0000000..1408cb6
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueUtils.java
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.distribution.queue.impl.resource;
+
+
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceUtil;
+import org.apache.sling.api.resource.ValueMap;
+import org.apache.sling.distribution.DistributionRequestType;
+import org.apache.sling.distribution.packaging.DistributionPackageInfo;
+import org.apache.sling.distribution.queue.DistributionQueueEntry;
+import org.apache.sling.distribution.queue.DistributionQueueItem;
+import org.apache.sling.distribution.queue.DistributionQueueItemState;
+import org.apache.sling.distribution.queue.DistributionQueueItemStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import javax.jcr.NodeIterator;
+import javax.jcr.RepositoryException;
+import javax.jcr.Session;
+import javax.jcr.query.Query;
+import javax.jcr.query.QueryManager;
+import javax.jcr.query.QueryResult;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class ResourceQueueUtils {
+
+    // prefix for queue entry ids
+    private static final String ID_START = "distrq-";
+
+    // resource folder for queue roots
+    private static final String RESOURCE_ROOT = "sling:Folder";
+
+    // resource type for internal ordered folders
+    public static final String RESOURCE_FOLDER = "sling:OrderedFolder";
+
+    // resource type for internal entries
+    private static final String RESOURCE_ITEM = "nt:unstructured";
+
+    private static final String DISTRIBUTION_PACKAGE_PREFIX = "distribution.";
+    private static final String DISTRIBUTION_PACKAGE_ID = DISTRIBUTION_PACKAGE_PREFIX + "item.id";
+    private static final String DISTRIBUTION_PACKAGE_SIZE = DISTRIBUTION_PACKAGE_PREFIX + "package.size";
+
+
+    private static final AtomicLong itemCounter = new AtomicLong(0);
+    private static final Logger log = LoggerFactory.getLogger(ResourceQueueUtils.class);
+
+
+    private static Map<String, Object> serializeItem(DistributionQueueItem queueItem) {
+
+        Map<String, Object> properties = new HashMap<String, Object>();
+
+        for (String key : queueItem.keySet()) {
+            Object value = queueItem.get(key);
+
+            if (DistributionPackageInfo.PROPERTY_REQUEST_TYPE.equals(key)) {
+                if (value instanceof DistributionRequestType) {
+                    value = ((DistributionRequestType) value).name();
+                }
+            }
+
+            if (value != null) {
+                properties.put(DISTRIBUTION_PACKAGE_PREFIX + key, value);
+            }
+        }
+
+        properties.put(DISTRIBUTION_PACKAGE_ID, queueItem.getPackageId());
+        properties.put(DISTRIBUTION_PACKAGE_SIZE, queueItem.getSize());
+
+        return properties;
+    }
+
+    private static DistributionQueueItem deserializeItem(ValueMap valueMap) {
+
+        String packageId = valueMap.get(DISTRIBUTION_PACKAGE_ID, String.class);
+        Long sizeProperty = valueMap.get(DISTRIBUTION_PACKAGE_SIZE, Long.class);
+        long size = sizeProperty == null ? -1 : sizeProperty;
+
+        Map<String, Object> properties = new HashMap<String, Object>();
+
+        for (String key : valueMap.keySet()) {
+            if (key.startsWith(DISTRIBUTION_PACKAGE_PREFIX)) {
+                String infoKey = key.substring(DISTRIBUTION_PACKAGE_PREFIX.length());
+                Object value = valueMap.get(key);
+
+                if (DistributionPackageInfo.PROPERTY_REQUEST_TYPE.equals(infoKey)) {
+                    if (value instanceof String) {
+                        value = DistributionRequestType.valueOf((String) value);
+                    }
+                }
+
+                properties.put(infoKey, value);
+            }
+        }
+
+        DistributionQueueItem queueItem = new DistributionQueueItem(packageId, size, properties);
+        return queueItem;
+    }
+
+    static DistributionQueueEntry readEntry(Resource queueRoot, Resource resource) {
+
+        if (resource == null) {
+            return null;
+        }
+
+        if (!resource.getPath().startsWith(queueRoot.getPath() + "/")) {
+            return null;
+        }
+
+        if (!resource.isResourceType(RESOURCE_ITEM)) {
+            return null;
+        }
+
+        String queueName = queueRoot.getName();
+        DistributionQueueItem queueItem = deserializeItem(resource.getValueMap());
+        DistributionQueueItemStatus queueItemStatus = new DistributionQueueItemStatus(DistributionQueueItemState.QUEUED, queueName);
+
+        String entryId = getIdFromPath(queueRoot.getPath(), resource.getPath());
+
+        return new DistributionQueueEntry(entryId, queueItem, queueItemStatus);
+    }
+
+    static List<DistributionQueueEntry> getEntries(Resource queueRoot, int skip, int limit) {
+        Iterator<Resource> it = new ResourceIterator(queueRoot, RESOURCE_FOLDER, false, true);
+
+        List<DistributionQueueEntry> entries = new ArrayList<DistributionQueueEntry>();
+
+        int i = 0;
+        while (it.hasNext()) {
+            Resource resource = it.next();
+
+            if (i++ < skip) {
+                continue;
+            }
+
+            DistributionQueueEntry entry = readEntry(queueRoot, resource);
+            entries.add(entry);
+
+            if (entries.size() >= limit) {
+                break;
+            }
+        }
+
+        return entries;
+
+    }
+
+
+    static DistributionQueueEntry getHead(Resource root) {
+        Iterator<DistributionQueueEntry> it =  getEntries(root, 0, 1).iterator();
+
+        if (it.hasNext()) {
+            return it.next();
+        }
+
+        return null;
+    }
+
+    public static Resource getRootResource(ResourceResolver resourceResolver, String rootPath) throws PersistenceException {
+        Resource resource =  ResourceUtil.getOrCreateResource(resourceResolver, rootPath, RESOURCE_FOLDER, RESOURCE_ROOT, true);
+
+        return resource;
+    }
+
+    public static Resource getResourceById(Resource root, String entryId)  {
+        String entryPath = getPathFromId(root.getPath(), entryId);
+        return root.getResourceResolver().getResource(entryPath);
+    }
+
+
+
+    public static Resource createResource(Resource root, DistributionQueueItem queueItem) throws PersistenceException {
+
+        Resource minuteResource = getOrCreateMinuteResource(root);
+
+        String entryPath = getUniqueEntryPath(minuteResource);
+
+        ResourceResolver resourceResolver = root.getResourceResolver();
+
+        Map<String, Object> properties = serializeItem(queueItem);
+
+        properties.put("sling:resourceType", RESOURCE_ITEM);
+        Resource resourceItem =  ResourceUtil.getOrCreateResource(resourceResolver, entryPath, properties,
+                RESOURCE_FOLDER, true);
+
+        resourceResolver.commit();
+
+        return resourceItem;
+    }
+
+
+    /**
+     * Creates a minute resource by retrying several times. If it fails even the last time it will throw an exception.
+     */
+    private static Resource getOrCreateMinuteResource(Resource root) throws PersistenceException {
+
+        for (int i=0; i<2; i++) {
+            try {
+                return tryGetOrCreateMinutes(root);
+            } catch (PersistenceException e) {
+                log.warn("creating minute resource failed. retrying {} more times.", 5-i);
+            }
+
+            root.getResourceResolver().revert();
+            root.getResourceResolver().refresh();
+        }
+
+        return tryGetOrCreateMinutes(root);
+    }
+
+    /**
+     * Creates a set of resources for consecutive minutes.
+     * This ensures that consecutive minutes are created by a single thread, and that are created in order.
+     * This might fail due to concurrency issues and needs to be retried a couple of times.
+     */
+    private static Resource tryGetOrCreateMinutes(Resource root) throws PersistenceException {
+
+        ResourceResolver resourceResolver = root.getResourceResolver();
+        Calendar now = Calendar.getInstance();
+
+        String firstMinutePath = getTimePath(now);
+        Resource firstMinuteResource = resourceResolver.getResource(root, firstMinutePath);
+
+        if (firstMinuteResource != null) {
+            return firstMinuteResource;
+        }
+
+        for (int i=0; i < 3; i++) {
+            now.add(Calendar.MINUTE, i);
+            String newMinutePath = getTimePath(now);
+            Resource resource = createResource(root, newMinutePath);
+        }
+
+        resourceResolver.commit();
+
+        firstMinuteResource = resourceResolver.getResource(root, firstMinutePath);
+
+        return firstMinuteResource;
+    }
+
+    /*
+     * Creates a new resource at the specified path
+     * This is different than ResourceUtil.getOrCreateResource as it only creates the resource, it does not retrieve it.
+     * This ensures that consecutive minutes are always created atomically.
+     */
+    private static Resource createResource(Resource root, String relPath) throws PersistenceException {
+        ResourceResolver resourceResolver = root.getResourceResolver();
+
+        String path = root.getPath() + "/" + relPath;
+        final String parentPath = ResourceUtil.getParent(path);
+        final String name = ResourceUtil.getName(path);
+
+        Resource parent =  ResourceUtil.getOrCreateResource(resourceResolver, parentPath, RESOURCE_FOLDER,
+                RESOURCE_FOLDER, false);
+
+        Map<String, Object> props = Collections.singletonMap(ResourceResolver.PROPERTY_RESOURCE_TYPE, (Object) RESOURCE_FOLDER);
+
+        return resourceResolver.create(parent, name, props);
+    }
+
+    public static void deleteResource(Resource resource) throws PersistenceException {
+        ResourceResolver resolver = resource.getResourceResolver();
+
+        String path = resource.getPath();
+
+        try {
+            resolver.delete(resource);
+            resolver.commit();
+        } catch (PersistenceException var10) {
+            resolver.revert();
+            resolver.refresh();
+            resource = resolver.getResource(path);
+            if (resource != null) {
+                resolver.delete(resource);
+                resolver.commit();
+            }
+        }
+    }
+
+
+    public static int getResourceCount(Resource root) {
+        ResourceResolver resolver = root.getResourceResolver();
+        Session session = resolver.adaptTo(Session.class);
+
+        StringBuilder buf = new StringBuilder();
+        buf.append("/jcr:root");
+        buf.append(root.getPath());
+        buf.append("//element(*,");
+        buf.append(RESOURCE_ITEM);
+        buf.append(")");
+
+        try {
+            QueryManager qManager = session.getWorkspace().getQueryManager();
+            Query q = qManager.createQuery(buf.toString(), "xpath");
+            final QueryResult res = q.execute();
+
+            NodeIterator it = res.getNodes();
+            return  (int) it.getSize();
+        } catch (RepositoryException e) {
+            return -1;
+        }
+    }
+
+
+    private static String getUniqueEntryPath(Resource parent) {
+        final StringBuilder sb = new StringBuilder();
+        sb.append(parent.getPath());
+        sb.append('/');
+        sb.append(UUID.randomUUID().toString().replace("-", ""));
+        sb.append('_');
+        sb.append(itemCounter.getAndIncrement());
+
+        return sb.toString();
+    }
+
+    /**
+     * Transforms current time to path 2018/01/03/23/54
+     * @param now the current time
+     * @return the serialized time
+     */
+    public static String getTimePath(Calendar now) {
+        SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd/HH/mm");
+
+        return sdf.format(now.getTime());
+    }
+
+
+    /**
+     * Checks if path is safe to delete at this time.
+     * A path is safe to delete if the nowPath does not overlap with it.
+     *
+     * @param nowPath represents a full path of current time (e.g. 2018/01/03/23/54)
+     * @param path the path to be checked (it can be a partial path e.g. 2018/01)
+     * @return true if checked path is in the past
+     */
+    public static boolean isSafeToDelete(String nowPath, String path) {
+
+        // should not happen
+        if (nowPath.length() < path.length()) {
+            return false;
+        }
+
+        nowPath = nowPath.substring(0, path.length());
+
+        return nowPath.compareTo(path) > 0;
+    }
+
+
+
+    private static String getPathFromId(String roothPath, String entryId) {
+        String entryPath = unescapeId(entryId);
+        return roothPath + "/" + entryPath;
+    }
+
+    private static String getIdFromPath(String rootPath, String path) {
+
+        if (path.startsWith(rootPath)) {
+            String entryPath = path.substring(rootPath.length()+1);
+
+            String entryId = escapeId(entryPath);
+
+            return entryId;
+        }
+        throw new IllegalArgumentException("entry path does not start with " + rootPath);
+    }
+
+
+    private static String escapeId(String jobId) {
+        //return id;
+        if (jobId == null) {
+            return null;
+        }
+        return ID_START + jobId.replace("/", "--");
+    }
+
+    public static String unescapeId(String itemId) {
+        if (itemId == null) {
+            return null;
+        }
+        if (!itemId.startsWith(ID_START)) {
+            return null;
+        }
+
+        return itemId.replace(ID_START, "").replace("--", "/");
+    }
+
+}
diff --git a/src/test/java/org/apache/sling/distribution/DistributionBaseIT.java b/src/test/java/org/apache/sling/distribution/DistributionBaseIT.java
new file mode 100644
index 0000000..707d3f8
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/DistributionBaseIT.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.distribution;
+
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.distribution.agent.impl.PrivilegeDistributionRequestAuthorizationStrategyFactory;
+import org.apache.sling.distribution.agent.impl.QueueDistributionAgentFactory;
+import org.apache.sling.distribution.agent.spi.DistributionAgent;
+import org.apache.sling.distribution.serialization.impl.vlt.VaultDistributionPackageBuilderFactory;
+import org.apache.sling.testing.paxexam.SlingOptions;
+import org.apache.sling.testing.paxexam.TestSupport;
+import org.ops4j.pax.exam.Configuration;
+import org.ops4j.pax.exam.CoreOptions;
+import org.ops4j.pax.exam.Option;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.Filter;
+import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.framework.ServiceReference;
+
+import javax.inject.Inject;
+
+import java.io.File;
+
+import static org.apache.sling.testing.paxexam.SlingOptions.logback;
+import static org.apache.sling.testing.paxexam.SlingOptions.slingDistribution;
+import static org.apache.sling.testing.paxexam.SlingOptions.slingQuickstartOakTar;
+import static org.ops4j.pax.exam.CoreOptions.composite;
+import static org.ops4j.pax.exam.CoreOptions.junitBundles;
+import static org.ops4j.pax.exam.cm.ConfigurationAdminOptions.factoryConfiguration;
+import static org.ops4j.pax.exam.cm.ConfigurationAdminOptions.newConfiguration;
+
+public class DistributionBaseIT extends TestSupport {
+
+    protected static final String AGENT_RESOURCE_QUEUE = "agentResourceQueue";
+    protected static final String AGENT_JOB_QUEUE = "agentJobQueue";
+
+
+    @Inject
+    protected ResourceResolverFactory resolverFactory;
+
+    @Inject
+    protected BundleContext context;
+
+
+    @Configuration
+    public Option[] configuration() {
+        return new Option[]{
+                baseConfiguration(),
+                slingQuickstart(),
+                logback(),
+                // build artifact
+                slingDistribution(),
+                // testing
+                testBundle(    "bundle.filename"),
+                defaultOsgiConfigs(),
+                junitBundles()
+        };
+    }
+
+    protected Option slingQuickstart() {
+        final String workingDirectory = workingDirectory(); // from TestSupport
+        final int httpPort = findFreePort(); // from TestSupport
+        return composite(
+                slingQuickstartOakTar(workingDirectory, httpPort) // from SlingOptions
+        );
+    }
+
+    public static Option defaultOsgiConfigs() {
+        return composite(
+                newConfiguration("org.apache.sling.jcr.resource.internal.JcrSystemUserValidator")
+                        .put("allow.only.system.user", false).asOption(),
+
+                newConfiguration("org.apache.sling.jcr.base.internal.LoginAdminWhitelist")
+                        .put("whitelist.bypass", "true").asOption(),
+
+                // For production the users would be: replication-service,content-writer-service
+                factoryConfiguration("org.apache.sling.serviceusermapping.impl.ServiceUserMapperImpl.amended")
+                        .put("user.mapping", new String[]{"org.apache.sling.distribution.core:testService=admin"})
+                        .asOption(),
+
+                factoryConfiguration(PrivilegeDistributionRequestAuthorizationStrategyFactory.class.getName())
+                        .put("name", "default")
+                        .put("jcrPrivilege", "jcr:read")
+                        .asOption(),
+
+                factoryConfiguration(VaultDistributionPackageBuilderFactory.class.getName())
+                        .put("name", "default")
+                        .put("type", "jcrvlt")
+                        .asOption(),
+
+                factoryConfiguration(QueueDistributionAgentFactory.class.getName())
+                        .put("name", AGENT_RESOURCE_QUEUE)
+                        .put("serviceName", "testService")
+                        .put("enabled", true)
+                        .put("queueProviderFactory.target", "(name=resourceQueue)")
+                        .asOption(),
+
+                factoryConfiguration(QueueDistributionAgentFactory.class.getName())
+                        .put("name", AGENT_JOB_QUEUE)
+                        .put("serviceName", "testService")
+                        .put("enabled", true)
+                        .put("queueProviderFactory.target", "(name=jobQueue)")
+                        .asOption()
+        );
+    }
+
+
+    protected DistributionAgent getAgent(String agentName) {
+
+        for (int i=0; i< 10; i++) {
+            try {
+                Filter filter = context.createFilter("(name="+agentName+")");
+
+                ServiceReference[] srs = this.context.getServiceReferences(DistributionAgent.class.getName(), filter.toString());
+                if (srs == null || srs.length == 0) {
+                    Thread.sleep(1000);
+                    continue;
+                }
+                ServiceReference sr = srs[0];
+
+                Object service = context.getService(sr);
+
+                return (DistributionAgent) service;
+            } catch (InvalidSyntaxException e) {
+
+            } catch (InterruptedException e) {
+            }
+
+
+        }
+
+        return null;
+    }
+
+}
diff --git a/src/test/java/org/apache/sling/distribution/queue/impl/resource/ResourceIteratorTest.java b/src/test/java/org/apache/sling/distribution/queue/impl/resource/ResourceIteratorTest.java
new file mode 100644
index 0000000..285ca3a
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/queue/impl/resource/ResourceIteratorTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.distribution.queue.impl.resource;
+
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.testing.mock.sling.junit.SlingContext;
+import org.apache.sling.testing.resourceresolver.MockHelper;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class ResourceIteratorTest {
+
+    static final String FOLDER_TYPE = ResourceQueueUtils.RESOURCE_FOLDER;
+    static final String ITEM_TYPE = "nt:unstructured";
+
+    @Rule
+    public final SlingContext context = new SlingContext();
+
+    @Before
+    public void testSetup() {
+        MockHelper helper = MockHelper.create(context.resourceResolver());
+
+        helper.resource("/root").p("prop", "value")
+                .resource("2018").p("sling:resourceType", FOLDER_TYPE)
+                .resource("10").p("sling:resourceType", FOLDER_TYPE)
+                .resource("15").p("sling:resourceType", FOLDER_TYPE)
+                .resource("11").p("sling:resourceType", FOLDER_TYPE)
+                .resource("35").p("sling:resourceType", FOLDER_TYPE)
+                .resource("item1").p("sling:resourceType", ITEM_TYPE)
+                .resource("../item2").p("sling:resourceType", ITEM_TYPE)
+                .resource("../../36").p("sling:resourceType", FOLDER_TYPE)
+                .resource("item3").p("sling:resourceType", ITEM_TYPE);
+
+        helper.resource("/root2").p("prop", "value")
+                .resource("2018").p("sling:resourceType", FOLDER_TYPE)
+                .resource("10").p("sling:resourceType", FOLDER_TYPE)
+                .resource("15").p("sling:resourceType", FOLDER_TYPE)
+                .resource("11").p("sling:resourceType", FOLDER_TYPE)
+                .resource("35").p("sling:resourceType", FOLDER_TYPE)
+                .resource("../36").p("sling:resourceType", FOLDER_TYPE)
+                .resource("item3").p("sling:resourceType", ITEM_TYPE);
+
+        try {
+            helper.commit();
+        } catch (PersistenceException e) {
+
+        }
+
+    }
+
+    @Test
+    public void testEmptyRoot() {
+        Resource root =  context.resourceResolver().getResource("/root");
+
+        ResourceIterator it =  new ResourceIterator(root, ResourceQueueUtils.RESOURCE_FOLDER, false, false);
+
+
+        test(it, new String[0]);
+    }
+
+    @Test
+    public void testFullPath() {
+        Resource root =  context.resourceResolver().getResource("/root");
+
+        ResourceIterator it =  new ResourceIterator(root, ResourceQueueUtils.RESOURCE_FOLDER, true, true);
+
+        test(it, new String[] {
+                "/root/2018/10/15/11/35/item1",
+                "/root/2018/10/15/11/35/item2",
+                "/root/2018/10/15/11/35",
+                "/root/2018/10/15/11/36/item3",
+                "/root/2018/10/15/11/36",
+                "/root/2018/10/15/11",
+                "/root/2018/10/15",
+                "/root/2018/10",
+                "/root/2018"
+        });
+    }
+
+    @Test
+    public void testFullPath2() {
+        Resource root =  context.resourceResolver().getResource("/root2");
+
+        ResourceIterator it =  new ResourceIterator(root, ResourceQueueUtils.RESOURCE_FOLDER, true, true);
+
+        test(it, new String[] {
+                "/root2/2018/10/15/11/35",
+                "/root2/2018/10/15/11/36/item3",
+                "/root2/2018/10/15/11/36",
+                "/root2/2018/10/15/11",
+                "/root2/2018/10/15",
+                "/root2/2018/10",
+                "/root2/2018"
+        });
+    }
+
+    @Test
+    public void testFullPathNoFolder() {
+        Resource root =  context.resourceResolver().getResource("/root");
+
+        ResourceIterator it =  new ResourceIterator(root, ResourceQueueUtils.RESOURCE_FOLDER, false, true);
+
+
+        test(it, new String[] {
+                "/root/2018/10/15/11/35/item1",
+                "/root/2018/10/15/11/35/item2",
+                "/root/2018/10/15/11/36/item3",
+        });
+    }
+
+    @Test
+    public void testFullPathNoLeafs() {
+        Resource root =  context.resourceResolver().getResource("/root");
+
+        ResourceIterator it =  new ResourceIterator(root, ResourceQueueUtils.RESOURCE_FOLDER, true, false);
+
+        test(it, new String[] {
+                "/root/2018/10/15/11/35",
+                "/root/2018/10/15/11/36",
+                "/root/2018/10/15/11",
+                "/root/2018/10/15",
+                "/root/2018/10",
+                "/root/2018"
+        });
+    }
+
+    public void test(ResourceIterator it, String[] paths) {
+        List<String> expected = Arrays.asList(paths);
+
+        assertEquals(expected, toPaths(it));
+    }
+
+
+    List<String> toPaths(ResourceIterator it) {
+        List<String> paths = new ArrayList<String>();
+        while (it.hasNext()) {
+            paths.add(it.next().getPath());
+        }
+        return paths;
+    }
+}
diff --git a/src/test/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueIT.java b/src/test/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueIT.java
new file mode 100644
index 0000000..b599aa5
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueIT.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.distribution.queue.impl.resource;
+
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.distribution.DistributionRequestType;
+import org.apache.sling.distribution.DistributionResponse;
+import org.apache.sling.distribution.Distributor;
+import org.apache.sling.distribution.SimpleDistributionRequest;
+import org.apache.sling.distribution.agent.spi.DistributionAgent;
+import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.queue.DistributionQueueEntry;
+import org.apache.sling.distribution.queue.DistributionQueueItem;
+import org.apache.sling.distribution.DistributionBaseIT;
+import org.apache.sling.distribution.queue.spi.DistributionQueue;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.junit.PaxExam;
+
+import javax.inject.Inject;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(PaxExam.class)
+public class ResourceQueueIT extends DistributionBaseIT {
+
+    static final int ITERATIONS = 1000;
+    static final String QUEUE_NAME = "default";
+
+    @Inject
+    protected Distributor distributor;
+
+
+    @Test
+    public void testExecute() throws LoginException, DistributionException {
+
+        DistributionAgent agent = getAgent(AGENT_RESOURCE_QUEUE);
+
+        assertNotNull(agent);
+
+        ResourceResolver resourceResolver =  resolverFactory.getAdministrativeResourceResolver(null);
+
+        DistributionResponse response = agent.execute(resourceResolver,
+                new SimpleDistributionRequest(DistributionRequestType.ADD, "/content" ));
+        assertTrue(response.isSuccessful());
+
+        DistributionQueue queue = agent.getQueue(QUEUE_NAME);
+
+        assertEquals(1, queue.getStatus().getItemsCount());
+
+        clear(queue);
+
+    }
+
+
+    @Test
+    public void testFifo() {
+        DistributionAgent agent = getAgent(AGENT_RESOURCE_QUEUE);
+
+        DistributionQueue queue = agent.getQueue(QUEUE_NAME);
+
+        queue.add(new DistributionQueueItem("packageId", 10, new HashMap<String, Object>()));
+
+        assertEquals(1, queue.getStatus().getItemsCount());
+
+        clear(queue);
+    }
+
+    @Test
+    public void testConcurrentFifo () throws InterruptedException {
+        final DistributionAgent agent = getAgent(AGENT_RESOURCE_QUEUE);
+        final DistributionQueue queue = agent.getQueue(QUEUE_NAME);
+
+        Producer p1 = new Producer(queue, "p1");
+        Producer p2 = new Producer(queue, "p2");
+        Producer p3 = new Producer(queue, "p3");
+        Producer p4 = new Producer(queue, "p4");
+
+        Map<String, Producer> producerMap = new HashMap<String, Producer>();
+        producerMap.put("p1", p1);
+        producerMap.put("p2", p2);
+        producerMap.put("p3", p3);
+        producerMap.put("p4", p4);
+
+        Consumer c1 = new Consumer(queue, "c1", new String[] { "p1", "p2" });
+        Consumer c2 = new Consumer(queue, "c2", new String[] { "p3", "p4" });
+
+        Map<String, Consumer> consumerMap = new HashMap<String, Consumer>();
+        consumerMap.put("c1", c1);
+        consumerMap.put("c2", c2);
+
+        List<Thread> threads = new ArrayList<Thread>();
+        for (Producer p : producerMap.values()) {
+            threads.add(new Thread(p));
+        }
+        for (Consumer c : consumerMap.values()) {
+            threads.add(new Thread(c));
+        }
+
+        for (Thread t : threads) {
+            t.start();
+        }
+
+        for (Thread t : threads) {
+            t.join(600 * 1000);
+        }
+
+        for (Producer p : producerMap.values()) {
+            assertEquals(ITERATIONS, p.created.size());
+        }
+
+        for (Consumer c : consumerMap.values()) {
+            for (String source : c.sources) {
+                Producer p = producerMap.get(source);
+                assertEquals(p.created, c.removedBySource.get(source));
+            }
+        }
+
+        assertEquals(0, queue.getStatus().getItemsCount());
+
+        clear(queue);
+    }
+
+    void clear(DistributionQueue queue) {
+        for (DistributionQueueEntry entry : queue.getItems(0, -1)) {
+            queue.remove(entry.getId());
+        }
+    }
+
+    class Producer implements Runnable {
+        private final DistributionQueue queue;
+        private final String name;
+        final List<String> created = Collections.synchronizedList(new ArrayList<String>());
+
+        Producer(DistributionQueue queue, String name) {
+            this.queue = queue;
+            this.name = name;
+        }
+
+
+        public void run() {
+            for(int i=0; i<ITERATIONS; i++) {
+
+                if (i % 1000 == 0) {
+                    System.out.println("Producer " + name + " " + i);
+                }
+
+                final Map<String, Object> props = new HashMap<String, Object>();
+                final String packageId = UUID.randomUUID().toString();
+                final long size = 10;
+
+                props.put("auuid", UUID.randomUUID().toString());
+                props.put("no", i);
+                props.put("source", name);
+
+                final DistributionQueueItem item = new DistributionQueueItem(packageId, size, props);
+
+                final DistributionQueueEntry entry = queue.add(item);
+
+                created.add(entry.getId());
+            }
+        }
+    }
+
+    class Consumer implements Runnable {
+        private final DistributionQueue queue;
+        private final String name;
+        private final String[] sources;
+
+        final List<String> removed = new ArrayList<String>();
+
+        final Map<String, List<String>> removedBySource = new HashMap<String, List<String>>();
+
+
+        Consumer(DistributionQueue queue, String name, String[] sources) {
+
+            this.queue = queue;
+            this.name = name;
+            this.sources = sources;
+            for (String source: sources) {
+                removedBySource.put(source, new ArrayList<String>());
+            }
+        }
+
+        public void run() {
+            while (removed.size() < sources.length * ITERATIONS) {
+
+                final DistributionQueueEntry entry = queue.getHead();
+
+                if (entry != null) {
+
+                    String source = (String) entry.getItem().get("source");
+
+                    if (removedBySource.containsKey(source)) {
+                        if (removed.size() % 1000 == 0) {
+                            System.out.println("Consumer " + name + " " + removed.size());
+                        }
+
+                        final DistributionQueueEntry removedEntry = queue.remove(entry.getId());
+
+                        assertEquals(removedEntry.getId(), entry.getId());
+
+                        removed.add(entry.getId());
+                        removedBySource.get(source).add(entry.getId());
+                    }
+                }
+            }
+        }
+    }
+}
diff --git a/src/test/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueUtilsTest.java b/src/test/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueUtilsTest.java
new file mode 100644
index 0000000..af04a81
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueUtilsTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.queue.impl.resource;
+
+import org.junit.Test;
+
+import java.util.GregorianCalendar;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ResourceQueueUtilsTest {
+
+    @Test
+    public void testTimePath() throws Exception {
+
+
+        assertEquals("2018/01/01/00/00",
+                ResourceQueueUtils.getTimePath(new GregorianCalendar(2018, 0, 1, 0, 0)));
+
+
+        assertEquals("2018/12/01/00/00",
+                ResourceQueueUtils.getTimePath(new GregorianCalendar(2018, 11, 1, 0, 0)));
+
+        assertEquals("2018/12/31/23/59",
+                ResourceQueueUtils.getTimePath(new GregorianCalendar(2018, 11, 31, 23, 59)));
+    }
+
+
+    @Test
+    public void testIsSafeToDelete() throws Exception {
+
+        assertTrue(ResourceQueueUtils.isSafeToDelete("2018/02/15/03/09", "2017"));
+        assertFalse(ResourceQueueUtils.isSafeToDelete("2018/02/15/03/09", "2018"));
+        assertFalse(ResourceQueueUtils.isSafeToDelete("2018/02/15/03/09", "2019"));
+
+        assertTrue(ResourceQueueUtils.isSafeToDelete("2018/02/15/03/09", "2018/01"));
+        assertFalse(ResourceQueueUtils.isSafeToDelete("2018/02/15/03/09", "2018/02"));
+        assertFalse(ResourceQueueUtils.isSafeToDelete("2018/02/15/03/09", "2018/03"));
+
+        assertTrue(ResourceQueueUtils.isSafeToDelete("2018/02/15/03/09", "2018/02/14"));
+        assertFalse(ResourceQueueUtils.isSafeToDelete("2018/02/15/03/09", "2018/02/15"));
+        assertFalse(ResourceQueueUtils.isSafeToDelete("2018/02/15/03/09", "2018/02/16"));
+
+        assertTrue(ResourceQueueUtils.isSafeToDelete("2018/02/15/03/09", "2018/02/15/02"));
+        assertFalse(ResourceQueueUtils.isSafeToDelete("2018/02/15/03/09", "2018/02/15/03"));
+        assertFalse(ResourceQueueUtils.isSafeToDelete("2018/02/15/03/09", "2018/02/15/04"));
+
+        assertTrue(ResourceQueueUtils.isSafeToDelete("2018/02/15/03/09", "2018/02/15/03/08"));
+        assertFalse(ResourceQueueUtils.isSafeToDelete("2018/02/15/03/09", "2018/02/15/03/09"));
+        assertFalse(ResourceQueueUtils.isSafeToDelete("2018/02/15/03/09", "2018/02/15/03/10"));
+
+        assertFalse(ResourceQueueUtils.isSafeToDelete("2018/02/15/03/09", "2018/02/15/03/10"));
+        assertFalse(ResourceQueueUtils.isSafeToDelete("2018/02/15/03/09", "2018/03/16/04/09"));
+
+
+
+
+        // new year
+        assertTrue(ResourceQueueUtils.isSafeToDelete("2018/12/31/23/59", "2017"));
+        assertFalse(ResourceQueueUtils.isSafeToDelete("2018/12/31/23/59", "2018"));
+        assertFalse(ResourceQueueUtils.isSafeToDelete("2018/12/31/23/59", "2019"));
+
+
+        assertTrue(ResourceQueueUtils.isSafeToDelete("2018/12/31/23/59", "2018/11"));
+        assertFalse(ResourceQueueUtils.isSafeToDelete("2018/12/31/23/59", "2018/12"));
+        assertFalse(ResourceQueueUtils.isSafeToDelete("2018/12/31/23/59", "2019/01"));
+
+
+        assertTrue(ResourceQueueUtils.isSafeToDelete("2018/12/31/23/59", "2018/12/30"));
+        assertFalse(ResourceQueueUtils.isSafeToDelete("2018/12/31/23/59", "2018/12/31"));
+        assertFalse(ResourceQueueUtils.isSafeToDelete("2018/12/31/23/59", "2019/01/00"));
+
+        assertTrue(ResourceQueueUtils.isSafeToDelete("2018/12/31/23/59", "2018/12/31/22"));
+        assertFalse(ResourceQueueUtils.isSafeToDelete("2018/12/31/23/59", "2018/12/31/23"));
+        assertFalse(ResourceQueueUtils.isSafeToDelete("2018/12/31/23/59", "2019/01/01/00"));
+
+        assertTrue(ResourceQueueUtils.isSafeToDelete("2018/12/31/23/59", "2018/12/31/23/58"));
+        assertFalse(ResourceQueueUtils.isSafeToDelete("2018/12/31/23/59", "2018/12/31/23/59"));
+        assertFalse(ResourceQueueUtils.isSafeToDelete("2018/12/31/23/59", "2019/01/01/00/00"));
+
+    }
+}