Merge pull request #24 from actinium15/issue/SLING-8853
SLING-8853 Adds ActiveResourceQueue and makes it available as …
diff --git a/src/main/java/org/apache/sling/distribution/packaging/impl/DistributionPackageUtils.java b/src/main/java/org/apache/sling/distribution/packaging/impl/DistributionPackageUtils.java
index 72100e8..4b85fc7 100644
--- a/src/main/java/org/apache/sling/distribution/packaging/impl/DistributionPackageUtils.java
+++ b/src/main/java/org/apache/sling/distribution/packaging/impl/DistributionPackageUtils.java
@@ -20,6 +20,7 @@
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
+import org.apache.jackrabbit.JcrConstants;
import org.apache.jackrabbit.commons.JcrUtils;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.Resource;
@@ -37,7 +38,6 @@
import javax.jcr.Binary;
import javax.jcr.Node;
-import javax.jcr.Property;
import javax.jcr.RepositoryException;
import javax.jcr.nodetype.NodeType;
import java.io.ByteArrayInputStream;
@@ -287,15 +287,16 @@
public static InputStream getStream(Resource resource) throws RepositoryException {
Node parent = resource.adaptTo(Node.class);
- return parent.getProperty("bin/jcr:content/jcr:data").getBinary().getStream();
+ return parent.getProperty("bin/" + JcrConstants.JCR_CONTENT
+ + "/" + JcrConstants.JCR_DATA).getBinary().getStream();
}
public static void uploadStream(Resource resource, InputStream stream) throws RepositoryException {
Node parent = resource.adaptTo(Node.class);
Node file = JcrUtils.getOrAddNode(parent, "bin", NodeType.NT_FILE);
- Node content = JcrUtils.getOrAddNode(file, Node.JCR_CONTENT, NodeType.NT_RESOURCE);
+ Node content = JcrUtils.getOrAddNode(file, JcrConstants.JCR_CONTENT, NodeType.NT_RESOURCE);
Binary binary = parent.getSession().getValueFactory().createBinary(stream);
- content.setProperty(Property.JCR_DATA, binary);
+ content.setProperty(JcrConstants.JCR_DATA, binary);
JcrUtils.getOrAddNode(parent, "refs", NodeType.NT_UNSTRUCTURED);
}
@@ -365,7 +366,7 @@
if (file.exists()) {
inputStream = getSafeObjectInputStream(new FileInputStream(file));
- @SuppressWarnings("unchecked") // type is known by sedign
+ @SuppressWarnings("unchecked") // type is known by design
HashSet<String> fromStreamSet = (HashSet<String>) inputStream.readObject();
set = fromStreamSet;
} else {
diff --git a/src/main/java/org/apache/sling/distribution/packaging/impl/FileDistributionPackage.java b/src/main/java/org/apache/sling/distribution/packaging/impl/FileDistributionPackage.java
index bca2d1b..286629c 100644
--- a/src/main/java/org/apache/sling/distribution/packaging/impl/FileDistributionPackage.java
+++ b/src/main/java/org/apache/sling/distribution/packaging/impl/FileDistributionPackage.java
@@ -22,6 +22,8 @@
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.sling.distribution.DistributionRequestType;
@@ -43,11 +45,27 @@
public FileDistributionPackage(@NotNull File file,
@NotNull String type,
@Nullable String digestAlgorithm,
- @Nullable String digestMessage) {
+ @Nullable String digestMessage,
+ @Nullable Map<String, Object> baseInfoMap) {
super(file.getName(), type, digestAlgorithm, digestMessage);
this.file = file;
+ if (null == baseInfoMap) {
+ try (InputStream metaInfoIS = FileUtils.openInputStream(getMetaInfoFile())) {
+ DistributionPackageUtils.readInfo(metaInfoIS, this.getInfo());
+ } catch (IOException e) {
+ log.error("cannot read meta-info for the package at {}", file.getAbsoluteFile(), e);
+ }
+ }
this.getInfo().put(DistributionPackageInfo.PROPERTY_REQUEST_TYPE, DistributionRequestType.ADD);
+ if (null != baseInfoMap) {
+ this.getInfo().putAll(baseInfoMap);
+ try (OutputStream metaInfoOS = FileUtils.openOutputStream(getMetaInfoFile(), false)) {
+ DistributionPackageUtils.writeInfo(metaInfoOS, baseInfoMap);
+ } catch (IOException e) {
+ log.error("cannot create meta-info for the package at {}", file.getAbsoluteFile(), e);
+ }
+ }
}
@NotNull
@@ -67,6 +85,7 @@
public void delete() {
FileUtils.deleteQuietly(file);
FileUtils.deleteQuietly(getStatusFile());
+ FileUtils.deleteQuietly(getMetaInfoFile());
}
public File getFile() {
@@ -101,6 +120,11 @@
return new File(statusFilePath);
}
+ private File getMetaInfoFile() {
+ String metaInfoFilePath = file.getAbsolutePath() + ".metainfo";
+ return new File(metaInfoFilePath);
+ }
+
public class PackageInputStream extends BufferedInputStream {
private final File file;
diff --git a/src/main/java/org/apache/sling/distribution/packaging/impl/FileDistributionPackageBuilder.java b/src/main/java/org/apache/sling/distribution/packaging/impl/FileDistributionPackageBuilder.java
index a79a7a2..2359b03 100644
--- a/src/main/java/org/apache/sling/distribution/packaging/impl/FileDistributionPackageBuilder.java
+++ b/src/main/java/org/apache/sling/distribution/packaging/impl/FileDistributionPackageBuilder.java
@@ -37,6 +37,7 @@
import org.apache.sling.distribution.common.DistributionException;
import org.apache.sling.distribution.packaging.DistributionPackage;
import org.apache.sling.distribution.packaging.DistributionPackageBuilder;
+import org.apache.sling.distribution.packaging.DistributionPackageInfo;
import org.apache.sling.distribution.serialization.DistributionContentSerializer;
import org.apache.sling.distribution.serialization.DistributionExportFilter;
import org.apache.sling.distribution.serialization.DistributionExportOptions;
@@ -104,7 +105,10 @@
if (digestAlgorithm != null) {
digestMessage = readDigestMessage((DigestOutputStream) outputStream);
}
- distributionPackage = new FileDistributionPackage(file, getType(), digestAlgorithm, digestMessage);
+ DistributionPackageInfo info = new DistributionPackageInfo(getType());
+ DistributionPackageUtils.fillInfo(info, request);
+ distributionPackage = new FileDistributionPackage(file, getType(), digestAlgorithm, digestMessage,
+ info);
} catch (IOException e) {
throw new DistributionException(e);
} finally {
@@ -140,7 +144,7 @@
outputStream.flush();
String digestMessage = readDigestMessage(outputStream);
- distributionPackage = new FileDistributionPackage(file, getType(), digestAlgorithm, digestMessage);
+ distributionPackage = new FileDistributionPackage(file, getType(), digestAlgorithm, digestMessage, null);
} catch (Exception e) {
throw new DistributionException(e);
} finally {
@@ -168,6 +172,6 @@
log.warn("file package does not exist", file.getAbsolutePath());
return null;
}
- return new FileDistributionPackage(file, getType(), null, null);
+ return new FileDistributionPackage(file, getType(), null, null, null);
}
}
diff --git a/src/main/java/org/apache/sling/distribution/packaging/impl/InMemoryDistributionPackage.java b/src/main/java/org/apache/sling/distribution/packaging/impl/InMemoryDistributionPackage.java
index e3a7aed..21ac0ad 100644
--- a/src/main/java/org/apache/sling/distribution/packaging/impl/InMemoryDistributionPackage.java
+++ b/src/main/java/org/apache/sling/distribution/packaging/impl/InMemoryDistributionPackage.java
@@ -21,6 +21,7 @@
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.util.Map;
import org.apache.sling.distribution.packaging.DistributionPackage;
import org.apache.sling.distribution.packaging.DistributionPackageInfo;
@@ -38,12 +39,16 @@
private final DistributionPackageInfo info;
- public InMemoryDistributionPackage(String id, String type, byte[] data) {
+ public InMemoryDistributionPackage(String id, String type, byte[] data, Map<String, Object> baseInfoMap) {
this.id = id;
this.type = type;
this.data = data;
this.size = data.length;
this.info = new DistributionPackageInfo(type);
+
+ if (null != baseInfoMap) {
+ this.info.putAll(baseInfoMap);
+ }
}
@NotNull
diff --git a/src/main/java/org/apache/sling/distribution/packaging/impl/InMemoryDistributionPackageBuilder.java b/src/main/java/org/apache/sling/distribution/packaging/impl/InMemoryDistributionPackageBuilder.java
index d55b77e..b74b164 100644
--- a/src/main/java/org/apache/sling/distribution/packaging/impl/InMemoryDistributionPackageBuilder.java
+++ b/src/main/java/org/apache/sling/distribution/packaging/impl/InMemoryDistributionPackageBuilder.java
@@ -33,6 +33,7 @@
import org.apache.sling.distribution.DistributionRequest;
import org.apache.sling.distribution.common.DistributionException;
import org.apache.sling.distribution.packaging.DistributionPackage;
+import org.apache.sling.distribution.packaging.DistributionPackageInfo;
import org.apache.sling.distribution.serialization.DistributionContentSerializer;
import org.apache.sling.distribution.serialization.DistributionExportFilter;
import org.apache.sling.distribution.serialization.DistributionExportOptions;
@@ -75,7 +76,9 @@
String packageId = "dstrpck-" + System.currentTimeMillis() + "-" + UUID.randomUUID().toString();
- return new InMemoryDistributionPackage(packageId, getType(), baos.toByteArray());
+ DistributionPackageInfo info = new DistributionPackageInfo(getType());
+ DistributionPackageUtils.fillInfo(info, request);
+ return new InMemoryDistributionPackage(packageId, getType(), baos.toByteArray(), info);
}
@Override
@@ -102,7 +105,7 @@
baos.flush();
byte[] data = baos.toByteArray();
- return new InMemoryDistributionPackage(packageId, getType(), data);
+ return new InMemoryDistributionPackage(packageId, getType(), data, info);
} catch (IOException e) {
throw new DistributionException(e);
}
diff --git a/src/main/java/org/apache/sling/distribution/packaging/impl/ResourceDistributionPackage.java b/src/main/java/org/apache/sling/distribution/packaging/impl/ResourceDistributionPackage.java
index d94ec7b..fbd804a 100644
--- a/src/main/java/org/apache/sling/distribution/packaging/impl/ResourceDistributionPackage.java
+++ b/src/main/java/org/apache/sling/distribution/packaging/impl/ResourceDistributionPackage.java
@@ -22,6 +22,7 @@
import java.io.BufferedInputStream;
import java.io.IOException;
import java.io.InputStream;
+import java.util.Map;
import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.Resource;
@@ -50,14 +51,26 @@
String type,
ResourceResolver resourceResolver,
@Nullable String digestAlgorithm,
- @Nullable String digestMessage) {
+ @Nullable String digestMessage,
+ @Nullable Map<String, Object> baseInfoMap) {
super(resource.getName(), type, digestAlgorithm, digestMessage);
this.resourceResolver = resourceResolver;
ValueMap valueMap = resource.getValueMap();
assert type.equals(valueMap.get("type")) : "wrong resource type";
this.resource = resource;
Object sizeProperty = resource.getValueMap().get("size");
+ Object paths = resource.getValueMap().get(DistributionPackageInfo.PROPERTY_REQUEST_PATHS);
+ Object deepPaths = resource.getValueMap().get(DistributionPackageInfo.PROPERTY_REQUEST_DEEP_PATHS);
this.size = sizeProperty == null ? -1 : Long.parseLong(sizeProperty.toString());
+ if (null != baseInfoMap) {
+ this.getInfo().putAll(baseInfoMap);
+ }
+ if (paths instanceof String[]) {
+ this.getInfo().put(DistributionPackageInfo.PROPERTY_REQUEST_PATHS, (String[])paths);
+ }
+ if (deepPaths instanceof String[]) {
+ this.getInfo().put(DistributionPackageInfo.PROPERTY_REQUEST_DEEP_PATHS, (String[])deepPaths);
+ }
this.getInfo().put(DistributionPackageInfo.PROPERTY_REQUEST_TYPE, DistributionRequestType.ADD);
}
diff --git a/src/main/java/org/apache/sling/distribution/packaging/impl/ResourceDistributionPackageBuilder.java b/src/main/java/org/apache/sling/distribution/packaging/impl/ResourceDistributionPackageBuilder.java
index 919d3d9..8c412aa 100644
--- a/src/main/java/org/apache/sling/distribution/packaging/impl/ResourceDistributionPackageBuilder.java
+++ b/src/main/java/org/apache/sling/distribution/packaging/impl/ResourceDistributionPackageBuilder.java
@@ -44,6 +44,7 @@
import org.apache.sling.distribution.DistributionRequest;
import org.apache.sling.distribution.common.DistributionException;
import org.apache.sling.distribution.packaging.DistributionPackage;
+import org.apache.sling.distribution.packaging.DistributionPackageInfo;
import org.apache.sling.distribution.serialization.DistributionContentSerializer;
import org.apache.sling.distribution.serialization.DistributionExportFilter;
import org.apache.sling.distribution.serialization.DistributionExportOptions;
@@ -124,13 +125,15 @@
try {
inputStream = outputStream.openWrittenDataInputStream();
- packageResource = uploadStream(resourceResolver, packagesRoot, inputStream, outputStream.size());
+ packageResource = uploadStream(resourceResolver, packagesRoot, inputStream, outputStream.size(),
+ request);
} finally {
IOUtils.closeQuietly(inputStream);
outputStream.clean();
}
- distributionPackage = new ResourceDistributionPackage(packageResource, getType(), resourceResolver, digestAlgorithm, digestMessage);
+ distributionPackage = new ResourceDistributionPackage(packageResource, getType(), resourceResolver,
+ digestAlgorithm, digestMessage, null);
} catch (IOException e) {
throw new DistributionException(e);
}
@@ -150,8 +153,8 @@
try {
Resource packagesRoot = DistributionPackageUtils.getPackagesRoot(resourceResolver, packagesPath);
- Resource packageResource = uploadStream(resourceResolver, packagesRoot, inputStream, -1);
- return new ResourceDistributionPackage(packageResource, getType(), resourceResolver, null, null);
+ Resource packageResource = uploadStream(resourceResolver, packagesRoot, inputStream, -1, null);
+ return new ResourceDistributionPackage(packageResource, getType(), resourceResolver, null, null, null);
} catch (PersistenceException e) {
throw new DistributionException(e);
}
@@ -176,7 +179,7 @@
if (packageResource == null) {
return null;
} else {
- return new ResourceDistributionPackage(packageResource, getType(), resourceResolver, null, null);
+ return new ResourceDistributionPackage(packageResource, getType(), resourceResolver, null, null, null);
}
} catch (PersistenceException e) {
return null;
@@ -184,7 +187,8 @@
}
- private Resource uploadStream(ResourceResolver resourceResolver, Resource parent, InputStream stream, long size) throws PersistenceException {
+ private Resource uploadStream(ResourceResolver resourceResolver, Resource parent, InputStream stream,
+ long size, DistributionRequest request) throws PersistenceException {
String name;
log.debug("uploading stream");
@@ -211,6 +215,13 @@
Map<String, Object> props = new HashMap<String, Object>();
props.put(ResourceResolver.PROPERTY_RESOURCE_TYPE, "sling:Folder");
props.put("type", getType());
+ if (null != request) {
+ DistributionPackageInfo info = new DistributionPackageInfo(getType());
+ DistributionPackageUtils.fillInfo(info, request);
+ props.put(DistributionPackageInfo.PROPERTY_REQUEST_PATHS, info.getPaths());
+ props.put(DistributionPackageInfo.PROPERTY_REQUEST_DEEP_PATHS,
+ info.get(DistributionPackageInfo.PROPERTY_REQUEST_DEEP_PATHS));
+ }
if (size != -1) {
props.put("size", size);
@@ -272,7 +283,7 @@
@Override
public ResourceDistributionPackage next() {
Resource packageResource = packages.next();
- return new ResourceDistributionPackage(packageResource, type, resourceResolver, null, null);
+ return new ResourceDistributionPackage(packageResource, type, resourceResolver, null, null, null);
}
@Override
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java
index 62e9e49..6b4a2c1 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueue.java
@@ -27,7 +27,6 @@
import java.util.Map;
import java.util.Queue;
import java.util.Set;
-import java.util.WeakHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.sling.distribution.queue.spi.DistributionQueue;
@@ -68,13 +67,13 @@
private final Queue<DistributionQueueItem> queue;
- private final Map<DistributionQueueItem, DistributionQueueItemStatus> statusMap;
+ private final Map<String, DistributionQueueItemStatus> statusMap;
- public SimpleDistributionQueue(String agentName, String name) {
+ public SimpleDistributionQueue(String agentName, String name, Map<String, DistributionQueueItemStatus> statusMap) {
log.debug("starting a simple queue {} for agent {}", name, agentName);
this.name = name;
this.queue = new LinkedBlockingQueue<DistributionQueueItem>();
- this.statusMap = new WeakHashMap<DistributionQueueItem, DistributionQueueItemStatus>(10);
+ this.statusMap = statusMap;
}
@NotNull
@@ -85,17 +84,19 @@
public DistributionQueueEntry add(@NotNull DistributionQueueItem item) {
DistributionQueueItemState itemState = DistributionQueueItemState.ERROR;
boolean result = false;
+ String entryId = item.getPackageId();
try {
result = queue.offer(item);
itemState = DistributionQueueItemState.QUEUED;
} catch (Exception e) {
log.error("cannot add an item to the queue", e);
} finally {
- statusMap.put(item, new DistributionQueueItemStatus(Calendar.getInstance(), itemState, 0, name));
+ statusMap.put(entryId, new DistributionQueueItemStatus(Calendar.getInstance(), itemState, 0, name));
}
if (result) {
- return new DistributionQueueEntry(item.getPackageId(), item, statusMap.get(item));
+
+ return new DistributionQueueEntry(item.getPackageId(), item, statusMap.get(entryId));
}
return null;
@@ -106,10 +107,7 @@
public DistributionQueueEntry getHead() {
DistributionQueueItem element = queue.peek();
if (element != null) {
- DistributionQueueItemStatus itemState = statusMap.get(element);
- statusMap.put(element, new DistributionQueueItemStatus(itemState.getEntered(),
- itemState.getItemState(),
- itemState.getAttempts() + 1, name));
+ DistributionQueueItemStatus itemState = statusMap.get(element.getPackageId());
return new DistributionQueueEntry(element.getPackageId(), element, itemState);
}
@@ -119,7 +117,7 @@
@NotNull
private DistributionQueueState getState() {
DistributionQueueItem firstItem = queue.peek();
- DistributionQueueItemStatus firstItemStatus = firstItem != null ? statusMap.get(firstItem) : null;
+ DistributionQueueItemStatus firstItemStatus = firstItem != null ? statusMap.get(firstItem.getPackageId()) : null;
return DistributionQueueUtils.calculateState(firstItem, firstItemStatus);
}
@@ -146,7 +144,7 @@
List<DistributionQueueEntry> result = new ArrayList<DistributionQueueEntry>();
for (DistributionQueueItem item : queue) {
- result.add(new DistributionQueueEntry(item.getPackageId(), item, statusMap.get(item)));
+ result.add(new DistributionQueueEntry(item.getPackageId(), item, statusMap.get(item.getPackageId())));
}
return result;
}
@@ -155,7 +153,7 @@
public DistributionQueueEntry getEntry(@NotNull String id) {
for (DistributionQueueItem item : queue) {
if (id.equals(item.getPackageId())) {
- return new DistributionQueueEntry(id, item, statusMap.get(item));
+ return new DistributionQueueEntry(id, item, statusMap.get(item.getPackageId()));
}
}
@@ -182,6 +180,7 @@
boolean removed = false;
if (toRemove != null) {
removed = queue.remove(toRemove.getItem());
+ statusMap.remove(id);
}
log.debug("item with id {} removed from the queue: {}", id, removed);
if (removed) {
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessor.java b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessor.java
index 54c37eb..462895e 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessor.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessor.java
@@ -19,7 +19,11 @@
package org.apache.sling.distribution.queue.impl.simple;
import org.apache.sling.distribution.queue.spi.DistributionQueue;
+
+import java.util.Map;
+
import org.apache.sling.distribution.queue.DistributionQueueEntry;
+import org.apache.sling.distribution.queue.DistributionQueueItemStatus;
import org.apache.sling.distribution.queue.impl.DistributionQueueProcessor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,17 +36,23 @@
private final Logger log = LoggerFactory.getLogger(getClass());
private final DistributionQueue queue;
private final DistributionQueueProcessor queueProcessor;
+ private final Map<String, DistributionQueueItemStatus> statusMap;
public SimpleDistributionQueueProcessor(DistributionQueue queue,
- DistributionQueueProcessor queueProcessor) {
+ DistributionQueueProcessor queueProcessor,
+ Map<String, DistributionQueueItemStatus> statusMap) {
this.queue = queue;
this.queueProcessor = queueProcessor;
+ this.statusMap = statusMap;
}
public void run() {
try {
DistributionQueueEntry entry;
while ((entry = queue.getHead()) != null) {
+ DistributionQueueItemStatus itemStatus = entry.getStatus();
+ statusMap.put(entry.getId(), new DistributionQueueItemStatus(itemStatus.getEntered(),
+ itemStatus.getItemState(), itemStatus.getAttempts() + 1, queue.getName()));
if (queueProcessor.process(queue.getName(), entry)) {
if (queue.remove(entry.getId()) != null) {
log.debug("item {} processed and removed from the queue", entry.getItem());
diff --git a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java
index 8622fdc..91ca984 100644
--- a/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java
+++ b/src/main/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProvider.java
@@ -26,6 +26,7 @@
import java.io.FilenameFilter;
import java.util.Collection;
import java.util.Map;
+import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.io.IOUtils;
@@ -34,6 +35,7 @@
import org.apache.sling.commons.scheduler.Scheduler;
import org.apache.sling.distribution.queue.spi.DistributionQueue;
import org.apache.sling.distribution.queue.DistributionQueueItem;
+import org.apache.sling.distribution.queue.DistributionQueueItemStatus;
import org.apache.sling.distribution.queue.impl.DistributionQueueProcessor;
import org.apache.sling.distribution.queue.impl.DistributionQueueProvider;
import org.apache.sling.distribution.queue.DistributionQueueType;
@@ -56,6 +58,8 @@
private final Scheduler scheduler;
private final Map<String, SimpleDistributionQueue> queueMap = new ConcurrentHashMap<String, SimpleDistributionQueue>();
+ private final Map<SimpleDistributionQueue, Map<String, DistributionQueueItemStatus>> statusMap
+ = new WeakHashMap<SimpleDistributionQueue, Map<String, DistributionQueueItemStatus>>();
private final boolean checkpoint;
private File checkpointDirectory;
@@ -75,7 +79,8 @@
if (!checkpointDirectory.exists()) {
created = checkpointDirectory.mkdir();
}
- log.info("checkpoint directory created: {}, exists {}", created, checkpointDirectory.isDirectory() && checkpointDirectory.exists());
+ log.info("checkpoint directory created: {}, exists {}", created,
+ checkpointDirectory.isDirectory() && checkpointDirectory.exists());
}
this.scheduler = scheduler;
@@ -89,8 +94,11 @@
SimpleDistributionQueue queue = queueMap.get(key);
if (queue == null) {
log.debug("creating a queue with key {}", key);
- queue = new SimpleDistributionQueue(name, queueName);
+ Map<String, DistributionQueueItemStatus> queueStatusMap
+ = new ConcurrentHashMap<String, DistributionQueueItemStatus>();
+ queue = new SimpleDistributionQueue(name, queueName, queueStatusMap);
queueMap.put(key, queue);
+ statusMap.put(queue, queueStatusMap);
log.debug("queue created {}", queue);
}
return queue;
@@ -125,7 +133,7 @@
try {
LineIterator lineIterator = IOUtils.lineIterator(new FileReader(qf));
while (lineIterator.hasNext()) {
- String line = lineIterator.nextLine();
+ String line = lineIterator.nextLine();
DistributionQueueItem item = mapper.readQueueItem(line);
queue.add(item);
}
@@ -140,19 +148,18 @@
// enable checkpointing
for (String queueName : queueNames) {
- ScheduleOptions options = scheduler.NOW(-1, 15)
- .canRunConcurrently(false)
+ ScheduleOptions options = scheduler.NOW(-1, 15).canRunConcurrently(false)
.name(getJobName(queueName + "-checkpoint"));
- scheduler.schedule(new SimpleDistributionQueueCheckpoint(getQueue(queueName), checkpointDirectory), options);
+ scheduler.schedule(new SimpleDistributionQueueCheckpoint(getQueue(queueName), checkpointDirectory),
+ options);
}
}
// enable processing
for (String queueName : queueNames) {
- ScheduleOptions options = scheduler.NOW(-1, 1)
- .canRunConcurrently(false)
- .name(getJobName(queueName));
- scheduler.schedule(new SimpleDistributionQueueProcessor(getQueue(queueName), queueProcessor), options);
+ ScheduleOptions options = scheduler.NOW(-1, 1).canRunConcurrently(false).name(getJobName(queueName));
+ scheduler.schedule(new SimpleDistributionQueueProcessor(getQueue(queueName), queueProcessor,
+ statusMap.get(getQueue(queueName))), options);
}
}
diff --git a/src/main/java/org/apache/sling/distribution/transport/impl/SimpleHttpDistributionTransport.java b/src/main/java/org/apache/sling/distribution/transport/impl/SimpleHttpDistributionTransport.java
index 3f7bd51..0e0cb92 100644
--- a/src/main/java/org/apache/sling/distribution/transport/impl/SimpleHttpDistributionTransport.java
+++ b/src/main/java/org/apache/sling/distribution/transport/impl/SimpleHttpDistributionTransport.java
@@ -20,7 +20,6 @@
import java.io.InputStream;
import java.net.URI;
-import java.util.Collections;
import java.util.Map;
import java.util.UUID;
import org.apache.commons.io.IOUtils;
@@ -32,8 +31,6 @@
import org.apache.http.client.fluent.Response;
import org.apache.http.conn.HttpHostConnectException;
import org.apache.http.entity.ContentType;
-import org.apache.http.impl.client.HttpClientBuilder;
-import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicHeader;
import org.apache.http.protocol.HTTP;
import org.apache.sling.api.resource.ResourceResolver;
@@ -128,6 +125,11 @@
.addHeader(HTTP.CONN_DIRECTIVE, HTTP.CONN_CLOSE)
.useExpectContinue();
+ String authorizationHeader = getAuthSecret();
+ if (null != authorizationHeader) {
+ req.addHeader(new BasicHeader(HttpHeaders.AUTHORIZATION, authorizationHeader));
+ }
+
// add the message body digest, see https://tools.ietf.org/html/rfc3230#section-4.3.2
if (distributionPackage instanceof AbstractDistributionPackage) {
AbstractDistributionPackage adb = (AbstractDistributionPackage) distributionPackage;
@@ -222,27 +224,32 @@
return executor;
}
- private Executor buildAuthExecutor(String authorizationHeader) {
- HttpClientBuilder builder = HttpClients.custom();
- builder.setDefaultHeaders(Collections.singletonList(new BasicHeader(HttpHeaders.AUTHORIZATION, authorizationHeader)));
- Executor executor = Executor.newInstance(builder.build());
- log.debug("set Authorization header, endpoint={}", distributionEndpoint.getUri());
- return executor;
- }
-
- private Executor buildAuthExecutor(@NotNull Map<String, String> credentialsMap) {
- return (credentialsMap.containsKey(AUTHORIZATION))
- ? buildAuthExecutor(credentialsMap.get(AUTHORIZATION))
- : buildAuthExecutor(credentialsMap.get(USERNAME), credentialsMap.get(PASSWORD));
- }
-
- private Executor buildExecutor() {
- DistributionTransportSecret secret = secretProvider.getSecret(distributionEndpoint.getUri());
- Map<String, String> credentialsMap = secret.asCredentialsMap();
- return (credentialsMap != null)
- ? buildAuthExecutor(credentialsMap)
+ private Executor buildAuthExecutor(Map<String, String> credentialsMap) {
+ return (null != credentialsMap && !credentialsMap.containsKey(AUTHORIZATION))
+ ? buildAuthExecutor(credentialsMap.get(USERNAME), credentialsMap.get(PASSWORD))
: Executor.newInstance();
}
+ private Executor buildExecutor() {
+ Map<String, String> credentialsMap = getCredentialsMap();
+ return buildAuthExecutor(credentialsMap);
+ }
+
+ private String getAuthSecret() {
+ Map<String, String> credentialsMap = getCredentialsMap();
+ if (null != credentialsMap && credentialsMap.containsKey(AUTHORIZATION)) {
+ return credentialsMap.get(AUTHORIZATION);
+ }
+ return null;
+ }
+
+ private Map<String, String> getCredentialsMap() {
+ DistributionTransportSecret secret = secretProvider.getSecret(distributionEndpoint.getUri());
+ Map<String, String> credentialsMap = null;
+ if (null != secret) {
+ credentialsMap = secret.asCredentialsMap();
+ }
+ return credentialsMap;
+ }
}
diff --git a/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentTest.java b/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentTest.java
index e729c7c..433d2ff 100644
--- a/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentTest.java
+++ b/src/test/java/org/apache/sling/distribution/agent/impl/SimpleDistributionAgentTest.java
@@ -21,7 +21,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-
+import java.util.HashMap;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.distribution.DistributionRequest;
@@ -125,7 +125,7 @@
when(queueProvider.getQueue(DistributionQueueDispatchingStrategy.DEFAULT_QUEUE_NAME)).thenReturn(
- new SimpleDistributionQueue(name, "name"));
+ new SimpleDistributionQueue(name, "name", new HashMap<String, DistributionQueueItemStatus>()));
DistributionResponse response = agent.execute(resourceResolver, request);
assertNotNull(response);
assertEquals("[ERROR]", response.getMessage());
@@ -165,7 +165,7 @@
}
}).when(packageExporter).exportPackages(any(ResourceResolver.class), any(DistributionRequest.class), any(DistributionPackageProcessor.class));
when(queueProvider.getQueue(DistributionQueueDispatchingStrategy.DEFAULT_QUEUE_NAME)).thenReturn(
- new SimpleDistributionQueue(name, "name"));
+ new SimpleDistributionQueue(name, "name", new HashMap<String, DistributionQueueItemStatus>()));
DistributionResponse response = agent.execute(resourceResolver, request);
assertNotNull(response);
assertEquals("[QUEUED]", response.getMessage());
@@ -205,7 +205,7 @@
return null;
}
}).when(packageExporter).exportPackages(any(ResourceResolver.class), any(DistributionRequest.class), any(DistributionPackageProcessor.class)); when(queueProvider.getQueue(DistributionQueueDispatchingStrategy.DEFAULT_QUEUE_NAME)).thenReturn(
- new SimpleDistributionQueue(name, "name"));
+ new SimpleDistributionQueue(name, "name", new HashMap<String, DistributionQueueItemStatus>()));
agent.execute(resourceResolver, request);
}
@@ -298,7 +298,7 @@
return null;
}
}).when(packageExporter).exportPackages(any(ResourceResolver.class), any(DistributionRequest.class), any(DistributionPackageProcessor.class)); when(queueProvider.getQueue(DistributionQueueDispatchingStrategy.DEFAULT_QUEUE_NAME)).thenReturn(
- new SimpleDistributionQueue(name, "name"));
+ new SimpleDistributionQueue(name, "name", new HashMap<String, DistributionQueueItemStatus>()));
DistributionResponse response = agent.execute(resourceResolver, request);
@@ -344,7 +344,7 @@
return null;
}
}).when(packageExporter).exportPackages(any(ResourceResolver.class), any(DistributionRequest.class), any(DistributionPackageProcessor.class)); when(queueProvider.getQueue(DistributionQueueDispatchingStrategy.DEFAULT_QUEUE_NAME)).thenReturn(
- new SimpleDistributionQueue(name, "name"));
+ new SimpleDistributionQueue(name, "name", new HashMap<String, DistributionQueueItemStatus>()));
DistributionResponse response = agent.execute(resourceResolver, request);
diff --git a/src/test/java/org/apache/sling/distribution/packaging/impl/FileDistributionPackageBuilderTest.java b/src/test/java/org/apache/sling/distribution/packaging/impl/FileDistributionPackageBuilderTest.java
index ea6072c..5948b09 100644
--- a/src/test/java/org/apache/sling/distribution/packaging/impl/FileDistributionPackageBuilderTest.java
+++ b/src/test/java/org/apache/sling/distribution/packaging/impl/FileDistributionPackageBuilderTest.java
@@ -20,16 +20,20 @@
package org.apache.sling.distribution.packaging.impl;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.Arrays;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.distribution.DistributionRequest;
import org.apache.sling.distribution.common.DistributionException;
import org.apache.sling.distribution.packaging.DistributionPackage;
+import org.apache.sling.distribution.packaging.DistributionPackageInfo;
import org.apache.sling.distribution.serialization.DistributionContentSerializer;
import org.apache.sling.distribution.serialization.DistributionExportOptions;
import org.junit.Test;
@@ -38,14 +42,37 @@
@Test
public void testDefaultTempDirectory() throws DistributionException, IOException {
+ final String testPath = "/a/test/path";
+ final String testDeepPath = "/a/deep/test/path";
+ final String[] requestPaths = {testPath, testDeepPath};
+ DistributionRequest mockRequest = mock(DistributionRequest.class);
+ when(mockRequest.getPaths()).thenReturn(requestPaths);
+ when(mockRequest.isDeep(testDeepPath)).thenReturn(true);
+ when(mockRequest.isDeep(testPath)).thenReturn(false);
+
FileDistributionPackageBuilder builder = new FileDistributionPackageBuilder("test", new TestSerializer(), null, null, new String[0],
new String[0]);
- DistributionPackage createdPackage = builder.createPackageForAdd(mock(ResourceResolver.class), mock(DistributionRequest.class));
+
+ DistributionPackage createdPackage = builder.createPackageForAdd(mock(ResourceResolver.class), mockRequest);
try {
assertNotNull(createdPackage.createInputStream());
DistributionPackage gotPackage = builder.getPackageInternal(mock(ResourceResolver.class), createdPackage.getId());
assertNotNull(gotPackage.createInputStream()); // this will throw an exception when the file doesn't exist
+ final String[] createdPackagePaths = createdPackage.getInfo().getPaths();
+ final String[] gotPackagePaths = gotPackage.getInfo().getPaths();
+ final String[] createdPackageDeepPaths = (String[]) createdPackage.getInfo()
+ .get(DistributionPackageInfo.PROPERTY_REQUEST_DEEP_PATHS);
+ final String[] gotPackageDeepPaths = (String[]) gotPackage.getInfo()
+ .get(DistributionPackageInfo.PROPERTY_REQUEST_DEEP_PATHS);
+ assertTrue("packaged Paths at createPackage and getPackage not consistent. "
+ + "expected " + Arrays.toString(createdPackagePaths)
+ + ", found " + Arrays.toString(gotPackagePaths),
+ Arrays.equals(createdPackagePaths, gotPackagePaths));
+ assertTrue("packaged deep Paths at createPackage and getPackage not consistent. "
+ + "expected " + Arrays.toString(createdPackageDeepPaths)
+ + ", found " + Arrays.toString(gotPackageDeepPaths),
+ Arrays.equals(createdPackageDeepPaths, gotPackageDeepPaths));
} finally {
createdPackage.delete();
}
diff --git a/src/test/java/org/apache/sling/distribution/packaging/impl/InMemoryDistributionPackageBuilderTest.java b/src/test/java/org/apache/sling/distribution/packaging/impl/InMemoryDistributionPackageBuilderTest.java
index f20dc66..463cbd9 100644
--- a/src/test/java/org/apache/sling/distribution/packaging/impl/InMemoryDistributionPackageBuilderTest.java
+++ b/src/test/java/org/apache/sling/distribution/packaging/impl/InMemoryDistributionPackageBuilderTest.java
@@ -21,7 +21,6 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
-
import org.apache.commons.io.IOUtils;
import org.apache.sling.api.resource.ResourceResolver;
import org.apache.sling.distribution.DistributionRequestType;
@@ -40,8 +39,8 @@
@Test
public void testCreatePackage() throws Exception {
InMemoryDistributionPackageBuilder builder = new InMemoryDistributionPackageBuilder("name", new InMemDistributionContentSerializer(), new String[0], new String[0]);
- DistributionPackage pkg = builder.createPackageForAdd(mock(ResourceResolver.class), new SimpleDistributionRequest(DistributionRequestType.ADD, false, "/test"));
- assertNotNull(pkg.createInputStream());
+ DistributionPackage createdPkg = builder.createPackageForAdd(mock(ResourceResolver.class), new SimpleDistributionRequest(DistributionRequestType.ADD, false, "/test"));
+ assertNotNull(createdPkg.createInputStream());
}
private final class InMemDistributionContentSerializer implements DistributionContentSerializer {
diff --git a/src/test/java/org/apache/sling/distribution/packaging/impl/InMemoryDistributionPackageTest.java b/src/test/java/org/apache/sling/distribution/packaging/impl/InMemoryDistributionPackageTest.java
index 1737d63..9d778e8 100644
--- a/src/test/java/org/apache/sling/distribution/packaging/impl/InMemoryDistributionPackageTest.java
+++ b/src/test/java/org/apache/sling/distribution/packaging/impl/InMemoryDistributionPackageTest.java
@@ -18,30 +18,51 @@
*/
package org.apache.sling.distribution.packaging.impl;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Random;
import org.apache.sling.distribution.packaging.DistributionPackage;
+import org.apache.sling.distribution.packaging.DistributionPackageInfo;
import org.junit.Assert;
import org.junit.Test;
public class InMemoryDistributionPackageTest {
+ @SuppressWarnings("serial")
@Test
public void testGetInfo() throws Exception {
int size = 1000;
byte[] data = new byte[size];
+ final String testPath = "/a/test/path";
+ final String testDeepPath = "/a/test/deepPath";
new Random().nextBytes(data);
- DistributionPackage pkg = new InMemoryDistributionPackage("id", "type", data);
+ Map <String, Object> baseInfoMap = new HashMap<String, Object>() {{
+ put(DistributionPackageInfo.PROPERTY_REQUEST_PATHS, new String[] {testPath, testDeepPath});
+ put(DistributionPackageInfo.PROPERTY_REQUEST_DEEP_PATHS, new String[] {testDeepPath});
+ }};
+ DistributionPackage pkg = new InMemoryDistributionPackage("id", "type", data, baseInfoMap);
Assert.assertEquals("type", pkg.getType());
Assert.assertEquals("id", pkg.getId());
Assert.assertEquals(size, pkg.getSize());
+ Assert.assertTrue("DistributionRequest provided Paths and those retrieved from"
+ + "DistributionPackage.getInfo() don't match",
+ Arrays.equals(new String[] {testPath, testDeepPath},
+ (String[])pkg.getInfo().get(DistributionPackageInfo.PROPERTY_REQUEST_PATHS))
+ );
+ Assert.assertTrue("DistributionRequest deep Paths and those retrieved from"
+ + "DistributionPackage.getInfo() don't match",
+ Arrays.equals(new String[] {testDeepPath},
+ (String[]) pkg.getInfo().get(DistributionPackageInfo.PROPERTY_REQUEST_DEEP_PATHS))
+ );
}
@Test
public void testCreateInputStream() throws Exception {
byte[] data = new byte[1000];
new Random().nextBytes(data);
- DistributionPackage pkg = new InMemoryDistributionPackage("id", "type", data);
+ DistributionPackage pkg = new InMemoryDistributionPackage("id", "type", data, null);
Assert.assertNotEquals(pkg.createInputStream(), pkg.createInputStream());
}
}
\ No newline at end of file
diff --git a/src/test/java/org/apache/sling/distribution/packaging/impl/ResourceDistributionPackageBuilderTest.java b/src/test/java/org/apache/sling/distribution/packaging/impl/ResourceDistributionPackageBuilderTest.java
new file mode 100644
index 0000000..3850ae5
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/packaging/impl/ResourceDistributionPackageBuilderTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sling.distribution.packaging.impl;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Arrays;
+
+import org.apache.sling.api.resource.ResourceResolver;
+import org.apache.sling.distribution.DistributionRequest;
+import org.apache.sling.distribution.common.DistributionException;
+import org.apache.sling.distribution.packaging.DistributionPackage;
+import org.apache.sling.distribution.packaging.DistributionPackageInfo;
+import org.apache.sling.distribution.serialization.DistributionContentSerializer;
+import org.apache.sling.distribution.serialization.DistributionExportOptions;
+import org.apache.sling.distribution.util.impl.FileBackedMemoryOutputStream.MemoryUnit;
+import org.apache.sling.testing.mock.osgi.MockOsgi;
+import org.apache.sling.testing.mock.sling.MockSling;
+import org.apache.sling.testing.mock.sling.ResourceResolverType;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.osgi.framework.BundleContext;
+
+public class ResourceDistributionPackageBuilderTest {
+ BundleContext bundleContext = null;
+ ResourceResolver resolver = null;
+
+ @Test
+ public void testResourceDistributionBuilder() throws DistributionException, IOException {
+ final String testPath = "/a/test/path";
+ final String testDeepPath = "/a/deep/test/path";
+ final String[] requestPaths = {testPath, testDeepPath};
+
+ DistributionRequest mockRequest = mock(DistributionRequest.class);
+ when(mockRequest.getPaths()).thenReturn(requestPaths);
+ when(mockRequest.isDeep(testDeepPath)).thenReturn(true);
+ when(mockRequest.isDeep(testPath)).thenReturn(false);
+
+ ResourceDistributionPackageBuilder builder = new ResourceDistributionPackageBuilder("test",
+ new TestSerializer(), null, 0, MemoryUnit.valueOf("MEGA_BYTES"), false, null,
+ new String[0],new String[0]);
+
+ DistributionPackage createdPackage = builder.createPackageForAdd(resolver, mockRequest);
+
+ InputStream createdPackageContentIS = createdPackage.createInputStream();
+ assertNotNull("Couldn't create stream from DistributionPackage", createdPackageContentIS);
+ // create a new package from the stream of created-package
+ assertNotNull("Couldn't read stream from DistributionPackage",
+ builder.readPackage(resolver, createdPackageContentIS));
+
+ try {
+ DistributionPackage gotPackage = builder.getPackageInternal(resolver, createdPackage.getId());
+ final String[] createdPackagePaths = createdPackage.getInfo().getPaths();
+ final String[] gotPackagePaths = gotPackage.getInfo().getPaths();
+ final String[] createdPackageDeepPaths = (String[]) createdPackage.getInfo()
+ .get(DistributionPackageInfo.PROPERTY_REQUEST_DEEP_PATHS);
+ final String[] gotPackageDeepPaths = (String[]) gotPackage.getInfo()
+ .get(DistributionPackageInfo.PROPERTY_REQUEST_DEEP_PATHS);
+ assertTrue("packaged Paths at createPackage and getPackage not consistent. "
+ + "expected " + Arrays.toString(createdPackagePaths)
+ + ", found " + Arrays.toString(gotPackagePaths),
+ Arrays.equals(createdPackagePaths, gotPackagePaths));
+ assertTrue("packaged deep Paths at createPackage and getPackage not consistent. "
+ + "expected " + Arrays.toString(createdPackageDeepPaths)
+ + ", found " + Arrays.toString(gotPackageDeepPaths),
+ Arrays.equals(createdPackageDeepPaths, gotPackageDeepPaths));
+ } finally {
+ createdPackage.delete();
+ }
+ }
+
+ class TestSerializer implements DistributionContentSerializer {
+
+ @Override public void exportToStream(ResourceResolver resourceResolver, DistributionExportOptions exportOptions,
+ OutputStream outputStream) throws DistributionException {
+ try {
+ outputStream.write("test".getBytes());
+ } catch (IOException ex) {
+ throw new DistributionException(ex);
+ }
+ }
+
+ @Override public void importFromStream(ResourceResolver resourceResolver, InputStream inputStream) throws DistributionException {
+ throw new DistributionException("unsupported");
+ }
+
+ @Override public String getName() {
+ return "test";
+ }
+
+ @Override public boolean isRequestFiltering() {
+ return true;
+ }
+ }
+
+ @Before
+ public void setUp() {
+ bundleContext = MockOsgi.newBundleContext();
+ MockSling.setAdapterManagerBundleContext(bundleContext);
+ resolver = MockSling.newResourceResolver(ResourceResolverType.JCR_MOCK, bundleContext);
+ }
+
+ @After
+ public void tearDown() {
+ resolver.close();
+ MockSling.clearAdapterManagerBundleContext();
+ }
+}
diff --git a/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessorTest.java b/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessorTest.java
index 42128c8..b219f2d 100644
--- a/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessorTest.java
+++ b/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueProcessorTest.java
@@ -21,7 +21,6 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
-
import org.apache.sling.distribution.queue.DistributionQueueItemState;
import org.apache.sling.distribution.queue.DistributionQueueItemStatus;
import org.apache.sling.distribution.queue.spi.DistributionQueue;
@@ -43,7 +42,7 @@
DistributionQueue queue = mock(DistributionQueue.class);
DistributionQueueProcessor queueProcessor = mock(DistributionQueueProcessor.class);
SimpleDistributionQueueProcessor simpleDistributionQueueProcessor = new SimpleDistributionQueueProcessor(
- queue, queueProcessor);
+ queue, queueProcessor, new HashMap<String, DistributionQueueItemStatus>());
simpleDistributionQueueProcessor.run();
}
@@ -57,7 +56,7 @@
when(queueProvider.getQueues()).thenReturn(queues);
DistributionQueueProcessor queueProcessor = mock(DistributionQueueProcessor.class);
SimpleDistributionQueueProcessor simpleDistributionQueueProcessor = new SimpleDistributionQueueProcessor(
- queue, queueProcessor);
+ queue, queueProcessor, new HashMap<String, DistributionQueueItemStatus>());
simpleDistributionQueueProcessor.run();
}
@@ -74,7 +73,7 @@
when(queueProvider.getQueues()).thenReturn(queues);
DistributionQueueProcessor queueProcessor = mock(DistributionQueueProcessor.class);
SimpleDistributionQueueProcessor simpleDistributionQueueProcessor = new SimpleDistributionQueueProcessor(
- queue, queueProcessor);
+ queue, queueProcessor, new HashMap<String, DistributionQueueItemStatus>());
simpleDistributionQueueProcessor.run();
}
}
\ No newline at end of file
diff --git a/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueTest.java b/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueTest.java
index b43ef88..aadd3a4 100644
--- a/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueTest.java
+++ b/src/test/java/org/apache/sling/distribution/queue/impl/simple/SimpleDistributionQueueTest.java
@@ -19,7 +19,6 @@
package org.apache.sling.distribution.queue.impl.simple;
import java.util.HashMap;
-
import org.apache.sling.distribution.queue.spi.DistributionQueue;
import org.apache.sling.distribution.queue.DistributionQueueEntry;
import org.apache.sling.distribution.queue.DistributionQueueItem;
@@ -39,7 +38,8 @@
@Test
public void testPackageAddition() throws Exception {
- DistributionQueue queue = new SimpleDistributionQueue("agentName", "default");
+ DistributionQueue queue = new SimpleDistributionQueue("agentName", "default",
+ new HashMap<String, DistributionQueueItemStatus>());
DistributionQueueItem pkg = new DistributionQueueItem("packageId", new HashMap<String, Object>());
assertNotNull(queue.add(pkg));
assertFalse(queue.getStatus().isEmpty());
@@ -47,7 +47,8 @@
@Test
public void testPackageAdditionAndRemoval() throws Exception {
- DistributionQueue queue = new SimpleDistributionQueue("agentName", "default");
+ DistributionQueue queue = new SimpleDistributionQueue("agentName", "default",
+ new HashMap<String, DistributionQueueItemStatus>());
DistributionQueueItem pkg = new DistributionQueueItem("id", new HashMap<String, Object>());
assertNotNull(queue.add(pkg));
assertFalse(queue.getStatus().isEmpty());
@@ -59,7 +60,8 @@
@Test
public void testPackageAdditionRetrievalAndRemoval() throws Exception {
- DistributionQueue queue = new SimpleDistributionQueue("agentName", "default");
+ DistributionQueue queue = new SimpleDistributionQueue("agentName", "default",
+ new HashMap<String, DistributionQueueItemStatus>());
DistributionQueueItem pkg = new DistributionQueueItem("id", new HashMap<String, Object>());
assertNotNull(queue.add(pkg));
assertFalse(queue.getStatus().isEmpty());
@@ -69,7 +71,7 @@
assertNotNull(queue.remove(pkg.getPackageId()));
assertTrue(queue.getStatus().isEmpty());
assertNotNull(status);
- assertEquals(1, status.getAttempts());
+ assertEquals(0, status.getAttempts());
}
}