SLING-8086 - Extend distribution queue SPI with the ability to clear and batch remove items
diff --git a/src/main/java/org/apache/sling/distribution/queue/DistributionQueueCapabilities.java b/src/main/java/org/apache/sling/distribution/queue/DistributionQueueCapabilities.java
new file mode 100644
index 0000000..048e815
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/queue/DistributionQueueCapabilities.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.queue;
+
+import aQute.bnd.annotation.ProviderType;
+
+@ProviderType
+public final class DistributionQueueCapabilities {
+
+    /**
+     * Indicates that the queue supports removing random entries.
+     */
+    public static final String REMOVABLE = "removable";
+
+    /**
+     * Indicates that the queue supports clearing entries.
+     */
+    public static final String CLEARABLE = "clearable";
+
+    /**
+     * Indicates that the queue supports adding entries.
+     */
+    public static final String APPENDABLE = "appendable";
+
+
+}
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/DistributionQueueWrapper.java b/src/main/java/org/apache/sling/distribution/queue/impl/DistributionQueueWrapper.java
index dc179d3..d7aa307 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/DistributionQueueWrapper.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/DistributionQueueWrapper.java
@@ -18,13 +18,29 @@
  */
 package org.apache.sling.distribution.queue.impl;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.apache.sling.distribution.queue.DistributionQueueStatus;
 import org.jetbrains.annotations.NotNull;
 
+import static org.apache.sling.distribution.queue.DistributionQueueCapabilities.APPENDABLE;
+import static org.apache.sling.distribution.queue.DistributionQueueCapabilities.REMOVABLE;
+import static org.apache.sling.distribution.queue.DistributionQueueCapabilities.CLEARABLE;
+
 public abstract class DistributionQueueWrapper implements DistributionQueue {
+
+    private static final Set<String> CAPABILITIES = Collections.unmodifiableSet(
+            new HashSet<String>(Arrays.asList(APPENDABLE, REMOVABLE, CLEARABLE)));
+
+
     final DistributionQueue wrappedQueue;
 
     DistributionQueueWrapper(DistributionQueue wrappedQueue) {
@@ -69,4 +85,21 @@
     public DistributionQueueStatus getStatus() {
         return wrappedQueue.getStatus();
     }
+
+    @NotNull
+    @Override
+    public Iterable<DistributionQueueEntry> remove(@NotNull Set<String> entryIds) {
+        return wrappedQueue.remove(entryIds);
+    }
+
+    @NotNull
+    @Override
+    public Iterable<DistributionQueueEntry> clear(int limit) {
+        return wrappedQueue.clear(limit);
+    }
+
+    @Override
+    public boolean hasCapability(@NotNull String capability) {
+        return CAPABILITIES.contains(capability);
+    }
 }
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueue.java b/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueue.java
index d19424f..e8b9081 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueue.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueue.java
@@ -19,9 +19,14 @@
 package org.apache.sling.distribution.queue.impl.jobhandling;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
@@ -37,6 +42,10 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.sling.distribution.queue.DistributionQueueCapabilities.APPENDABLE;
+import static org.apache.sling.distribution.queue.DistributionQueueCapabilities.REMOVABLE;
+import static org.apache.sling.distribution.queue.DistributionQueueCapabilities.CLEARABLE;
+
 /**
  * a {@link DistributionQueue} based on Sling Job Handling facilities
  */
@@ -44,6 +53,9 @@
 
     public final static String DISTRIBUTION_QUEUE_TOPIC = "org/apache/sling/distribution/queue";
 
+    private static final Set<String> CAPABILITIES = Collections.unmodifiableSet(
+            new HashSet<String>(Arrays.asList(APPENDABLE, REMOVABLE, CLEARABLE)));
+
     private final Logger log = LoggerFactory.getLogger(getClass());
 
     private final String name;
@@ -162,6 +174,19 @@
         return null;
     }
 
+    @NotNull
+    @Override
+    public Iterable<DistributionQueueEntry> remove(@NotNull Set<String> entryIds) {
+        List<DistributionQueueEntry> removed = new ArrayList<DistributionQueueEntry>();
+        for (String entryId : entryIds) {
+            DistributionQueueEntry entry = remove(entryId);
+            if (entry != null) {
+                removed.add(entry);
+            }
+        }
+        return removed;
+    }
+
     public DistributionQueueEntry remove(@NotNull String id) {
         boolean removed = false;
         Job job = getJob(id);
@@ -202,4 +227,21 @@
         return type;
     }
 
