SLING-8086 - Extend distribution queue SPI with the ability to clear and batch remove items
diff --git a/src/main/java/org/apache/sling/distribution/queue/DistributionQueueCapabilities.java b/src/main/java/org/apache/sling/distribution/queue/DistributionQueueCapabilities.java
new file mode 100644
index 0000000..048e815
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/queue/DistributionQueueCapabilities.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.queue;
+
+import aQute.bnd.annotation.ProviderType;
+
+@ProviderType
+public final class DistributionQueueCapabilities {
+
+ /**
+ * Indicates that the queue supports removing random entries.
+ */
+ public static final String REMOVABLE = "removable";
+
+ /**
+ * Indicates that the queue supports clearing entries.
+ */
+ public static final String CLEARABLE = "clearable";
+
+ /**
+ * Indicates that the queue supports adding entries.
+ */
+ public static final String APPENDABLE = "appendable";
+
+
+}
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/DistributionQueueWrapper.java b/src/main/java/org/apache/sling/distribution/queue/impl/DistributionQueueWrapper.java
index dc179d3..d7aa307 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/DistributionQueueWrapper.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/DistributionQueueWrapper.java
@@ -18,13 +18,29 @@
*/
package org.apache.sling.distribution.queue.impl;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
import org.apache.sling.distribution.queue.spi.DistributionQueue;
import org.apache.sling.distribution.queue.DistributionQueueEntry;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.apache.sling.distribution.queue.DistributionQueueStatus;
import org.jetbrains.annotations.NotNull;
+import static org.apache.sling.distribution.queue.DistributionQueueCapabilities.APPENDABLE;
+import static org.apache.sling.distribution.queue.DistributionQueueCapabilities.REMOVABLE;
+import static org.apache.sling.distribution.queue.DistributionQueueCapabilities.CLEARABLE;
+
public abstract class DistributionQueueWrapper implements DistributionQueue {
+
+ private static final Set<String> CAPABILITIES = Collections.unmodifiableSet(
+ new HashSet<String>(Arrays.asList(APPENDABLE, REMOVABLE, CLEARABLE)));
+
+
final DistributionQueue wrappedQueue;
DistributionQueueWrapper(DistributionQueue wrappedQueue) {
@@ -69,4 +85,21 @@
public DistributionQueueStatus getStatus() {
return wrappedQueue.getStatus();
}
+
+ @NotNull
+ @Override
+ public Iterable<DistributionQueueEntry> remove(@NotNull Set<String> entryIds) {
+ return wrappedQueue.remove(entryIds);
+ }
+
+ @NotNull
+ @Override
+ public Iterable<DistributionQueueEntry> clear(int limit) {
+ return wrappedQueue.clear(limit);
+ }
+
+ @Override
+ public boolean hasCapability(@NotNull String capability) {
+ return CAPABILITIES.contains(capability);
+ }
}
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueue.java b/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueue.java
index d19424f..e8b9081 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueue.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/jobhandling/JobHandlingDistributionQueue.java
@@ -19,9 +19,14 @@
package org.apache.sling.distribution.queue.impl.jobhandling;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+
import org.apache.sling.distribution.queue.spi.DistributionQueue;
import org.apache.sling.distribution.queue.DistributionQueueEntry;
import org.apache.sling.distribution.queue.DistributionQueueItem;
@@ -37,6 +42,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.sling.distribution.queue.DistributionQueueCapabilities.APPENDABLE;
+import static org.apache.sling.distribution.queue.DistributionQueueCapabilities.REMOVABLE;
+import static org.apache.sling.distribution.queue.DistributionQueueCapabilities.CLEARABLE;
+
/**
* a {@link DistributionQueue} based on Sling Job Handling facilities
*/
@@ -44,6 +53,9 @@
public final static String DISTRIBUTION_QUEUE_TOPIC = "org/apache/sling/distribution/queue";
+ private static final Set<String> CAPABILITIES = Collections.unmodifiableSet(
+ new HashSet<String>(Arrays.asList(APPENDABLE, REMOVABLE, CLEARABLE)));
+
private final Logger log = LoggerFactory.getLogger(getClass());
private final String name;
@@ -162,6 +174,19 @@
return null;
}
+ @NotNull
+ @Override
+ public Iterable<DistributionQueueEntry> remove(@NotNull Set<String> entryIds) {
+ List<DistributionQueueEntry> removed = new ArrayList<DistributionQueueEntry>();
+ for (String entryId : entryIds) {
+ DistributionQueueEntry entry = remove(entryId);
+ if (entry != null) {
+ removed.add(entry);
+ }
+ }
+ return removed;
+ }
+
public DistributionQueueEntry remove(@NotNull String id) {
boolean removed = false;
Job job = getJob(id);
@@ -202,4 +227,21 @@
return type;
}
+ @NotNull
+ @Override
+ public Iterable<DistributionQueueEntry> clear(int limit) {
+ final List<DistributionQueueEntry> removedEntries = new ArrayList<DistributionQueueEntry>();
+ for (DistributionQueueEntry entry : getItems(0, limit)) {
+ DistributionQueueEntry removed = remove(entry.getId());
+ if (removed != null) {
+ removedEntries.add(removed);
+ }
+ }
+ return removedEntries;
+ }
+
+ @Override
+ public boolean hasCapability(@NotNull String capability) {
+ return CAPABILITIES.contains(capability);
+ }
}
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueue.java b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueue.java
index 28162c3..612211d 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueue.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/resource/ResourceQueue.java
@@ -36,10 +36,23 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
+
+import static org.apache.sling.distribution.queue.DistributionQueueCapabilities.APPENDABLE;
+import static org.apache.sling.distribution.queue.DistributionQueueCapabilities.REMOVABLE;
+import static org.apache.sling.distribution.queue.DistributionQueueCapabilities.CLEARABLE;
public class ResourceQueue implements DistributionQueue {
+
+ private static final Set<String> CAPABILITIES = Collections.unmodifiableSet(
+ new HashSet<String>(Arrays.asList(APPENDABLE, REMOVABLE, CLEARABLE)));
+
private final Logger log = LoggerFactory.getLogger(getClass());
@@ -160,6 +173,19 @@
}
}
+ @NotNull
+ @Override
+ public Iterable<DistributionQueueEntry> remove(@NotNull Set<String> entryIds) {
+ List<DistributionQueueEntry> removed = new ArrayList<DistributionQueueEntry>();
+ for (String entryId : entryIds) {
+ DistributionQueueEntry entry = remove(entryId);
+ if (entry != null) {
+ removed.add(entry);
+ }
+ }
+ return removed;
+ }
+
@Nullable
@Override
public DistributionQueueEntry remove(@NotNull String itemId) {
@@ -228,4 +254,23 @@
DistributionQueueItem item = entry.getItem();
log.debug("queue[{}] {} entryId={} packageId={}", new Object[] { queueName, scope, entryId, item.getPackageId() });
}
+
+ @NotNull
+ @Override
+ public Iterable<DistributionQueueEntry> clear(int limit) {
+ final List<DistributionQueueEntry> removedEntries = new ArrayList<DistributionQueueEntry>();
+ for (DistributionQueueEntry entry : getItems(0, limit)) {
+ DistributionQueueEntry removed = remove(entry.getId());
+ if (removed != null) {
+ removedEntries.add(removed);
+ }
+ }
+ return removedEntries;
+ }
+
+ @Override
+ public boolean hasCapability(@NotNull String capability) {
+ return CAPABILITIES.contains(capability);
+ }
+
}
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java
index 26e05d4..24b376b 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java
@@ -19,12 +19,17 @@
package org.apache.sling.distribution.queue.impl.simple;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Calendar;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Queue;
+import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.LinkedBlockingQueue;
+
import org.apache.sling.distribution.queue.spi.DistributionQueue;
import org.apache.sling.distribution.queue.DistributionQueueEntry;
import org.apache.sling.distribution.queue.DistributionQueueItem;
@@ -39,6 +44,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.sling.distribution.queue.DistributionQueueCapabilities.APPENDABLE;
+import static org.apache.sling.distribution.queue.DistributionQueueCapabilities.REMOVABLE;
+import static org.apache.sling.distribution.queue.DistributionQueueCapabilities.CLEARABLE;
+
/**
* A simple implementation of a {@link DistributionQueue}.
* <p/>
@@ -52,6 +61,9 @@
private final Logger log = LoggerFactory.getLogger(getClass());
+ private static final Set<String> CAPABILITIES = Collections.unmodifiableSet(
+ new HashSet<String>(Arrays.asList(APPENDABLE, REMOVABLE, CLEARABLE)));
+
private final String name;
private final Queue<DistributionQueueItem> queue;
@@ -122,6 +134,11 @@
return DistributionQueueType.ORDERED;
}
+ @Override
+ public boolean hasCapability(@NotNull String capability) {
+ return CAPABILITIES.contains(capability);
+ }
+
@NotNull
public Iterable<DistributionQueueEntry> getItems(int skip, int limit) {
@@ -144,6 +161,18 @@
return null;
}
+ @NotNull
+ @Override
+ public Iterable<DistributionQueueEntry> remove(@NotNull Set<String> entryIds) {
+ List<DistributionQueueEntry> removed = new ArrayList<DistributionQueueEntry>();
+ for (String entryId : entryIds) {
+ DistributionQueueEntry entry = remove(entryId);
+ if (entry != null) {
+ removed.add(entry);
+ }
+ }
+ return removed;
+ }
@Nullable
public DistributionQueueEntry remove(@NotNull String id) {
@@ -168,4 +197,17 @@
'}';
}
+ @NotNull
+ @Override
+ public Iterable<DistributionQueueEntry> clear(int limit) {
+ final List<DistributionQueueEntry> removedEntries = new ArrayList<DistributionQueueEntry>();
+ for (DistributionQueueEntry entry : getItems(0, limit)) {
+ DistributionQueueEntry removed = remove(entry.getId());
+ if (removed != null) {
+ removedEntries.add(removed);
+ }
+ }
+ return removedEntries;
+ }
+
}
diff --git a/src/main/java/org/apache/sling/distribution/queue/package-info.java b/src/main/java/org/apache/sling/distribution/queue/package-info.java
index c81b5db..70795e9 100644
--- a/src/main/java/org/apache/sling/distribution/queue/package-info.java
+++ b/src/main/java/org/apache/sling/distribution/queue/package-info.java
@@ -17,7 +17,7 @@
* under the License.
*/
-@Version("0.0.1")
+@Version("0.1.0")
package org.apache.sling.distribution.queue;
import aQute.bnd.annotation.Version;
diff --git a/src/main/java/org/apache/sling/distribution/queue/spi/DistributionQueue.java b/src/main/java/org/apache/sling/distribution/queue/spi/DistributionQueue.java
index 7cff77e..cd052e1 100644
--- a/src/main/java/org/apache/sling/distribution/queue/spi/DistributionQueue.java
+++ b/src/main/java/org/apache/sling/distribution/queue/spi/DistributionQueue.java
@@ -18,6 +18,8 @@
*/
package org.apache.sling.distribution.queue.spi;
+import java.util.Set;
+
import aQute.bnd.annotation.ConsumerType;
import org.apache.sling.distribution.agent.spi.DistributionAgent;
import org.apache.sling.distribution.packaging.DistributionPackage;
@@ -99,6 +101,26 @@
DistributionQueueEntry remove(@NotNull String itemId);
/**
+ * Remove a set entries from the queue by specifying their identifiers.
+ *
+ * @param entryIds The identifiers of the entries to be removed
+ * @return an iterable over the removed entries
+ */
+ @NotNull
+ Iterable<DistributionQueueEntry> remove(@NotNull Set<String> entryIds);
+
+ /**
+ * Clear a range of entries from the queue. The range starts from
+ * the head entry, includes the specified #limit number of entries.
+ *
+ * @param limit The maximum number of entries to remove. All entries
+ * are removed when the limit is {@code -1}.
+ * @return an iterable over the removed entries
+ */
+ @NotNull
+ Iterable<DistributionQueueEntry> clear(int limit);
+
+ /**
* get the status of the queue
* @return the queue status
*/
@@ -111,4 +133,10 @@
*/
@NotNull
DistributionQueueType getType();
+
+ /**
+ * @return {@code true} if the queue supports the capability ;
+ * {@code false} otherwise
+ */
+ boolean hasCapability(@NotNull String capability);
}
diff --git a/src/main/java/org/apache/sling/distribution/queue/spi/package-info.java b/src/main/java/org/apache/sling/distribution/queue/spi/package-info.java
index 96aa8bd..912273a 100644
--- a/src/main/java/org/apache/sling/distribution/queue/spi/package-info.java
+++ b/src/main/java/org/apache/sling/distribution/queue/spi/package-info.java
@@ -17,7 +17,7 @@
* under the License.
*/
-@Version("0.0.1")
+@Version("1.0.0")
package org.apache.sling.distribution.queue.spi;
import aQute.bnd.annotation.Version;
diff --git a/src/main/java/org/apache/sling/distribution/servlet/DistributionAgentQueueServlet.java b/src/main/java/org/apache/sling/distribution/servlet/DistributionAgentQueueServlet.java
index dffc64c..767868b 100644
--- a/src/main/java/org/apache/sling/distribution/servlet/DistributionAgentQueueServlet.java
+++ b/src/main/java/org/apache/sling/distribution/servlet/DistributionAgentQueueServlet.java
@@ -20,6 +20,9 @@
import javax.servlet.ServletException;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.sling.SlingServlet;
@@ -42,6 +45,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import static org.apache.sling.distribution.queue.DistributionQueueCapabilities.REMOVABLE;
+import static org.apache.sling.distribution.queue.DistributionQueueCapabilities.CLEARABLE;
+import static org.apache.sling.distribution.queue.DistributionQueueCapabilities.APPENDABLE;
+
/**
* Servlet to retrieve a {@link DistributionQueue} status.
*/
@@ -69,9 +76,9 @@
if ("delete".equals(operation)) {
String limitParam = request.getParameter("limit");
String[] idParam = request.getParameterValues("id");
-
if (idParam != null) {
- deleteItems(resourceResolver, queue, idParam);
+ assertCapability(queue, REMOVABLE);
+ deleteItems(resourceResolver, queue, new HashSet<String>(Arrays.asList(idParam)));
} else {
int limit = 1;
try {
@@ -79,28 +86,32 @@
} catch (NumberFormatException ex) {
log.warn("limit param malformed : "+limitParam, ex);
}
- deleteItems(resourceResolver, queue, limit);
+ assertCapability(queue, CLEARABLE);
+ clearItems(resourceResolver, queue, limit);
}
} else if ("copy".equals(operation)) {
String from = request.getParameter("from");
String[] idParam = request.getParameterValues("id");
if (idParam != null && from != null) {
+ assertCapability(queue, APPENDABLE);
DistributionAgent agent = request.getResource().getParent().getParent().adaptTo(DistributionAgent.class);
DistributionQueue sourceQueue = getQueueOrThrow(agent,from);
addItems(resourceResolver, queue, sourceQueue, idParam);
+
}
} else if ("move".equals(operation)) {
String from = request.getParameter("from");
String[] idParam = request.getParameterValues("id");
if (idParam != null && from != null) {
+ assertCapability(queue, APPENDABLE);
DistributionAgent agent = request.getResource().getParent().getParent().adaptTo(DistributionAgent.class);
DistributionQueue sourceQueue = getQueueOrThrow(agent,from);
-
+ assertCapability(sourceQueue, REMOVABLE);
addItems(resourceResolver, queue, sourceQueue, idParam);
- deleteItems(resourceResolver, sourceQueue, idParam);
+ deleteItems(resourceResolver, sourceQueue, new HashSet<String>(Arrays.asList(idParam)));
}
}
}
@@ -116,26 +127,21 @@
}
}
- private void deleteItems(ResourceResolver resourceResolver, DistributionQueue queue, int limit) {
- for (DistributionQueueEntry item : queue.getItems(0, limit)) {
- deleteItem(resourceResolver, queue, item);
+ private void deleteItems(ResourceResolver resourceResolver, DistributionQueue queue, Set<String> entryIds) {
+ for (DistributionQueueEntry removed : queue.remove(entryIds)) {
+ releaseOrDeletePackage(resourceResolver, removed.getItem(), queue.getName());
}
}
- private void deleteItems(ResourceResolver resourceResolver, DistributionQueue queue, String[] ids) {
- for (String id : ids) {
- DistributionQueueEntry entry = queue.getItem(id);
- deleteItem(resourceResolver, queue, entry);
+ private void clearItems(ResourceResolver resourceResolver, DistributionQueue queue, int limit) {
+ for (DistributionQueueEntry removed : queue.clear(limit)) {
+ releaseOrDeletePackage(resourceResolver, removed.getItem(), queue.getName());
}
}
- private void deleteItem(ResourceResolver resourceResolver, DistributionQueue queue, DistributionQueueEntry entry) {
- DistributionQueueItem item = entry.getItem();
- String id = entry.getId();
- queue.remove(id);
-
- DistributionPackage distributionPackage = getPackage(resourceResolver, item);
- DistributionPackageUtils.releaseOrDelete(distributionPackage, queue.getName());
+ private void releaseOrDeletePackage(ResourceResolver resourceResolver, DistributionQueueItem queueItem, String queueName) {
+ DistributionPackage distributionPackage = getPackage(resourceResolver, queueItem);
+ DistributionPackageUtils.releaseOrDelete(distributionPackage, queueName);
}
private DistributionPackage getPackage(ResourceResolver resourceResolver, DistributionQueueItem item) {
@@ -156,6 +162,12 @@
return null;
}
+ private void assertCapability(DistributionQueue queue, String capability) {
+ if (!queue.hasCapability(capability)) {
+ throw new UnsupportedOperationException(String.format("Capability %s not supported for queue %s", capability, queue.getName()));
+ }
+ }
+
@NotNull
private static DistributionQueue getQueueOrThrow(@NotNull DistributionAgent agent, @NotNull String queueName) {
DistributionQueue queue = agent.getQueue(queueName);