SLING-7754: implementation of Resource Based Queues
diff --git a/pom.xml b/pom.xml
index 6bd547f..8f62adf 100644
--- a/pom.xml
+++ b/pom.xml
@@ -48,6 +48,10 @@
<tag>HEAD</tag>
</scm>
+ <properties>
+ <exam.version>4.11.0</exam.version>
+ </properties>
+
<!-- ======================================================================= -->
<!-- B U I L D -->
<!-- ======================================================================= -->
@@ -94,6 +98,9 @@
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<redirectTestOutputToFile>true</redirectTestOutputToFile>
+ <excludes>
+ <exclude>**/*IT.java</exclude>
+ </excludes>
</configuration>
</plugin>
<plugin>
@@ -112,6 +119,35 @@
</excludes>
</configuration>
</plugin>
+
+ <plugin>
+ <groupId>org.apache.servicemix.tooling</groupId>
+ <artifactId>depends-maven-plugin</artifactId>
+ <version>1.4.0</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>generate-depends-file</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <configuration>
+ <redirectTestOutputToFile>true</redirectTestOutputToFile>
+ <systemProperties>
+ <property>
+ <name>bundle.filename</name>
+ <value>${basedir}/target/${project.build.finalName}.jar</value>
+ </property>
+ </systemProperties>
+ <includes>
+ <include>**/*IT.java</include>
+ </includes>
+ </configuration>
+ </plugin>
</plugins>
</build>
@@ -310,6 +346,45 @@
<scope>provided</scope>
</dependency>
+
+ <!-- Pax Exam -->
+ <dependency>
+ <groupId>org.apache.sling</groupId>
+ <artifactId>org.apache.sling.testing.paxexam</artifactId>
+ <version>2.0.0</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.ops4j.pax.exam</groupId>
+ <artifactId>pax-exam</artifactId>
+ <version>${exam.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.ops4j.pax.exam</groupId>
+ <artifactId>pax-exam-cm</artifactId>
+ <version>${exam.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.ops4j.pax.exam</groupId>
+ <artifactId>pax-exam-container-forked</artifactId>
+ <version>${exam.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.ops4j.pax.exam</groupId>
+ <artifactId>pax-exam-junit4</artifactId>
+ <version>${exam.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.ops4j.pax.exam</groupId>
+ <artifactId>pax-exam-link-mvn</artifactId>
+ <version>${exam.version}</version>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
<reporting>
<plugins>
diff --git a/src/main/java/org/apache/sling/distribution/agent/impl/QueueDistributionAgentFactory.java b/src/main/java/org/apache/sling/distribution/agent/impl/QueueDistributionAgentFactory.java
index 7f3c507..3d0ad64 100644
--- a/src/main/java/org/apache/sling/distribution/agent/impl/QueueDistributionAgentFactory.java
+++ b/src/main/java/org/apache/sling/distribution/agent/impl/QueueDistributionAgentFactory.java
@@ -46,9 +46,10 @@
import org.apache.sling.distribution.packaging.impl.exporter.LocalDistributionPackageExporter;
import org.apache.sling.distribution.queue.impl.DistributionQueueProvider;
import org.apache.sling.distribution.queue.impl.DistributionQueueDispatchingStrategy;
+import org.apache.sling.distribution.queue.impl.DistributionQueueProviderFactory;
import org.apache.sling.distribution.queue.impl.PriorityQueueDispatchingStrategy;
import org.apache.sling.distribution.queue.impl.SingleQueueDispatchingStrategy;
-import org.apache.sling.distribution.queue.impl.jobhandling.JobHandlingDistributionQueueProvider;
+import org.apache.sling.distribution.queue.impl.resource.ResourceQueueProvider;
import org.apache.sling.distribution.trigger.DistributionTrigger;
import org.apache.sling.event.jobs.JobManager;
import org.apache.sling.jcr.api.SlingRepository;
@@ -111,6 +112,12 @@
private DistributionRequestAuthorizationStrategy requestAuthorizationStrategy;
+ @Property(name = "queueProviderFactory.target", label = "Queue Provider Factory", description = "The target reference for the DistributionQueueProviderFactory used to build queues," +
+ "e.g. use target=(name=...) to bind to services by name.", value = "(name=jobQueue)")
+ @Reference(name = "queueProviderFactory")
+ private DistributionQueueProviderFactory queueProviderFactory;
+
+
@Property(name = "packageBuilder.target", label = "Package Builder", description = "The target reference for the DistributionPackageBuilder used to create distribution packages, " +
"e.g. use target=(name=...) to bind to services by name.", value = SettingsUtils.COMPONENT_NAME_DEFAULT)
@Reference(name = "packageBuilder")
@@ -135,14 +142,13 @@
private SlingSettingsService settingsService;
@Reference
- private JobManager jobManager;
-
- @Reference
private ResourceResolverFactory resourceResolverFactory;
@Reference
private SlingRepository slingRepository;
+ DistributionQueueProvider queueProvider;
+
public QueueDistributionAgentFactory() {
super(QueueDistributionAgentMBean.class);
}
@@ -164,6 +170,8 @@
@Deactivate
protected void deactivate(BundleContext context) {
super.deactivate(context);
+
+ queueProviderFactory.releaseProvider(queueProvider);
}
@Override
@@ -177,8 +185,9 @@
Map<String, String> priorityQueues = PropertiesUtil.toMap(config.get(PRIORITY_QUEUES), new String[0]);
priorityQueues = SettingsUtils.removeEmptyEntries(priorityQueues);
+ queueProvider = queueProviderFactory.getProvider(agentName, serviceName);
- DistributionQueueProvider queueProvider = new MonitoringDistributionQueueProvider(new JobHandlingDistributionQueueProvider(agentName, jobManager, context), context);
+ MonitoringDistributionQueueProvider monitoringQueueProvider = new MonitoringDistributionQueueProvider(queueProvider, context);
DistributionQueueDispatchingStrategy exportQueueStrategy = null;
@@ -194,7 +203,7 @@
return new SimpleDistributionAgent(agentName, false, null,
serviceName, null, packageExporter, requestAuthorizationStrategy,
- queueProvider, exportQueueStrategy, null, distributionEventFactory, resourceResolverFactory, slingRepository,
+ monitoringQueueProvider, exportQueueStrategy, null, distributionEventFactory, resourceResolverFactory, slingRepository,
distributionLog, allowedRequests, allowedRoots, 0);
}
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/DistributionQueueProviderFactory.java b/src/main/java/org/apache/sling/distribution/queue/impl/DistributionQueueProviderFactory.java
new file mode 100644
index 0000000..ee84d5c
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/DistributionQueueProviderFactory.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.distribution.queue.impl;
+
+public interface DistributionQueueProviderFactory {
+
+ DistributionQueueProvider getProvider(String agentName, String serviceName);
+
+ void releaseProvider(DistributionQueueProvider queueProvider);
+
+}
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingQueueDistributionProviderFactory.java b/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingQueueDistributionProviderFactory.java
new file mode 100644
index 0000000..d6a450d
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingQueueDistributionProviderFactory.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.distribution.queue.impl.jobhandling;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.distribution.component.impl.DistributionComponentConstants;
+import org.apache.sling.distribution.queue.impl.DistributionQueueProvider;
+import org.apache.sling.distribution.queue.impl.DistributionQueueProviderFactory;
+import org.apache.sling.event.jobs.JobManager;
+import org.osgi.framework.BundleContext;
+
+import java.util.Map;
+
+@Component
+@Service(DistributionQueueProviderFactory.class)
+@Property(name = DistributionComponentConstants.PN_NAME, value = "jobQueue")
+public class JobHandlingQueueDistributionProviderFactory implements DistributionQueueProviderFactory {
+
+ @Reference
+ JobManager jobManager;
+
+ BundleContext context;
+
+
+ @Activate
+ protected void activate(BundleContext context, Map<String, Object> config)
+ {
+ this.context = context;
+ }
+
+ @Override
+ public DistributionQueueProvider getProvider(String agentName, String serviceName) {
+ return new JobHandlingDistributionQueueProvider(agentName, jobManager, context);
+ }
+
+ @Override
+ public void releaseProvider(DistributionQueueProvider queueProvider) {
+
+ }
+}
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceIterator.java b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceIterator.java
new file mode 100644
index 0000000..4236d56
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceIterator.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.distribution.queue.impl.resource;
+
+
+import org.apache.sling.api.resource.Resource;
+
+import java.util.Iterator;
+import java.util.Stack;
+
+public class ResourceIterator implements Iterator<Resource> {
+
+ private final String folderResourceType;
+ private final boolean includeFolders;
+ private final boolean includeLeafs;
+ private final boolean includeRoot = false;
+ private final String rootPath;
+
+ private Stack<Iterator<Resource>> folderIterators = new Stack<Iterator<Resource>>();
+
+ private Resource currentFolder;
+ private Iterator<Resource> currentIterator;
+
+ private Resource next;
+
+
+ public ResourceIterator(Resource root, String folderResourceType, boolean includeFolders, boolean includeLeafs) {
+ this.folderResourceType = folderResourceType;
+ this.includeFolders = includeFolders;
+ this.includeLeafs = includeLeafs;
+ this.rootPath = root.getPath();
+
+ currentFolder = root;
+ currentIterator = currentFolder.listChildren();
+ next = seek();
+ }
+
+ @Override
+ public boolean hasNext() {
+ return next != null;
+ }
+
+ @Override
+ public Resource next() {
+ Resource result = next;
+ next = seek();
+ return result;
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ Resource seek() {
+ Resource res;
+ while ((res = seekAll()) != null) {
+
+ if (rootPath.equals(res.getPath())) {
+ if (includeRoot) {
+ return res;
+ } else {
+ continue;
+ }
+ }
+
+ if (includeFolders && res.isResourceType(folderResourceType)) {
+ return res;
+ }
+
+ if (includeLeafs && !res.isResourceType(folderResourceType)) {
+ return res;
+ }
+ }
+
+ return null;
+ }
+
+ // depth first, post order (children before parents)
+ Resource seekAll() {
+
+ while (currentIterator != null) {
+ if (currentIterator.hasNext()) {
+ Resource res = currentIterator.next();
+
+ if (res.isResourceType(folderResourceType)) {
+ folderIterators.push(currentIterator);
+
+ currentFolder = res;
+ currentIterator = currentFolder.listChildren();
+ } else {
+ return res;
+ }
+ } else {
+ Resource folder = currentFolder;
+
+ currentFolder = currentFolder.getParent();
+ currentIterator = folderIterators.empty() ? null : folderIterators.pop();
+
+ return folder;
+ }
+ }
+
+ return null;
+ }
+}
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueue.java b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueue.java
new file mode 100644
index 0000000..28162c3
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueue.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.distribution.queue.impl.resource;
+
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.distribution.queue.DistributionQueueEntry;
+import org.apache.sling.distribution.queue.DistributionQueueItem;
+import org.apache.sling.distribution.queue.DistributionQueueState;
+import org.apache.sling.distribution.queue.DistributionQueueStatus;
+import org.apache.sling.distribution.queue.DistributionQueueType;
+import org.apache.sling.distribution.queue.spi.DistributionQueue;
+import org.apache.sling.distribution.util.impl.DistributionUtils;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+
+public class ResourceQueue implements DistributionQueue {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+
+ private final ResourceResolverFactory resolverFactory;
+ private String serviceName;
+ private String queueName;
+ private final String queueRootPath;
+
+ public ResourceQueue(ResourceResolverFactory resolverFactory, String serviceName, String queueName, String rootPath) {
+ this.resolverFactory = resolverFactory;
+ this.serviceName = serviceName;
+ this.queueName = queueName;
+ this.queueRootPath = rootPath + "/" + queueName;
+ }
+
+ @NotNull
+ @Override
+ public String getName() {
+ return queueName;
+ }
+
+ @Nullable
+ @Override
+ public DistributionQueueEntry add(@NotNull DistributionQueueItem item) {
+
+ ResourceResolver resourceResolver = null;
+ try {
+ resourceResolver = DistributionUtils.loginService(resolverFactory, serviceName);
+
+ Resource queueRoot = ResourceQueueUtils.getRootResource(resourceResolver, queueRootPath);
+
+ Resource resource = ResourceQueueUtils.createResource(queueRoot, item);
+
+ DistributionQueueEntry entry = ResourceQueueUtils.readEntry(queueRoot, resource);
+
+ logEntry(entry, "add");
+
+ return entry;
+
+ } catch (LoginException e) {
+ throw new RuntimeException(e);
+ } catch (PersistenceException e) {
+ throw new RuntimeException(e);
+ } finally {
+ DistributionUtils.safelyLogout(resourceResolver);
+ }
+ }
+
+
+ @Override
+ public DistributionQueueEntry getHead() {
+ ResourceResolver resourceResolver = null;
+ try {
+ resourceResolver = DistributionUtils.loginService(resolverFactory, serviceName);
+ Resource queueRoot = ResourceQueueUtils.getRootResource(resourceResolver, queueRootPath);
+
+
+ DistributionQueueEntry head = ResourceQueueUtils.getHead(queueRoot);
+
+ logEntry(head, "getHead");
+
+ return head;
+
+ } catch (LoginException e) {
+ throw new RuntimeException(e);
+ } catch (PersistenceException e) {
+ throw new RuntimeException(e);
+ } finally {
+ DistributionUtils.safelyLogout(resourceResolver);
+ }
+ }
+
+ @NotNull
+ @Override
+ public Iterable<DistributionQueueEntry> getItems(int skip, int limit) {
+ ResourceResolver resourceResolver = null;
+ try {
+ resourceResolver = DistributionUtils.loginService(resolverFactory, serviceName);
+ Resource queueRoot = ResourceQueueUtils.getRootResource(resourceResolver, queueRootPath);
+
+ List<DistributionQueueEntry> entries = ResourceQueueUtils.getEntries(queueRoot, skip, limit);
+
+ log.debug("queue[{}] getItems entries={}", new Object[] { queueName, entries.size() });
+
+ return entries;
+ } catch (LoginException e) {
+ throw new RuntimeException(e);
+ } catch (PersistenceException e) {
+ throw new RuntimeException(e);
+ } finally {
+ DistributionUtils.safelyLogout(resourceResolver);
+ }
+
+ }
+
+ @Nullable
+ @Override
+ public DistributionQueueEntry getItem(@NotNull String itemId) {
+ ResourceResolver resourceResolver = null;
+ try {
+ resourceResolver = DistributionUtils.loginService(resolverFactory, serviceName);
+ Resource queueRoot = ResourceQueueUtils.getRootResource(resourceResolver, queueRootPath);
+
+ Resource itemResource = ResourceQueueUtils.getResourceById(queueRoot, itemId);
+
+ DistributionQueueEntry entry = ResourceQueueUtils.readEntry(queueRoot, itemResource);
+
+ logEntry(entry, "getItem");
+
+ return entry;
+
+ } catch (LoginException e) {
+ throw new RuntimeException(e);
+ } catch (PersistenceException e) {
+ throw new RuntimeException(e);
+ } finally {
+ DistributionUtils.safelyLogout(resourceResolver);
+ }
+ }
+
+ @Nullable
+ @Override
+ public DistributionQueueEntry remove(@NotNull String itemId) {
+ ResourceResolver resourceResolver = null;
+ try {
+ resourceResolver = DistributionUtils.loginService(resolverFactory, serviceName);
+ Resource queueRoot = ResourceQueueUtils.getRootResource(resourceResolver, queueRootPath);
+
+ Resource itemResource = ResourceQueueUtils.getResourceById(queueRoot, itemId);
+
+ DistributionQueueEntry entry = ResourceQueueUtils.readEntry(queueRoot, itemResource);
+
+ ResourceQueueUtils.deleteResource(itemResource);
+
+ logEntry(entry, "remove");
+
+ return entry;
+
+ } catch (LoginException e) {
+ throw new RuntimeException(e);
+ } catch (PersistenceException e) {
+ throw new RuntimeException(e);
+ } finally {
+ DistributionUtils.safelyLogout(resourceResolver);
+ }
+ }
+
+ @Nullable
+ @Override
+ public DistributionQueueStatus getStatus() {
+ ResourceResolver resourceResolver = null;
+ try {
+ resourceResolver = DistributionUtils.loginService(resolverFactory, serviceName);
+ Resource queueRoot = ResourceQueueUtils.getRootResource(resourceResolver, queueRootPath);
+
+ int count = ResourceQueueUtils.getResourceCount(queueRoot);
+
+ return new DistributionQueueStatus(count, DistributionQueueState.PASSIVE);
+ } catch (LoginException e) {
+ throw new RuntimeException(e);
+ } catch (PersistenceException e) {
+ throw new RuntimeException(e);
+ } finally {
+ DistributionUtils.safelyLogout(resourceResolver);
+ }
+ }
+
+ @NotNull
+ @Override
+ public DistributionQueueType getType() {
+ return DistributionQueueType.ORDERED;
+ }
+
+ void logEntry(DistributionQueueEntry entry, String scope) {
+ if (entry == null) {
+ log.debug("queue[{}] {} null entry", new Object[] { queueName, scope });
+ return;
+ }
+
+ if (entry.getItem() == null) {
+ log.debug("queue[{}] {} null item (should not happen)", new Object[] { queueName, scope });
+ return;
+ }
+
+ String entryId = entry.getId();
+ DistributionQueueItem item = entry.getItem();
+ log.debug("queue[{}] {} entryId={} packageId={}", new Object[] { queueName, scope, entryId, item.getPackageId() });
+ }
+}
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueCleanupTask.java b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueCleanupTask.java
new file mode 100644
index 0000000..f41b16e
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueCleanupTask.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.distribution.queue.impl.resource;
+
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.distribution.util.impl.DistributionUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Calendar;
+import java.util.Iterator;
+
+
+public class ResourceQueueCleanupTask implements Runnable {
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+
+ private final ResourceResolverFactory resolverFactory;
+ private final String serviceName;
+ private final String rootPath;
+
+ public ResourceQueueCleanupTask(ResourceResolverFactory resolverFactory, String serviceName, String rootPath) {
+
+ this.resolverFactory = resolverFactory;
+ this.serviceName = serviceName;
+ this.rootPath = rootPath;
+ }
+
+ @Override
+ public void run() {
+ log.debug("Cleaning up resource queues at {}", rootPath);
+
+ ResourceResolver resourceResolver = null;
+ try {
+ resourceResolver = DistributionUtils.loginService(resolverFactory, serviceName);
+ Resource root = ResourceQueueUtils.getRootResource(resourceResolver, rootPath);
+
+ Iterator<Resource> it = root.listChildren();
+
+ while (it.hasNext()) {
+ Resource queueRoot = it.next();
+
+ log.debug("Starting cleaning up queue at {}", queueRoot.getPath());
+
+ removeEmptyFolders(queueRoot);
+
+ log.debug("Finished cleaning up queue at {}", queueRoot.getPath());
+ }
+ } catch (Throwable e) {
+ log.error("Error cleaning up resource queues", e);
+ } finally {
+ DistributionUtils.safelyLogout(resourceResolver);
+ }
+ }
+
+
+ public void removeEmptyFolders(Resource root) throws PersistenceException {
+ Calendar now = Calendar.getInstance();
+ now.add(Calendar.MINUTE, -5);
+
+ ResourceResolver resolver = root.getResourceResolver();
+ ResourceIterator it = new ResourceIterator(root, ResourceQueueUtils.RESOURCE_FOLDER, true, false);
+
+ String nowPath = ResourceQueueUtils.getTimePath(now);
+
+ while (it.hasNext()) {
+ Resource res = it.next();
+
+ String resPath = res.getPath().substring(root.getPath().length()+1);
+
+ if (!res.isResourceType(ResourceQueueUtils.RESOURCE_FOLDER)) {
+ continue;
+ }
+
+ // now 2018/03/15/03/48
+ // res 2018/02/15
+ if (!ResourceQueueUtils.isSafeToDelete(nowPath, resPath)) {
+ continue;
+ }
+
+ if (res.hasChildren()) {
+ continue;
+ }
+
+ log.debug("removing empty folder {}", res.getPath());
+
+ resolver.delete(res);
+ resolver.commit();
+ }
+
+ }
+}
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProvider.java b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProvider.java
new file mode 100644
index 0000000..9c3c393
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProvider.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.distribution.queue.impl.resource;
+
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.commons.scheduler.Scheduler;
+import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.queue.DistributionQueueType;
+import org.apache.sling.distribution.queue.impl.DistributionQueueProcessor;
+import org.apache.sling.distribution.queue.impl.DistributionQueueProvider;
+import org.apache.sling.distribution.queue.spi.DistributionQueue;
+
+import org.jetbrains.annotations.NotNull;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
+
+import java.util.Dictionary;
+import java.util.Hashtable;
+
+public class ResourceQueueProvider implements DistributionQueueProvider {
+
+ private final static String QUEUES_ROOT = "/var/sling/distribution/queues/";
+ private ResourceResolverFactory resolverFactory;
+ private String serviceName;
+ private String agentRootPath;
+
+ private ServiceRegistration cleanupTask;
+
+
+ public ResourceQueueProvider(BundleContext context, ResourceResolverFactory resolverFactory, String serviceName, String agentName) {
+ this.resolverFactory = resolverFactory;
+ this.serviceName = serviceName;
+ this.agentRootPath = QUEUES_ROOT + agentName;
+
+ register(context);
+ }
+
+ @NotNull
+ @Override
+ public DistributionQueue getQueue(@NotNull String queueName) throws DistributionException {
+ return new ResourceQueue(resolverFactory, serviceName, queueName, agentRootPath);
+ }
+
+ @NotNull
+ @Override
+ public DistributionQueue getQueue(@NotNull String queueName, @NotNull DistributionQueueType type) {
+ return new ResourceQueue(resolverFactory, serviceName, queueName, agentRootPath);
+ }
+
+ @Override
+ public void enableQueueProcessing(@NotNull DistributionQueueProcessor queueProcessor, String... queueNames) throws DistributionException {
+ // processing not supported
+ }
+
+ @Override
+ public void disableQueueProcessing() throws DistributionException {
+ // processing not supported
+ }
+
+
+ private void register(BundleContext context) {
+ Runnable cleanup = new ResourceQueueCleanupTask(resolverFactory, serviceName, agentRootPath);
+ Dictionary<String, Object> props = new Hashtable<String, Object>();
+ props.put(Scheduler.PROPERTY_SCHEDULER_CONCURRENT, false);
+ props.put(Scheduler.PROPERTY_SCHEDULER_PERIOD, 300L);
+ cleanupTask = context.registerService(Runnable.class.getName(), cleanup, props);
+ }
+
+ public void close() {
+ if (cleanupTask != null) {
+ cleanupTask.unregister();
+ cleanupTask = null;
+ }
+ }
+}
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProviderFactory.java b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProviderFactory.java
new file mode 100644
index 0000000..ac0bafd
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueProviderFactory.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.distribution.queue.impl.resource;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.Service;
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.distribution.component.impl.DistributionComponentConstants;
+import org.apache.sling.distribution.queue.impl.DistributionQueueProvider;
+import org.apache.sling.distribution.queue.impl.DistributionQueueProviderFactory;
+import org.osgi.framework.BundleContext;
+
+import java.util.Map;
+
+@Component
+@Service(DistributionQueueProviderFactory.class)
+@Property(name = DistributionComponentConstants.PN_NAME, value = "resourceQueue")
+public class ResourceQueueProviderFactory implements DistributionQueueProviderFactory {
+
+ @Reference
+ ResourceResolverFactory resourceResolverFactory;
+
+ BundleContext context;
+
+ @Activate
+ protected void activate(BundleContext context, Map<String, Object> config)
+ {
+ this.context = context;
+ }
+
+ @Override
+ public DistributionQueueProvider getProvider(String agentName, String serviceName) {
+ return new ResourceQueueProvider(context, resourceResolverFactory, serviceName, agentName);
+ }
+
+ @Override
+ public void releaseProvider(DistributionQueueProvider queueProvider) {
+ if (queueProvider instanceof ResourceQueueProvider) {
+ ((ResourceQueueProvider) queueProvider).close();
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueUtils.java b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueUtils.java
new file mode 100644
index 0000000..1408cb6
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueUtils.java
@@ -0,0 +1,416 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.distribution.queue.impl.resource;
+
+
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.api.resource.ResourceUtil;
+import org.apache.sling.api.resource.ValueMap;
+import org.apache.sling.distribution.DistributionRequestType;
+import org.apache.sling.distribution.packaging.DistributionPackageInfo;
+import org.apache.sling.distribution.queue.DistributionQueueEntry;
+import org.apache.sling.distribution.queue.DistributionQueueItem;
+import org.apache.sling.distribution.queue.DistributionQueueItemState;
+import org.apache.sling.distribution.queue.DistributionQueueItemStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+import javax.jcr.NodeIterator;
+import javax.jcr.RepositoryException;
+import javax.jcr.Session;
+import javax.jcr.query.Query;
+import javax.jcr.query.QueryManager;
+import javax.jcr.query.QueryResult;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicLong;
+
+public class ResourceQueueUtils {
+
+ // prefix for queue entry ids
+ private static final String ID_START = "distrq-";
+
+ // resource folder for queue roots
+ private static final String RESOURCE_ROOT = "sling:Folder";
+
+ // resource type for internal ordered folders
+ public static final String RESOURCE_FOLDER = "sling:OrderedFolder";
+
+ // resource type for internal entries
+ private static final String RESOURCE_ITEM = "nt:unstructured";
+
+ private static final String DISTRIBUTION_PACKAGE_PREFIX = "distribution.";
+ private static final String DISTRIBUTION_PACKAGE_ID = DISTRIBUTION_PACKAGE_PREFIX + "item.id";
+ private static final String DISTRIBUTION_PACKAGE_SIZE = DISTRIBUTION_PACKAGE_PREFIX + "package.size";
+
+
+ private static final AtomicLong itemCounter = new AtomicLong(0);
+ private static final Logger log = LoggerFactory.getLogger(ResourceQueueUtils.class);
+
+
+ private static Map<String, Object> serializeItem(DistributionQueueItem queueItem) {
+
+ Map<String, Object> properties = new HashMap<String, Object>();
+
+ for (String key : queueItem.keySet()) {
+ Object value = queueItem.get(key);
+
+ if (DistributionPackageInfo.PROPERTY_REQUEST_TYPE.equals(key)) {
+ if (value instanceof DistributionRequestType) {
+ value = ((DistributionRequestType) value).name();
+ }
+ }
+
+ if (value != null) {
+ properties.put(DISTRIBUTION_PACKAGE_PREFIX + key, value);
+ }
+ }
+
+ properties.put(DISTRIBUTION_PACKAGE_ID, queueItem.getPackageId());
+ properties.put(DISTRIBUTION_PACKAGE_SIZE, queueItem.getSize());
+
+ return properties;
+ }
+
+ private static DistributionQueueItem deserializeItem(ValueMap valueMap) {
+
+ String packageId = valueMap.get(DISTRIBUTION_PACKAGE_ID, String.class);
+ Long sizeProperty = valueMap.get(DISTRIBUTION_PACKAGE_SIZE, Long.class);
+ long size = sizeProperty == null ? -1 : sizeProperty;
+
+ Map<String, Object> properties = new HashMap<String, Object>();
+
+ for (String key : valueMap.keySet()) {
+ if (key.startsWith(DISTRIBUTION_PACKAGE_PREFIX)) {
+ String infoKey = key.substring(DISTRIBUTION_PACKAGE_PREFIX.length());
+ Object value = valueMap.get(key);
+
+ if (DistributionPackageInfo.PROPERTY_REQUEST_TYPE.equals(infoKey)) {
+ if (value instanceof String) {
+ value = DistributionRequestType.valueOf((String) value);
+ }
+ }
+
+ properties.put(infoKey, value);
+ }
+ }
+
+ DistributionQueueItem queueItem = new DistributionQueueItem(packageId, size, properties);
+ return queueItem;
+ }
+
+ static DistributionQueueEntry readEntry(Resource queueRoot, Resource resource) {
+
+ if (resource == null) {
+ return null;
+ }
+
+ if (!resource.getPath().startsWith(queueRoot.getPath() + "/")) {
+ return null;
+ }
+
+ if (!resource.isResourceType(RESOURCE_ITEM)) {
+ return null;
+ }
+
+ String queueName = queueRoot.getName();
+ DistributionQueueItem queueItem = deserializeItem(resource.getValueMap());
+ DistributionQueueItemStatus queueItemStatus = new DistributionQueueItemStatus(DistributionQueueItemState.QUEUED, queueName);
+
+ String entryId = getIdFromPath(queueRoot.getPath(), resource.getPath());
+
+ return new DistributionQueueEntry(entryId, queueItem, queueItemStatus);
+ }
+
+ static List<DistributionQueueEntry> getEntries(Resource queueRoot, int skip, int limit) {
+ Iterator<Resource> it = new ResourceIterator(queueRoot, RESOURCE_FOLDER, false, true);
+
+ List<DistributionQueueEntry> entries = new ArrayList<DistributionQueueEntry>();
+
+ int i = 0;
+ while (it.hasNext()) {
+ Resource resource = it.next();
+
+ if (i++ < skip) {
+ continue;
+ }
+
+ DistributionQueueEntry entry = readEntry(queueRoot, resource);
+ entries.add(entry);
+
+ if (entries.size() >= limit) {
+ break;
+ }
+ }
+
+ return entries;
+
+ }
+
+
+ static DistributionQueueEntry getHead(Resource root) {
+ Iterator<DistributionQueueEntry> it = getEntries(root, 0, 1).iterator();
+
+ if (it.hasNext()) {
+ return it.next();
+ }
+
+ return null;
+ }
+
+ public static Resource getRootResource(ResourceResolver resourceResolver, String rootPath) throws PersistenceException {
+ Resource resource = ResourceUtil.getOrCreateResource(resourceResolver, rootPath, RESOURCE_FOLDER, RESOURCE_ROOT, true);
+
+ return resource;
+ }
+
+ public static Resource getResourceById(Resource root, String entryId) {
+ String entryPath = getPathFromId(root.getPath(), entryId);
+ return root.getResourceResolver().getResource(entryPath);
+ }
+
+
+
+ public static Resource createResource(Resource root, DistributionQueueItem queueItem) throws PersistenceException {
+
+ Resource minuteResource = getOrCreateMinuteResource(root);
+
+ String entryPath = getUniqueEntryPath(minuteResource);
+
+ ResourceResolver resourceResolver = root.getResourceResolver();
+
+ Map<String, Object> properties = serializeItem(queueItem);
+
+ properties.put("sling:resourceType", RESOURCE_ITEM);
+ Resource resourceItem = ResourceUtil.getOrCreateResource(resourceResolver, entryPath, properties,
+ RESOURCE_FOLDER, true);
+
+ resourceResolver.commit();
+
+ return resourceItem;
+ }
+
+
+ /**
+ * Creates a minute resource by retrying several times. If it fails even the last time it will throw an exception.
+ */
+ private static Resource getOrCreateMinuteResource(Resource root) throws PersistenceException {
+
+ for (int i=0; i<2; i++) {
+ try {
+ return tryGetOrCreateMinutes(root);
+ } catch (PersistenceException e) {
+ log.warn("creating minute resource failed. retrying {} more times.", 5-i);
+ }
+
+ root.getResourceResolver().revert();
+ root.getResourceResolver().refresh();
+ }
+
+ return tryGetOrCreateMinutes(root);
+ }
+
+ /**
+ * Creates a set of resources for consecutive minutes.
+ * This ensures that consecutive minutes are created by a single thread, and that are created in order.
+ * This might fail due to concurrency issues and needs to be retried a couple of times.
+ */
+ private static Resource tryGetOrCreateMinutes(Resource root) throws PersistenceException {
+
+ ResourceResolver resourceResolver = root.getResourceResolver();
+ Calendar now = Calendar.getInstance();
+
+ String firstMinutePath = getTimePath(now);
+ Resource firstMinuteResource = resourceResolver.getResource(root, firstMinutePath);
+
+ if (firstMinuteResource != null) {
+ return firstMinuteResource;
+ }
+
+ for (int i=0; i < 3; i++) {
+ now.add(Calendar.MINUTE, i);
+ String newMinutePath = getTimePath(now);
+ Resource resource = createResource(root, newMinutePath);
+ }
+
+ resourceResolver.commit();
+
+ firstMinuteResource = resourceResolver.getResource(root, firstMinutePath);
+
+ return firstMinuteResource;
+ }
+
+ /*
+ * Creates a new resource at the specified path
+ * This is different than ResourceUtil.getOrCreateResource as it only creates the resource, it does not retrieve it.
+ * This ensures that consecutive minutes are always created atomically.
+ */
+ private static Resource createResource(Resource root, String relPath) throws PersistenceException {
+ ResourceResolver resourceResolver = root.getResourceResolver();
+
+ String path = root.getPath() + "/" + relPath;
+ final String parentPath = ResourceUtil.getParent(path);
+ final String name = ResourceUtil.getName(path);
+
+ Resource parent = ResourceUtil.getOrCreateResource(resourceResolver, parentPath, RESOURCE_FOLDER,
+ RESOURCE_FOLDER, false);
+
+ Map<String, Object> props = Collections.singletonMap(ResourceResolver.PROPERTY_RESOURCE_TYPE, (Object) RESOURCE_FOLDER);
+
+ return resourceResolver.create(parent, name, props);
+ }
+
+ public static void deleteResource(Resource resource) throws PersistenceException {
+ ResourceResolver resolver = resource.getResourceResolver();
+
+ String path = resource.getPath();
+
+ try {
+ resolver.delete(resource);
+ resolver.commit();
+ } catch (PersistenceException var10) {
+ resolver.revert();
+ resolver.refresh();
+ resource = resolver.getResource(path);
+ if (resource != null) {
+ resolver.delete(resource);
+ resolver.commit();
+ }
+ }
+ }
+
+
+ public static int getResourceCount(Resource root) {
+ ResourceResolver resolver = root.getResourceResolver();
+ Session session = resolver.adaptTo(Session.class);
+
+ StringBuilder buf = new StringBuilder();
+ buf.append("/jcr:root");
+ buf.append(root.getPath());
+ buf.append("//element(*,");
+ buf.append(RESOURCE_ITEM);
+ buf.append(")");
+
+ try {
+ QueryManager qManager = session.getWorkspace().getQueryManager();
+ Query q = qManager.createQuery(buf.toString(), "xpath");
+ final QueryResult res = q.execute();
+
+ NodeIterator it = res.getNodes();
+ return (int) it.getSize();
+ } catch (RepositoryException e) {
+ return -1;
+ }
+ }
+
+
+ private static String getUniqueEntryPath(Resource parent) {
+ final StringBuilder sb = new StringBuilder();
+ sb.append(parent.getPath());
+ sb.append('/');
+ sb.append(UUID.randomUUID().toString().replace("-", ""));
+ sb.append('_');
+ sb.append(itemCounter.getAndIncrement());
+
+ return sb.toString();
+ }
+
+ /**
+ * Transforms current time to path 2018/01/03/23/54
+ * @param now the current time
+ * @return the serialized time
+ */
+ public static String getTimePath(Calendar now) {
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd/HH/mm");
+
+ return sdf.format(now.getTime());
+ }
+
+
+ /**
+ * Checks if path is safe to delete at this time.
+ * A path is safe to delete if the nowPath does not overlap with it.
+ *
+ * @param nowPath represents a full path of current time (e.g. 2018/01/03/23/54)
+ * @param path the path to be checked (it can be a partial path e.g. 2018/01)
+ * @return true if checked path is in the past
+ */
+ public static boolean isSafeToDelete(String nowPath, String path) {
+
+ // should not happen
+ if (nowPath.length() < path.length()) {
+ return false;
+ }
+
+ nowPath = nowPath.substring(0, path.length());
+
+ return nowPath.compareTo(path) > 0;
+ }
+
+
+
+ private static String getPathFromId(String roothPath, String entryId) {
+ String entryPath = unescapeId(entryId);
+ return roothPath + "/" + entryPath;
+ }
+
+ private static String getIdFromPath(String rootPath, String path) {
+
+ if (path.startsWith(rootPath)) {
+ String entryPath = path.substring(rootPath.length()+1);
+
+ String entryId = escapeId(entryPath);
+
+ return entryId;
+ }
+ throw new IllegalArgumentException("entry path does not start with " + rootPath);
+ }
+
+
+ private static String escapeId(String jobId) {
+ //return id;
+ if (jobId == null) {
+ return null;
+ }
+ return ID_START + jobId.replace("/", "--");
+ }
+
+ public static String unescapeId(String itemId) {
+ if (itemId == null) {
+ return null;
+ }
+ if (!itemId.startsWith(ID_START)) {
+ return null;
+ }
+
+ return itemId.replace(ID_START, "").replace("--", "/");
+ }
+
+}
diff --git a/src/test/java/org/apache/sling/distribution/DistributionBaseIT.java b/src/test/java/org/apache/sling/distribution/DistributionBaseIT.java
new file mode 100644
index 0000000..707d3f8
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/DistributionBaseIT.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.distribution;
+
+import org.apache.sling.api.resource.ResourceResolverFactory;
+import org.apache.sling.distribution.agent.impl.PrivilegeDistributionRequestAuthorizationStrategyFactory;
+import org.apache.sling.distribution.agent.impl.QueueDistributionAgentFactory;
+import org.apache.sling.distribution.agent.spi.DistributionAgent;
+import org.apache.sling.distribution.serialization.impl.vlt.VaultDistributionPackageBuilderFactory;
+import org.apache.sling.testing.paxexam.SlingOptions;
+import org.apache.sling.testing.paxexam.TestSupport;
+import org.ops4j.pax.exam.Configuration;
+import org.ops4j.pax.exam.CoreOptions;
+import org.ops4j.pax.exam.Option;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.Filter;
+import org.osgi.framework.InvalidSyntaxException;
+import org.osgi.framework.ServiceReference;
+
+import javax.inject.Inject;
+
+import java.io.File;
+
+import static org.apache.sling.testing.paxexam.SlingOptions.logback;
+import static org.apache.sling.testing.paxexam.SlingOptions.slingDistribution;
+import static org.apache.sling.testing.paxexam.SlingOptions.slingQuickstartOakTar;
+import static org.ops4j.pax.exam.CoreOptions.composite;
+import static org.ops4j.pax.exam.CoreOptions.junitBundles;
+import static org.ops4j.pax.exam.cm.ConfigurationAdminOptions.factoryConfiguration;
+import static org.ops4j.pax.exam.cm.ConfigurationAdminOptions.newConfiguration;
+
+public class DistributionBaseIT extends TestSupport {
+
+ protected static final String AGENT_RESOURCE_QUEUE = "agentResourceQueue";
+ protected static final String AGENT_JOB_QUEUE = "agentJobQueue";
+
+
+ @Inject
+ protected ResourceResolverFactory resolverFactory;
+
+ @Inject
+ protected BundleContext context;
+
+
+ @Configuration
+ public Option[] configuration() {
+ return new Option[]{
+ baseConfiguration(),
+ slingQuickstart(),
+ logback(),
+ // build artifact
+ slingDistribution(),
+ // testing
+ testBundle( "bundle.filename"),
+ defaultOsgiConfigs(),
+ junitBundles()
+ };
+ }
+
+ protected Option slingQuickstart() {
+ final String workingDirectory = workingDirectory(); // from TestSupport
+ final int httpPort = findFreePort(); // from TestSupport
+ return composite(
+ slingQuickstartOakTar(workingDirectory, httpPort) // from SlingOptions
+ );
+ }
+
+ public static Option defaultOsgiConfigs() {
+ return composite(
+ newConfiguration("org.apache.sling.jcr.resource.internal.JcrSystemUserValidator")
+ .put("allow.only.system.user", false).asOption(),
+
+ newConfiguration("org.apache.sling.jcr.base.internal.LoginAdminWhitelist")
+ .put("whitelist.bypass", "true").asOption(),
+
+ // For production the users would be: replication-service,content-writer-service
+ factoryConfiguration("org.apache.sling.serviceusermapping.impl.ServiceUserMapperImpl.amended")
+ .put("user.mapping", new String[]{"org.apache.sling.distribution.core:testService=admin"})
+ .asOption(),
+
+ factoryConfiguration(PrivilegeDistributionRequestAuthorizationStrategyFactory.class.getName())
+ .put("name", "default")
+ .put("jcrPrivilege", "jcr:read")
+ .asOption(),
+
+ factoryConfiguration(VaultDistributionPackageBuilderFactory.class.getName())
+ .put("name", "default")
+ .put("type", "jcrvlt")
+ .asOption(),
+
+ factoryConfiguration(QueueDistributionAgentFactory.class.getName())
+ .put("name", AGENT_RESOURCE_QUEUE)
+ .put("serviceName", "testService")
+ .put("enabled", true)
+ .put("queueProviderFactory.target", "(name=resourceQueue)")
+ .asOption(),
+
+ factoryConfiguration(QueueDistributionAgentFactory.class.getName())
+ .put("name", AGENT_JOB_QUEUE)
+ .put("serviceName", "testService")
+ .put("enabled", true)
+ .put("queueProviderFactory.target", "(name=jobQueue)")
+ .asOption()
+ );
+ }
+
+
+ protected DistributionAgent getAgent(String agentName) {
+
+ for (int i=0; i< 10; i++) {
+ try {
+ Filter filter = context.createFilter("(name="+agentName+")");
+
+ ServiceReference[] srs = this.context.getServiceReferences(DistributionAgent.class.getName(), filter.toString());
+ if (srs == null || srs.length == 0) {
+ Thread.sleep(1000);
+ continue;
+ }
+ ServiceReference sr = srs[0];
+
+ Object service = context.getService(sr);
+
+ return (DistributionAgent) service;
+ } catch (InvalidSyntaxException e) {
+
+ } catch (InterruptedException e) {
+ }
+
+
+ }
+
+ return null;
+ }
+
+}
diff --git a/src/test/java/org/apache/sling/distribution/queue/impl/resource/ResourceIteratorTest.java b/src/test/java/org/apache/sling/distribution/queue/impl/resource/ResourceIteratorTest.java
new file mode 100644
index 0000000..285ca3a
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/queue/impl/resource/ResourceIteratorTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.distribution.queue.impl.resource;
+
+import org.apache.sling.api.resource.PersistenceException;
+import org.apache.sling.api.resource.Resource;
+import org.apache.sling.testing.mock.sling.junit.SlingContext;
+import org.apache.sling.testing.resourceresolver.MockHelper;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+
+public class ResourceIteratorTest {
+
+ static final String FOLDER_TYPE = ResourceQueueUtils.RESOURCE_FOLDER;
+ static final String ITEM_TYPE = "nt:unstructured";
+
+ @Rule
+ public final SlingContext context = new SlingContext();
+
+ @Before
+ public void testSetup() {
+ MockHelper helper = MockHelper.create(context.resourceResolver());
+
+ helper.resource("/root").p("prop", "value")
+ .resource("2018").p("sling:resourceType", FOLDER_TYPE)
+ .resource("10").p("sling:resourceType", FOLDER_TYPE)
+ .resource("15").p("sling:resourceType", FOLDER_TYPE)
+ .resource("11").p("sling:resourceType", FOLDER_TYPE)
+ .resource("35").p("sling:resourceType", FOLDER_TYPE)
+ .resource("item1").p("sling:resourceType", ITEM_TYPE)
+ .resource("../item2").p("sling:resourceType", ITEM_TYPE)
+ .resource("../../36").p("sling:resourceType", FOLDER_TYPE)
+ .resource("item3").p("sling:resourceType", ITEM_TYPE);
+
+ helper.resource("/root2").p("prop", "value")
+ .resource("2018").p("sling:resourceType", FOLDER_TYPE)
+ .resource("10").p("sling:resourceType", FOLDER_TYPE)
+ .resource("15").p("sling:resourceType", FOLDER_TYPE)
+ .resource("11").p("sling:resourceType", FOLDER_TYPE)
+ .resource("35").p("sling:resourceType", FOLDER_TYPE)
+ .resource("../36").p("sling:resourceType", FOLDER_TYPE)
+ .resource("item3").p("sling:resourceType", ITEM_TYPE);
+
+ try {
+ helper.commit();
+ } catch (PersistenceException e) {
+
+ }
+
+ }
+
+ @Test
+ public void testEmptyRoot() {
+ Resource root = context.resourceResolver().getResource("/root");
+
+ ResourceIterator it = new ResourceIterator(root, ResourceQueueUtils.RESOURCE_FOLDER, false, false);
+
+
+ test(it, new String[0]);
+ }
+
+ @Test
+ public void testFullPath() {
+ Resource root = context.resourceResolver().getResource("/root");
+
+ ResourceIterator it = new ResourceIterator(root, ResourceQueueUtils.RESOURCE_FOLDER, true, true);
+
+ test(it, new String[] {
+ "/root/2018/10/15/11/35/item1",
+ "/root/2018/10/15/11/35/item2",
+ "/root/2018/10/15/11/35",
+ "/root/2018/10/15/11/36/item3",
+ "/root/2018/10/15/11/36",
+ "/root/2018/10/15/11",
+ "/root/2018/10/15",
+ "/root/2018/10",
+ "/root/2018"
+ });
+ }
+
+ @Test
+ public void testFullPath2() {
+ Resource root = context.resourceResolver().getResource("/root2");
+
+ ResourceIterator it = new ResourceIterator(root, ResourceQueueUtils.RESOURCE_FOLDER, true, true);
+
+ test(it, new String[] {
+ "/root2/2018/10/15/11/35",
+ "/root2/2018/10/15/11/36/item3",
+ "/root2/2018/10/15/11/36",
+ "/root2/2018/10/15/11",
+ "/root2/2018/10/15",
+ "/root2/2018/10",
+ "/root2/2018"
+ });
+ }
+
+ @Test
+ public void testFullPathNoFolder() {
+ Resource root = context.resourceResolver().getResource("/root");
+
+ ResourceIterator it = new ResourceIterator(root, ResourceQueueUtils.RESOURCE_FOLDER, false, true);
+
+
+ test(it, new String[] {
+ "/root/2018/10/15/11/35/item1",
+ "/root/2018/10/15/11/35/item2",
+ "/root/2018/10/15/11/36/item3",
+ });
+ }
+
+ @Test
+ public void testFullPathNoLeafs() {
+ Resource root = context.resourceResolver().getResource("/root");
+
+ ResourceIterator it = new ResourceIterator(root, ResourceQueueUtils.RESOURCE_FOLDER, true, false);
+
+ test(it, new String[] {
+ "/root/2018/10/15/11/35",
+ "/root/2018/10/15/11/36",
+ "/root/2018/10/15/11",
+ "/root/2018/10/15",
+ "/root/2018/10",
+ "/root/2018"
+ });
+ }
+
+ public void test(ResourceIterator it, String[] paths) {
+ List<String> expected = Arrays.asList(paths);
+
+ assertEquals(expected, toPaths(it));
+ }
+
+
+ List<String> toPaths(ResourceIterator it) {
+ List<String> paths = new ArrayList<String>();
+ while (it.hasNext()) {
+ paths.add(it.next().getPath());
+ }
+ return paths;
+ }
+}
diff --git a/src/test/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueIT.java b/src/test/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueIT.java
new file mode 100644
index 0000000..b599aa5
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueIT.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.distribution.queue.impl.resource;
+
+import org.apache.sling.api.resource.LoginException;
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.distribution.DistributionRequestType;
+import org.apache.sling.distribution.DistributionResponse;
+import org.apache.sling.distribution.Distributor;
+import org.apache.sling.distribution.SimpleDistributionRequest;
+import org.apache.sling.distribution.agent.spi.DistributionAgent;
+import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.queue.DistributionQueueEntry;
+import org.apache.sling.distribution.queue.DistributionQueueItem;
+import org.apache.sling.distribution.DistributionBaseIT;
+import org.apache.sling.distribution.queue.spi.DistributionQueue;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.junit.PaxExam;
+
+import javax.inject.Inject;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(PaxExam.class)
+public class ResourceQueueIT extends DistributionBaseIT {
+
+ static final int ITERATIONS = 1000;
+ static final String QUEUE_NAME = "default";
+
+ @Inject
+ protected Distributor distributor;
+
+
+ @Test
+ public void testExecute() throws LoginException, DistributionException {
+
+ DistributionAgent agent = getAgent(AGENT_RESOURCE_QUEUE);
+
+ assertNotNull(agent);
+
+ ResourceResolver resourceResolver = resolverFactory.getAdministrativeResourceResolver(null);
+
+ DistributionResponse response = agent.execute(resourceResolver,
+ new SimpleDistributionRequest(DistributionRequestType.ADD, "/content" ));
+ assertTrue(response.isSuccessful());
+
+ DistributionQueue queue = agent.getQueue(QUEUE_NAME);
+
+ assertEquals(1, queue.getStatus().getItemsCount());
+
+ clear(queue);
+
+ }
+
+
+ @Test
+ public void testFifo() {
+ DistributionAgent agent = getAgent(AGENT_RESOURCE_QUEUE);
+
+ DistributionQueue queue = agent.getQueue(QUEUE_NAME);
+
+ queue.add(new DistributionQueueItem("packageId", 10, new HashMap<String, Object>()));
+
+ assertEquals(1, queue.getStatus().getItemsCount());
+
+ clear(queue);
+ }
+
+ @Test
+ public void testConcurrentFifo () throws InterruptedException {
+ final DistributionAgent agent = getAgent(AGENT_RESOURCE_QUEUE);
+ final DistributionQueue queue = agent.getQueue(QUEUE_NAME);
+
+ Producer p1 = new Producer(queue, "p1");
+ Producer p2 = new Producer(queue, "p2");
+ Producer p3 = new Producer(queue, "p3");
+ Producer p4 = new Producer(queue, "p4");
+
+ Map<String, Producer> producerMap = new HashMap<String, Producer>();
+ producerMap.put("p1", p1);
+ producerMap.put("p2", p2);
+ producerMap.put("p3", p3);
+ producerMap.put("p4", p4);
+
+ Consumer c1 = new Consumer(queue, "c1", new String[] { "p1", "p2" });
+ Consumer c2 = new Consumer(queue, "c2", new String[] { "p3", "p4" });
+
+ Map<String, Consumer> consumerMap = new HashMap<String, Consumer>();
+ consumerMap.put("c1", c1);
+ consumerMap.put("c2", c2);
+
+ List<Thread> threads = new ArrayList<Thread>();
+ for (Producer p : producerMap.values()) {
+ threads.add(new Thread(p));
+ }
+ for (Consumer c : consumerMap.values()) {
+ threads.add(new Thread(c));
+ }
+
+ for (Thread t : threads) {
+ t.start();
+ }
+
+ for (Thread t : threads) {
+ t.join(600 * 1000);
+ }
+
+ for (Producer p : producerMap.values()) {
+ assertEquals(ITERATIONS, p.created.size());
+ }
+
+ for (Consumer c : consumerMap.values()) {
+ for (String source : c.sources) {
+ Producer p = producerMap.get(source);
+ assertEquals(p.created, c.removedBySource.get(source));
+ }
+ }
+
+ assertEquals(0, queue.getStatus().getItemsCount());
+
+ clear(queue);
+ }
+
+ void clear(DistributionQueue queue) {
+ for (DistributionQueueEntry entry : queue.getItems(0, -1)) {
+ queue.remove(entry.getId());
+ }
+ }
+
+ class Producer implements Runnable {
+ private final DistributionQueue queue;
+ private final String name;
+ final List<String> created = Collections.synchronizedList(new ArrayList<String>());
+
+ Producer(DistributionQueue queue, String name) {
+ this.queue = queue;
+ this.name = name;
+ }
+
+
+ public void run() {
+ for(int i=0; i<ITERATIONS; i++) {
+
+ if (i % 1000 == 0) {
+ System.out.println("Producer " + name + " " + i);
+ }
+
+ final Map<String, Object> props = new HashMap<String, Object>();
+ final String packageId = UUID.randomUUID().toString();
+ final long size = 10;
+
+ props.put("auuid", UUID.randomUUID().toString());
+ props.put("no", i);
+ props.put("source", name);
+
+ final DistributionQueueItem item = new DistributionQueueItem(packageId, size, props);
+
+ final DistributionQueueEntry entry = queue.add(item);
+
+ created.add(entry.getId());
+ }
+ }
+ }
+
+ class Consumer implements Runnable {
+ private final DistributionQueue queue;
+ private final String name;
+ private final String[] sources;
+
+ final List<String> removed = new ArrayList<String>();
+
+ final Map<String, List<String>> removedBySource = new HashMap<String, List<String>>();
+
+
+ Consumer(DistributionQueue queue, String name, String[] sources) {
+
+ this.queue = queue;
+ this.name = name;
+ this.sources = sources;
+ for (String source: sources) {
+ removedBySource.put(source, new ArrayList<String>());
+ }
+ }
+
+ public void run() {
+ while (removed.size() < sources.length * ITERATIONS) {
+
+ final DistributionQueueEntry entry = queue.getHead();
+
+ if (entry != null) {
+
+ String source = (String) entry.getItem().get("source");
+
+ if (removedBySource.containsKey(source)) {
+ if (removed.size() % 1000 == 0) {
+ System.out.println("Consumer " + name + " " + removed.size());
+ }
+
+ final DistributionQueueEntry removedEntry = queue.remove(entry.getId());
+
+ assertEquals(removedEntry.getId(), entry.getId());
+
+ removed.add(entry.getId());
+ removedBySource.get(source).add(entry.getId());
+ }
+ }
+ }
+ }
+ }
+}
diff --git a/src/test/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueUtilsTest.java b/src/test/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueUtilsTest.java
new file mode 100644
index 0000000..af04a81
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueueUtilsTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.queue.impl.resource;
+
+import org.junit.Test;
+
+import java.util.GregorianCalendar;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class ResourceQueueUtilsTest {
+
+ @Test
+ public void testTimePath() throws Exception {
+
+
+ assertEquals("2018/01/01/00/00",
+ ResourceQueueUtils.getTimePath(new GregorianCalendar(2018, 0, 1, 0, 0)));
+
+
+ assertEquals("2018/12/01/00/00",
+ ResourceQueueUtils.getTimePath(new GregorianCalendar(2018, 11, 1, 0, 0)));
+
+ assertEquals("2018/12/31/23/59",
+ ResourceQueueUtils.getTimePath(new GregorianCalendar(2018, 11, 31, 23, 59)));
+ }
+
+
+ @Test
+ public void testIsSafeToDelete() throws Exception {
+
+ assertTrue(ResourceQueueUtils.isSafeToDelete("2018/02/15/03/09", "2017"));
+ assertFalse(ResourceQueueUtils.isSafeToDelete("2018/02/15/03/09", "2018"));
+ assertFalse(ResourceQueueUtils.isSafeToDelete("2018/02/15/03/09", "2019"));
+
+ assertTrue(ResourceQueueUtils.isSafeToDelete("2018/02/15/03/09", "2018/01"));
+ assertFalse(ResourceQueueUtils.isSafeToDelete("2018/02/15/03/09", "2018/02"));
+ assertFalse(ResourceQueueUtils.isSafeToDelete("2018/02/15/03/09", "2018/03"));
+
+ assertTrue(ResourceQueueUtils.isSafeToDelete("2018/02/15/03/09", "2018/02/14"));
+ assertFalse(ResourceQueueUtils.isSafeToDelete("2018/02/15/03/09", "2018/02/15"));
+ assertFalse(ResourceQueueUtils.isSafeToDelete("2018/02/15/03/09", "2018/02/16"));
+
+ assertTrue(ResourceQueueUtils.isSafeToDelete("2018/02/15/03/09", "2018/02/15/02"));
+ assertFalse(ResourceQueueUtils.isSafeToDelete("2018/02/15/03/09", "2018/02/15/03"));
+ assertFalse(ResourceQueueUtils.isSafeToDelete("2018/02/15/03/09", "2018/02/15/04"));
+
+ assertTrue(ResourceQueueUtils.isSafeToDelete("2018/02/15/03/09", "2018/02/15/03/08"));
+ assertFalse(ResourceQueueUtils.isSafeToDelete("2018/02/15/03/09", "2018/02/15/03/09"));
+ assertFalse(ResourceQueueUtils.isSafeToDelete("2018/02/15/03/09", "2018/02/15/03/10"));
+
+ assertFalse(ResourceQueueUtils.isSafeToDelete("2018/02/15/03/09", "2018/02/15/03/10"));
+ assertFalse(ResourceQueueUtils.isSafeToDelete("2018/02/15/03/09", "2018/03/16/04/09"));
+
+
+
+
+ // new year
+ assertTrue(ResourceQueueUtils.isSafeToDelete("2018/12/31/23/59", "2017"));
+ assertFalse(ResourceQueueUtils.isSafeToDelete("2018/12/31/23/59", "2018"));
+ assertFalse(ResourceQueueUtils.isSafeToDelete("2018/12/31/23/59", "2019"));
+
+
+ assertTrue(ResourceQueueUtils.isSafeToDelete("2018/12/31/23/59", "2018/11"));
+ assertFalse(ResourceQueueUtils.isSafeToDelete("2018/12/31/23/59", "2018/12"));
+ assertFalse(ResourceQueueUtils.isSafeToDelete("2018/12/31/23/59", "2019/01"));
+
+
+ assertTrue(ResourceQueueUtils.isSafeToDelete("2018/12/31/23/59", "2018/12/30"));
+ assertFalse(ResourceQueueUtils.isSafeToDelete("2018/12/31/23/59", "2018/12/31"));
+ assertFalse(ResourceQueueUtils.isSafeToDelete("2018/12/31/23/59", "2019/01/00"));
+
+ assertTrue(ResourceQueueUtils.isSafeToDelete("2018/12/31/23/59", "2018/12/31/22"));
+ assertFalse(ResourceQueueUtils.isSafeToDelete("2018/12/31/23/59", "2018/12/31/23"));
+ assertFalse(ResourceQueueUtils.isSafeToDelete("2018/12/31/23/59", "2019/01/01/00"));
+
+ assertTrue(ResourceQueueUtils.isSafeToDelete("2018/12/31/23/59", "2018/12/31/23/58"));
+ assertFalse(ResourceQueueUtils.isSafeToDelete("2018/12/31/23/59", "2018/12/31/23/59"));
+ assertFalse(ResourceQueueUtils.isSafeToDelete("2018/12/31/23/59", "2019/01/01/00/00"));
+
+ }
+}