+    @NotNull
+    @Override
+    public Iterable<DistributionQueueEntry> clear(int limit) {
+        final List<DistributionQueueEntry> removedEntries = new ArrayList<DistributionQueueEntry>();
+        for (DistributionQueueEntry entry : getItems(0, limit)) {
+            DistributionQueueEntry removed = remove(entry.getId());
+            if (removed != null) {
+                removedEntries.add(removed);
+            }
+        }
+        return removedEntries;
+    }
+
+    @Override
+    public boolean hasCapability(@NotNull String capability) {
+        return CAPABILITIES.contains(capability);
+    }
 }
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueue.java b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueue.java
index 28162c3..612211d 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueue.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueue.java
@@ -36,10 +36,23 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
+
+import static org.apache.sling.distribution.queue.DistributionQueueCapabilities.APPENDABLE;
+import static org.apache.sling.distribution.queue.DistributionQueueCapabilities.REMOVABLE;
+import static org.apache.sling.distribution.queue.DistributionQueueCapabilities.CLEARABLE;
 
 
 public class ResourceQueue implements DistributionQueue {
+
+    private static final Set<String> CAPABILITIES = Collections.unmodifiableSet(
+            new HashSet<String>(Arrays.asList(APPENDABLE, REMOVABLE, CLEARABLE)));
+
     private final Logger log = LoggerFactory.getLogger(getClass());
 
 
@@ -160,6 +173,19 @@
         }
     }
 
+    @NotNull
+    @Override
+    public Iterable<DistributionQueueEntry> remove(@NotNull Set<String> entryIds) {
+        List<DistributionQueueEntry> removed = new ArrayList<DistributionQueueEntry>();
+        for (String entryId : entryIds) {
+            DistributionQueueEntry entry = remove(entryId);
+            if (entry != null) {
+                removed.add(entry);
+            }
+        }
+        return removed;
+    }
+
     @Nullable
     @Override
     public DistributionQueueEntry remove(@NotNull String itemId) {
@@ -228,4 +254,23 @@
         DistributionQueueItem item = entry.getItem();
         log.debug("queue[{}] {} entryId={} packageId={}", new Object[] { queueName, scope, entryId, item.getPackageId() });
     }
+
+    @NotNull
+    @Override
+    public Iterable<DistributionQueueEntry> clear(int limit) {
+        final List<DistributionQueueEntry> removedEntries = new ArrayList<DistributionQueueEntry>();
+        for (DistributionQueueEntry entry : getItems(0, limit)) {
+            DistributionQueueEntry removed = remove(entry.getId());
+            if (removed != null) {
+                removedEntries.add(removed);
+            }
+        }
+        return removedEntries;
+    }
+
+    @Override
+    public boolean hasCapability(@NotNull String capability) {
+        return CAPABILITIES.contains(capability);
+    }
+
 }
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java
index 26e05d4..24b376b 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java
@@ -19,12 +19,17 @@
 package org.apache.sling.distribution.queue.impl.simple;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Calendar;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Queue;
+import java.util.Set;
 import java.util.WeakHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
