SLING-8086 - Extend distribution queue SPI with the ability to clear and batch remove items
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/DistributionQueueWrapper.java b/src/main/java/org/apache/sling/distribution/queue/impl/DistributionQueueWrapper.java
index cc9fc45..9c1558f 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/DistributionQueueWrapper.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/DistributionQueueWrapper.java
@@ -20,15 +20,17 @@
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
import org.apache.sling.distribution.queue.spi.Clearable;
import org.apache.sling.distribution.queue.spi.DistributionQueue;
import org.apache.sling.distribution.queue.DistributionQueueEntry;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.apache.sling.distribution.queue.DistributionQueueStatus;
+import org.apache.sling.distribution.queue.spi.Removable;
import org.jetbrains.annotations.NotNull;
-public abstract class DistributionQueueWrapper implements DistributionQueue, Clearable {
+public abstract class DistributionQueueWrapper implements DistributionQueue, Clearable, Removable {
final DistributionQueue wrappedQueue;
DistributionQueueWrapper(DistributionQueue wrappedQueue) {
@@ -76,6 +78,19 @@
@NotNull
@Override
+ public Iterable<DistributionQueueEntry> remove(@NotNull Set<String> entryIds) {
+ List<DistributionQueueEntry> removed = new ArrayList<DistributionQueueEntry>();
+ for (String entryId : entryIds) {
+ DistributionQueueEntry entry = remove(entryId);
+ if (entry != null) {
+ removed.add(entry);
+ }
+ }
+ return removed;
+ }
+
+ @NotNull
+ @Override
public Iterable<DistributionQueueEntry> clear(int limit) {
final List<DistributionQueueEntry> removedEntries = new ArrayList<DistributionQueueEntry>();
for (DistributionQueueEntry entry : getItems(0, limit)) {
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueue.java b/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueue.java
index 39764d3..7bedfac 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueue.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueue.java
@@ -22,6 +22,7 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.sling.distribution.queue.spi.Clearable;
import org.apache.sling.distribution.queue.spi.DistributionQueue;
@@ -32,6 +33,7 @@
import org.apache.sling.distribution.queue.DistributionQueueStatus;
import org.apache.sling.distribution.queue.DistributionQueueType;
import org.apache.sling.distribution.queue.impl.DistributionQueueUtils;
+import org.apache.sling.distribution.queue.spi.Removable;
import org.apache.sling.event.jobs.Job;
import org.apache.sling.event.jobs.JobManager;
import org.apache.sling.event.jobs.JobManager.QueryType;
@@ -42,7 +44,7 @@
/**
* a {@link DistributionQueue} based on Sling Job Handling facilities
*/
-public class JobHandlingDistributionQueue implements DistributionQueue, Clearable {
+public class JobHandlingDistributionQueue implements DistributionQueue, Clearable, Removable {
public final static String DISTRIBUTION_QUEUE_TOPIC = "org/apache/sling/distribution/queue";
@@ -164,6 +166,19 @@
return null;
}
+ @NotNull
+ @Override
+ public Iterable<DistributionQueueEntry> remove(@NotNull Set<String> entryIds) {
+ List<DistributionQueueEntry> removed = new ArrayList<DistributionQueueEntry>();
+ for (String entryId : entryIds) {
+ DistributionQueueEntry entry = remove(entryId);
+ if (entry != null) {
+ removed.add(entry);
+ }
+ }
+ return removed;
+ }
+
public DistributionQueueEntry remove(@NotNull String id) {
boolean removed = false;
Job job = getJob(id);
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueue.java b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueue.java
index 1f6a94e..980167e 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueue.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueue.java
@@ -31,6 +31,7 @@
import org.apache.sling.distribution.queue.DistributionQueueType;
import org.apache.sling.distribution.queue.spi.Clearable;
import org.apache.sling.distribution.queue.spi.DistributionQueue;
+import org.apache.sling.distribution.queue.spi.Removable;
import org.apache.sling.distribution.util.impl.DistributionUtils;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -39,9 +40,10 @@
import java.util.ArrayList;
import java.util.List;
+import java.util.Set;
-public class ResourceQueue implements DistributionQueue, Clearable {
+public class ResourceQueue implements DistributionQueue, Clearable, Removable {
private final Logger log = LoggerFactory.getLogger(getClass());
@@ -162,6 +164,19 @@
}
}
+ @NotNull
+ @Override
+ public Iterable<DistributionQueueEntry> remove(@NotNull Set<String> entryIds) {
+ List<DistributionQueueEntry> removed = new ArrayList<DistributionQueueEntry>();
+ for (String entryId : entryIds) {
+ DistributionQueueEntry entry = remove(entryId);
+ if (entry != null) {
+ removed.add(entry);
+ }
+ }
+ return removed;
+ }
+
@Nullable
@Override
public DistributionQueueEntry remove(@NotNull String itemId) {
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java
index 27eec29..462caa4 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java
@@ -23,6 +23,7 @@
import java.util.List;
import java.util.Map;
import java.util.Queue;
+import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.LinkedBlockingQueue;
@@ -36,6 +37,7 @@
import org.apache.sling.distribution.queue.DistributionQueueStatus;
import org.apache.sling.distribution.queue.DistributionQueueType;
import org.apache.sling.distribution.queue.impl.DistributionQueueUtils;
+import org.apache.sling.distribution.queue.spi.Removable;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
@@ -50,7 +52,7 @@
* Note: potentially the Queue could contain the ordered package ids, with a sidecar map id->item;
* that way removal could be faster.
*/
-public class SimpleDistributionQueue implements DistributionQueue, Clearable {
+public class SimpleDistributionQueue implements DistributionQueue, Clearable, Removable {
private final Logger log = LoggerFactory.getLogger(getClass());
@@ -146,6 +148,18 @@
return null;
}
+ @NotNull
+ @Override
+ public Iterable<DistributionQueueEntry> remove(@NotNull Set<String> entryIds) {
+ List<DistributionQueueEntry> removed = new ArrayList<DistributionQueueEntry>();
+ for (String entryId : entryIds) {
+ DistributionQueueEntry entry = remove(entryId);
+ if (entry != null) {
+ removed.add(entry);
+ }
+ }
+ return removed;
+ }
@Nullable
public DistributionQueueEntry remove(@NotNull String id) {
diff --git a/src/main/java/org/apache/sling/distribution/queue/spi/Removable.java b/src/main/java/org/apache/sling/distribution/queue/spi/Removable.java
new file mode 100644
index 0000000..786691d
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/queue/spi/Removable.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.queue.spi;
+
+import java.util.Set;
+
+import aQute.bnd.annotation.ConsumerType;
+import org.apache.sling.distribution.queue.DistributionQueueEntry;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Trait to be added to a {@link DistributionQueue} distribution
+ * queue that supports removing random entries in batch via the
+ * {@link Removable#remove} methods.
+ *
+ * @since 0.1.0
+ */
+@ConsumerType
+public interface Removable {
+
+ /**
+ * Remove a set entries from the queue by specifying their identifiers.
+ *
+ * @param entryIds The identifiers of the entries to be removed
+ * @return an iterable over the removed entries
+ */
+ @NotNull
+ Iterable<DistributionQueueEntry> remove(@NotNull Set<String> entryIds);
+
+ /**
+ * Remove an entry from the queue by specifying its identifier.
+ *
+ * @param entryId The identifier of the entry to be removed
+ * @return the removed entry or {@code null} if the entry
+ * could not be found
+ */
+ @Nullable
+ DistributionQueueEntry remove(@NotNull String entryId);
+}
diff --git a/src/main/java/org/apache/sling/distribution/servlet/DistributionAgentQueueServlet.java b/src/main/java/org/apache/sling/distribution/servlet/DistributionAgentQueueServlet.java
index 966657d..25d5c40 100644
--- a/src/main/java/org/apache/sling/distribution/servlet/DistributionAgentQueueServlet.java
+++ b/src/main/java/org/apache/sling/distribution/servlet/DistributionAgentQueueServlet.java
@@ -20,6 +20,9 @@
import javax.servlet.ServletException;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.sling.SlingServlet;
@@ -36,6 +39,7 @@
import org.apache.sling.distribution.queue.spi.DistributionQueue;
import org.apache.sling.distribution.queue.DistributionQueueEntry;
import org.apache.sling.distribution.queue.DistributionQueueItem;
+import org.apache.sling.distribution.queue.spi.Removable;
import org.apache.sling.distribution.resources.DistributionResourceTypes;
import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
import org.apache.sling.distribution.packaging.impl.DistributionPackageBuilderProvider;
@@ -72,7 +76,11 @@
String[] idParam = request.getParameterValues("id");
if (idParam != null) {
- deleteItems(resourceResolver, queue, idParam);
+ if (queue instanceof Removable) {
+ deleteItemsInBatch(resourceResolver, (Removable)queue, new HashSet<String>(Arrays.asList(idParam)), queue.getName());
+ } else {
+ deleteItems(resourceResolver, queue, idParam);
+ }
} else {
int limit = 1;
try {
@@ -81,7 +89,7 @@
log.warn("limit param malformed : "+limitParam, ex);
}
if (queue instanceof Clearable) {
- clearItems(resourceResolver, queue, limit);
+ clearItems(resourceResolver, (Clearable)queue, limit, queue.getName());
} else {
deleteItems(resourceResolver, queue, limit);
}
@@ -138,18 +146,26 @@
DistributionQueueItem item = entry.getItem();
String id = entry.getId();
queue.remove(id);
-
- DistributionPackage distributionPackage = getPackage(resourceResolver, item);
- DistributionPackageUtils.releaseOrDelete(distributionPackage, queue.getName());
+ releaseOrDeletePackage(resourceResolver, item, queue.getName());
}
- private void clearItems(ResourceResolver resourceResolver, DistributionQueue queue, int limit) {
- for (DistributionQueueEntry removed : ((Clearable)queue).clear(limit)) {
- DistributionPackage distributionPackage = getPackage(resourceResolver, removed.getItem());
- DistributionPackageUtils.releaseOrDelete(distributionPackage, queue.getName());
+ private void deleteItemsInBatch(ResourceResolver resourceResolver, Removable queue, Set<String> entryIds, String queueName) {
+ for (DistributionQueueEntry removed : queue.remove(entryIds)) {
+ releaseOrDeletePackage(resourceResolver, removed.getItem(), queueName);
}
}
+ private void clearItems(ResourceResolver resourceResolver, Clearable queue, int limit, String queueName) {
+ for (DistributionQueueEntry removed : queue.clear(limit)) {
+ releaseOrDeletePackage(resourceResolver, removed.getItem(), queueName);
+ }
+ }
+
+ private void releaseOrDeletePackage(ResourceResolver resourceResolver, DistributionQueueItem queueItem, String queueName) {
+ DistributionPackage distributionPackage = getPackage(resourceResolver, queueItem);
+ DistributionPackageUtils.releaseOrDelete(distributionPackage, queueName);
+ }
+
private DistributionPackage getPackage(ResourceResolver resourceResolver, DistributionQueueItem item) {
DistributionPackageInfo info = DistributionPackageUtils.fromQueueItem(item);
String type = info.getType();