SLING-8086 - Extend distribution queue SPI with the ability to clear and batch remove items
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/DistributionQueueWrapper.java b/src/main/java/org/apache/sling/distribution/queue/impl/DistributionQueueWrapper.java
index cc9fc45..9c1558f 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/DistributionQueueWrapper.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/DistributionQueueWrapper.java
@@ -20,15 +20,17 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.sling.distribution.queue.spi.Clearable;
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.apache.sling.distribution.queue.DistributionQueueStatus;
+import org.apache.sling.distribution.queue.spi.Removable;
 import org.jetbrains.annotations.NotNull;
 
-public abstract class DistributionQueueWrapper implements DistributionQueue, Clearable {
+public abstract class DistributionQueueWrapper implements DistributionQueue, Clearable, Removable {
     final DistributionQueue wrappedQueue;
 
     DistributionQueueWrapper(DistributionQueue wrappedQueue) {
@@ -76,6 +78,19 @@
 
     @NotNull
     @Override
+    public Iterable<DistributionQueueEntry> remove(@NotNull Set<String> entryIds) {
+        final List<DistributionQueueEntry> dropped = new ArrayList<DistributionQueueEntry>();
+        for (final String id : entryIds) {
+            final DistributionQueueEntry hit = remove(id); // single-entry removal
+            if (hit != null) { // a null result is left out of the returned list
+                dropped.add(hit);
+            }
+        }
+        return dropped;
+    }
+
+    @NotNull
+    @Override
     public Iterable<DistributionQueueEntry> clear(int limit) {
         final List<DistributionQueueEntry> removedEntries = new ArrayList<DistributionQueueEntry>();
         for (DistributionQueueEntry entry : getItems(0, limit)) {
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueue.java b/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueue.java
index 39764d3..7bedfac 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueue.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueue.java
@@ -22,6 +22,7 @@
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.sling.distribution.queue.spi.Clearable;
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
@@ -32,6 +33,7 @@
 import org.apache.sling.distribution.queue.DistributionQueueStatus;
 import org.apache.sling.distribution.queue.DistributionQueueType;
 import org.apache.sling.distribution.queue.impl.DistributionQueueUtils;
+import org.apache.sling.distribution.queue.spi.Removable;
 import org.apache.sling.event.jobs.Job;
 import org.apache.sling.event.jobs.JobManager;
 import org.apache.sling.event.jobs.JobManager.QueryType;
@@ -42,7 +44,7 @@
 /**
  * a {@link DistributionQueue} based on Sling Job Handling facilities
  */
-public class JobHandlingDistributionQueue implements DistributionQueue, Clearable {
+public class JobHandlingDistributionQueue implements DistributionQueue, Clearable, Removable {
 
     public final static String DISTRIBUTION_QUEUE_TOPIC = "org/apache/sling/distribution/queue";
 
@@ -164,6 +166,19 @@
         return null;
     }
 
+    @NotNull
+    @Override
+    public Iterable<DistributionQueueEntry> remove(@NotNull Set<String> entryIds) {
+        final List<DistributionQueueEntry> dropped = new ArrayList<DistributionQueueEntry>();
+        for (final String id : entryIds) {
+            final DistributionQueueEntry hit = remove(id); // single-entry removal
+            if (hit != null) { // a null result is left out of the returned list
+                dropped.add(hit);
+            }
+        }
+        return dropped;
+    }
+
     public DistributionQueueEntry remove(@NotNull String id) {
         boolean removed = false;
         Job job = getJob(id);
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueue.java b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueue.java
index 1f6a94e..980167e 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueue.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueue.java
@@ -31,6 +31,7 @@
 import org.apache.sling.distribution.queue.DistributionQueueType;
 import org.apache.sling.distribution.queue.spi.Clearable;
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
+import org.apache.sling.distribution.queue.spi.Removable;
 import org.apache.sling.distribution.util.impl.DistributionUtils;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
@@ -39,9 +40,10 @@
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 
 
-public class ResourceQueue implements DistributionQueue, Clearable {
+public class ResourceQueue implements DistributionQueue, Clearable, Removable {
     private final Logger log = LoggerFactory.getLogger(getClass());
 
 
@@ -162,6 +164,19 @@
         }
     }
 
+    @NotNull
+    @Override
+    public Iterable<DistributionQueueEntry> remove(@NotNull Set<String> entryIds) {
+        final List<DistributionQueueEntry> dropped = new ArrayList<DistributionQueueEntry>();
+        for (final String id : entryIds) {
+            final DistributionQueueEntry hit = remove(id); // single-entry removal
+            if (hit != null) { // a null result is left out of the returned list
+                dropped.add(hit);
+            }
+        }
+        return dropped;
+    }
+
     @Nullable
     @Override
     public DistributionQueueEntry remove(@NotNull String itemId) {
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java
index 27eec29..462caa4 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java
@@ -23,6 +23,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
+import java.util.Set;
 import java.util.WeakHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -36,6 +37,7 @@
 import org.apache.sling.distribution.queue.DistributionQueueStatus;
 import org.apache.sling.distribution.queue.DistributionQueueType;
 import org.apache.sling.distribution.queue.impl.DistributionQueueUtils;
+import org.apache.sling.distribution.queue.spi.Removable;
 import org.jetbrains.annotations.NotNull;
 import org.jetbrains.annotations.Nullable;
 import org.slf4j.Logger;
@@ -50,7 +52,7 @@
  * Note: potentially the Queue could contain the ordered package ids, with a sidecar map id->item;
  * that way removal could be faster.
  */
-public class SimpleDistributionQueue implements DistributionQueue, Clearable {
+public class SimpleDistributionQueue implements DistributionQueue, Clearable, Removable {
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
@@ -146,6 +148,18 @@
         return null;
     }
 
+    @NotNull
+    @Override
+    public Iterable<DistributionQueueEntry> remove(@NotNull Set<String> entryIds) {
+        final List<DistributionQueueEntry> dropped = new ArrayList<DistributionQueueEntry>();
+        for (final String id : entryIds) {
+            final DistributionQueueEntry hit = remove(id); // single-entry removal
+            if (hit != null) { // a null result is left out of the returned list
+                dropped.add(hit);
+            }
+        }
+        return dropped;
+    }
 
     @Nullable
     public DistributionQueueEntry remove(@NotNull String id) {
diff --git a/src/main/java/org/apache/sling/distribution/queue/spi/Removable.java b/src/main/java/org/apache/sling/distribution/queue/spi/Removable.java
new file mode 100644
index 0000000..786691d
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/queue/spi/Removable.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.queue.spi;
+
+import java.util.Set;
+
+import aQute.bnd.annotation.ConsumerType;
+import org.apache.sling.distribution.queue.DistributionQueueEntry;
+import org.jetbrains.annotations.NotNull;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Trait to be added to a {@link DistributionQueue} that supports
+ * removing arbitrary entries, individually or in batch, via the
+ * {@link Removable#remove} methods.
+ *
+ * @since 0.1.0
+ */
+@ConsumerType
+public interface Removable {
+
+    /**
+     * Remove a set of entries from the queue by specifying their identifiers.
+     *
+     * @param entryIds The identifiers of the entries to be removed
+     * @return an iterable over the removed entries; never {@code null}
+     */
+    @NotNull
+    Iterable<DistributionQueueEntry> remove(@NotNull Set<String> entryIds);
+
+    /**
+     * Remove a single entry from the queue by specifying its identifier.
+     *
+     * @param entryId The identifier of the entry to be removed
+     * @return the removed entry or {@code null} if the entry
+     *         could not be found
+     */
+    @Nullable
+    DistributionQueueEntry remove(@NotNull String entryId);
+}
diff --git a/src/main/java/org/apache/sling/distribution/servlet/DistributionAgentQueueServlet.java b/src/main/java/org/apache/sling/distribution/servlet/DistributionAgentQueueServlet.java
index 966657d..25d5c40 100644
--- a/src/main/java/org/apache/sling/distribution/servlet/DistributionAgentQueueServlet.java
+++ b/src/main/java/org/apache/sling/distribution/servlet/DistributionAgentQueueServlet.java
@@ -20,6 +20,9 @@
 
 import javax.servlet.ServletException;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
 
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.sling.SlingServlet;
@@ -36,6 +39,7 @@
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
+import org.apache.sling.distribution.queue.spi.Removable;
 import org.apache.sling.distribution.resources.DistributionResourceTypes;
 import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
 import org.apache.sling.distribution.packaging.impl.DistributionPackageBuilderProvider;
@@ -72,7 +76,11 @@
             String[] idParam = request.getParameterValues("id");
 
             if (idParam != null) {
-                deleteItems(resourceResolver, queue, idParam);
+                if (queue instanceof Removable) {
+                    deleteItemsInBatch(resourceResolver, (Removable)queue, new HashSet<String>(Arrays.asList(idParam)), queue.getName());
+                } else {
+                    deleteItems(resourceResolver, queue, idParam);
+                }
             } else {
                 int limit = 1;
                 try {
@@ -81,7 +89,7 @@
                     log.warn("limit param malformed : "+limitParam, ex);
                 }
                 if (queue instanceof Clearable) {
-                    clearItems(resourceResolver, queue, limit);
+                    clearItems(resourceResolver, (Clearable)queue, limit, queue.getName());
                 } else {
                     deleteItems(resourceResolver, queue, limit);
                 }
@@ -138,18 +146,26 @@
         DistributionQueueItem item = entry.getItem();
         String id = entry.getId();
         queue.remove(id);
-
-        DistributionPackage distributionPackage = getPackage(resourceResolver, item);
-        DistributionPackageUtils.releaseOrDelete(distributionPackage, queue.getName());
+        releaseOrDeletePackage(resourceResolver, item, queue.getName());
     }
 
-    private void clearItems(ResourceResolver resourceResolver, DistributionQueue queue, int limit) {
-        for (DistributionQueueEntry removed : ((Clearable)queue).clear(limit)) {
-            DistributionPackage distributionPackage = getPackage(resourceResolver, removed.getItem());
-            DistributionPackageUtils.releaseOrDelete(distributionPackage, queue.getName());
+    private void deleteItemsInBatch(ResourceResolver resourceResolver, Removable queue, Set<String> entryIds, String queueName) { // batch removal by id
+        for (DistributionQueueEntry removed : queue.remove(entryIds)) { // iterates only the entries the queue reports as removed
+            releaseOrDeletePackage(resourceResolver, removed.getItem(), queueName); // release or delete the package behind each removed item
         }
     }
 
+    private void clearItems(ResourceResolver resourceResolver, Clearable queue, int limit, String queueName) { // clears via the Clearable trait, passing limit through
+        for (DistributionQueueEntry removed : queue.clear(limit)) { // iterates only the entries the queue reports as cleared
+            releaseOrDeletePackage(resourceResolver, removed.getItem(), queueName); // release or delete the package behind each cleared item
+        }
+    }
+
+    private void releaseOrDeletePackage(ResourceResolver resourceResolver, DistributionQueueItem queueItem, String queueName) {
+        // Look up the package referenced by the queue item, then release or delete it for the given queue.
+        DistributionPackageUtils.releaseOrDelete(getPackage(resourceResolver, queueItem), queueName);
+    }
+
     private DistributionPackage getPackage(ResourceResolver resourceResolver, DistributionQueueItem item) {
         DistributionPackageInfo info = DistributionPackageUtils.fromQueueItem(item);
         String type = info.getType();