+
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
@@ -39,6 +44,10 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.sling.distribution.queue.DistributionQueueCapabilities.APPENDABLE;
+import static org.apache.sling.distribution.queue.DistributionQueueCapabilities.REMOVABLE;
+import static org.apache.sling.distribution.queue.DistributionQueueCapabilities.CLEARABLE;
+
 /**
  * A simple implementation of a {@link DistributionQueue}.
  * <p/>
@@ -52,6 +61,9 @@
 
     private final Logger log = LoggerFactory.getLogger(getClass());
 
+    private static final Set<String> CAPABILITIES = Collections.unmodifiableSet(
+            new HashSet<String>(Arrays.asList(APPENDABLE, REMOVABLE, CLEARABLE)));
+
     private final String name;
 
     private final Queue<DistributionQueueItem> queue;
@@ -122,6 +134,11 @@
         return DistributionQueueType.ORDERED;
     }
 
+    @Override
+    public boolean hasCapability(@NotNull String capability) {
+        return CAPABILITIES.contains(capability);
+    }
+
 
     @NotNull
     public Iterable<DistributionQueueEntry> getItems(int skip, int limit) {
@@ -144,6 +161,18 @@
         return null;
     }
 
+    @NotNull
+    @Override
+    public Iterable<DistributionQueueEntry> remove(@NotNull Set<String> entryIds) {
+        List<DistributionQueueEntry> removed = new ArrayList<DistributionQueueEntry>();
+        for (String entryId : entryIds) {
+            DistributionQueueEntry entry = remove(entryId);
+            if (entry != null) {
+                removed.add(entry);
+            }
+        }
+        return removed;
+    }
 
     @Nullable
     public DistributionQueueEntry remove(@NotNull String id) {
@@ -168,4 +197,17 @@
                 '}';
     }
 
+    @NotNull
+    @Override
+    public Iterable<DistributionQueueEntry> clear(int limit) {
+        final List<DistributionQueueEntry> removedEntries = new ArrayList<DistributionQueueEntry>();
+        for (DistributionQueueEntry entry : getItems(0, limit)) {
+            DistributionQueueEntry removed = remove(entry.getId());
+            if (removed != null) {
+                removedEntries.add(removed);
+            }
+        }
+        return removedEntries;
+    }
+
 }
diff --git a/src/main/java/org/apache/sling/distribution/queue/package-info.java b/src/main/java/org/apache/sling/distribution/queue/package-info.java
index c81b5db..70795e9 100644
--- a/src/main/java/org/apache/sling/distribution/queue/package-info.java
+++ b/src/main/java/org/apache/sling/distribution/queue/package-info.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-@Version("0.0.1")
+@Version("0.1.0")
 package org.apache.sling.distribution.queue;
 
 import aQute.bnd.annotation.Version;
diff --git a/src/main/java/org/apache/sling/distribution/queue/spi/DistributionQueue.java b/src/main/java/org/apache/sling/distribution/queue/spi/DistributionQueue.java
index 7cff77e..cd052e1 100644
--- a/src/main/java/org/apache/sling/distribution/queue/spi/DistributionQueue.java
+++ b/src/main/java/org/apache/sling/distribution/queue/spi/DistributionQueue.java
@@ -18,6 +18,8 @@
  */
 package org.apache.sling.distribution.queue.spi;
 
+import java.util.Set;
+
 import aQute.bnd.annotation.ConsumerType;
 import org.apache.sling.distribution.agent.spi.DistributionAgent;
 import org.apache.sling.distribution.packaging.DistributionPackage;
@@ -99,6 +101,26 @@
     DistributionQueueEntry remove(@NotNull String itemId);
 
     /**
+     * Remove a set entries from the queue by specifying their identifiers.
+     *
+     * @param entryIds The identifiers of the entries to be removed
+     * @return an iterable over the removed entries
+     */
+    @NotNull
+    Iterable<DistributionQueueEntry> remove(@NotNull Set<String> entryIds);
+
+    /**
+     * Clear a range of entries from the queue. The range starts from
+     * the head entry, includes the specified #limit number of entries.
+     *
+     * @param limit The maximum number of entries to remove. All entries
+     *              are removed when the limit is {@code -1}.
+     * @return an iterable over the removed entries
+     */
+    @NotNull
+    Iterable<DistributionQueueEntry> clear(int limit);
+
+    /**
      * get the status of the queue
      * @return the queue status
      */
@@ -111,4 +133,10 @@
      */
     @NotNull
     DistributionQueueType getType();
+
+    /**
+     * @return {@code true} if the queue supports the capability ;
+     *         {@code false} otherwise
+     */
+    boolean hasCapability(@NotNull String capability);
 }
diff --git a/src/main/java/org/apache/sling/distribution/queue/spi/package-info.java b/src/main/java/org/apache/sling/distribution/queue/spi/package-info.java
index 96aa8bd..912273a 100644
--- a/src/main/java/org/apache/sling/distribution/queue/spi/package-info.java
+++ b/src/main/java/org/apache/sling/distribution/queue/spi/package-info.java
@@ -17,7 +17,7 @@
  * under the License.
  */
 
-@Version("0.0.1")
+@Version("1.0.0")
 package org.apache.sling.distribution.queue.spi;
 
 import aQute.bnd.annotation.Version;
diff --git a/src/main/java/org/apache/sling/distribution/servlet/DistributionAgentQueueServlet.java b/src/main/java/org/apache/sling/distribution/servlet/DistributionAgentQueueServlet.java
index dffc64c..767868b 100644
--- a/src/main/java/org/apache/sling/distribution/servlet/DistributionAgentQueueServlet.java
+++ b/src/main/java/org/apache/sling/distribution/servlet/DistributionAgentQueueServlet.java
@@ -20,6 +20,9 @@
 
 import javax.servlet.ServletException;
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
 
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.sling.SlingServlet;
@@ -42,6 +45,10 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.sling.distribution.queue.DistributionQueueCapabilities.REMOVABLE;
+import static org.apache.sling.distribution.queue.DistributionQueueCapabilities.CLEARABLE;
+import static org.apache.sling.distribution.queue.DistributionQueueCapabilities.APPENDABLE;
+
 /**
  * Servlet to retrieve a {@link DistributionQueue} status.
  */
@@ -69,9 +76,9 @@
         if ("delete".equals(operation)) {
             String limitParam = request.getParameter("limit");
             String[] idParam = request.getParameterValues("id");
-
             if (idParam != null) {
-                deleteItems(resourceResolver, queue, idParam);
+                assertCapability(queue, REMOVABLE);
+                deleteItems(resourceResolver, queue, new HashSet<String>(Arrays.asList(idParam)));
             } else {
                 int limit = 1;
                 try {
@@ -79,28 +86,32 @@
                 } catch (NumberFormatException ex) {
                     log.warn("limit param malformed : "+limitParam, ex);
                 }
-                deleteItems(resourceResolver, queue, limit);
+                assertCapability(queue, CLEARABLE);
+                clearItems(resourceResolver, queue, limit);
             }
         } else if ("copy".equals(operation)) {
             String from = request.getParameter("from");
             String[] idParam = request.getParameterValues("id");
 
             if (idParam != null && from != null) {
+                assertCapability(queue, APPENDABLE);
                 DistributionAgent agent = request.getResource().getParent().getParent().adaptTo(DistributionAgent.class);
                 DistributionQueue sourceQueue = getQueueOrThrow(agent,from);
 
                 addItems(resourceResolver, queue, sourceQueue, idParam);
+
             }
         } else if ("move".equals(operation)) {
             String from = request.getParameter("from");
             String[] idParam = request.getParameterValues("id");
 
             if (idParam != null && from != null) {
+                assertCapability(queue, APPENDABLE);
                 DistributionAgent agent = request.getResource().getParent().getParent().adaptTo(DistributionAgent.class);
                 DistributionQueue sourceQueue = getQueueOrThrow(agent,from);
-
+                assertCapability(sourceQueue, REMOVABLE);
                 addItems(resourceResolver, queue, sourceQueue, idParam);
-                deleteItems(resourceResolver, sourceQueue, idParam);
+                deleteItems(resourceResolver, sourceQueue, new HashSet<String>(Arrays.asList(idParam)));
             }
         }
     }
@@ -116,26 +127,21 @@
         }
     }
 
-    private void deleteItems(ResourceResolver resourceResolver, DistributionQueue queue, int limit) {
-        for (DistributionQueueEntry item : queue.getItems(0, limit)) {
-            deleteItem(resourceResolver, queue, item);
+    private void deleteItems(ResourceResolver resourceResolver, DistributionQueue queue, Set<String> entryIds) {
+        for (DistributionQueueEntry removed : queue.remove(entryIds)) {
+            releaseOrDeletePackage(resourceResolver, removed.getItem(), queue.getName());
         }
     }
 
-    private void deleteItems(ResourceResolver resourceResolver, DistributionQueue queue, String[] ids) {
-        for (String id : ids) {
-            DistributionQueueEntry entry = queue.getItem(id);
-            deleteItem(resourceResolver, queue, entry);
+    private void clearItems(ResourceResolver resourceResolver, DistributionQueue queue, int limit) {
+        for (DistributionQueueEntry removed : queue.clear(limit)) {
+            releaseOrDeletePackage(resourceResolver, removed.getItem(), queue.getName());
         }
     }
 
-    private void deleteItem(ResourceResolver resourceResolver, DistributionQueue queue, DistributionQueueEntry entry) {
-        DistributionQueueItem item = entry.getItem();
-        String id = entry.getId();
-        queue.remove(id);
-
-        DistributionPackage distributionPackage = getPackage(resourceResolver, item);
-        DistributionPackageUtils.releaseOrDelete(distributionPackage, queue.getName());
+    private void releaseOrDeletePackage(ResourceResolver resourceResolver, DistributionQueueItem queueItem, String queueName) {
+        DistributionPackage distributionPackage = getPackage(resourceResolver, queueItem);
+        DistributionPackageUtils.releaseOrDelete(distributionPackage, queueName);
     }
 
     private DistributionPackage getPackage(ResourceResolver resourceResolver, DistributionQueueItem item) {
@@ -156,6 +162,12 @@
         return null;
     }
 
+    private void assertCapability(DistributionQueue queue, String capability) {
+        if (!queue.hasCapability(capability)) {
+            throw new UnsupportedOperationException(String.format("Capability %s not supported for queue %s", capability, queue.getName()));
+        }
+    }
+
     @NotNull
     private static DistributionQueue getQueueOrThrow(@NotNull DistributionAgent agent, @NotNull String queueName) {
         DistributionQueue queue = agent.getQueue(queueName);