SLING-8996 - Fix error in clear callback. Improve test and logging
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java
index 93f0dca..9bfd4fd 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java
@@ -47,9 +47,13 @@
import org.apache.sling.distribution.queue.DistributionQueueStatus;
import org.apache.sling.distribution.queue.DistributionQueueType;
import org.apache.sling.distribution.queue.spi.DistributionQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@ParametersAreNonnullByDefault
public class PubQueue implements DistributionQueue {
+
+ private final Logger log = LoggerFactory.getLogger(this.getClass());
private final String queueName;
@@ -134,6 +138,7 @@
* Until then, the REMOVABLE capability is provided
* but only allows to remove the head of the queue.
*/
+ log.info("Removing queue entry {}", entryId);
DistributionQueueEntry headEntry = getHead();
if (headEntry != null) {
if (headEntry.getId().equals(entryId)) {
@@ -159,6 +164,7 @@
* which clears from the head entry to the entry
* provided with the max offset (tailEntry).
*/
+ log.info("Removing queue entries {}", entryIds);
Optional<String> tailEntryId = entryIds.stream()
.max((e1, e2) -> compare(EntryUtil.entryOffset(e1), EntryUtil.entryOffset(e2)));
return (tailEntryId.isPresent())
@@ -214,6 +220,7 @@
}
private Iterable<DistributionQueueEntry> clear(String tailEntryId) {
+ log.info("Clearing up to tail queue entry {}", tailEntryId);
List<DistributionQueueEntry> removed = new ArrayList<>();
for (DistributionQueueEntry entry : getEntries(0, -1)) {
removed.add(entry);
@@ -229,7 +236,8 @@
if (clearCallback == null) {
throw new UnsupportedOperationException();
}
- clearCallback.clear(EntryUtil.entryOffset(tailEntry.getId()));
+ long offset = EntryUtil.entryOffset(tailEntry.getId());
+ clearCallback.clear(offset);
}
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
index 8b253b7..3b7feab 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
@@ -108,7 +108,7 @@
@Override
public DistributionQueue getQueue(String pubAgentName, String subSlingId, String subAgentName, String queueName, long minOffset, int headRetries, boolean editable) {
OffsetQueue<DistributionQueueItem> agentQueue = pubQueueCacheService.getOffsetQueue(pubAgentName, minOffset);
- ClearCallback editableCallback = offset -> sendClearCommand(subSlingId, subAgentName, minOffset);
+ ClearCallback editableCallback = offset -> sendClearCommand(subSlingId, subAgentName, offset);
ClearCallback callback = editable ? editableCallback : null;
return new PubQueue(queueName, agentQueue.getMinOffsetQueue(minOffset), headRetries, callback);
}
@@ -152,6 +152,7 @@
.setSubAgentName(subAgentName)
.setClearCommand(clearCommand)
.build();
+ LOG.info("Sending clear command to subSlingId: {}, subAgentName: {} with offset {}.", subSlingId, subAgentName, offset);
sender.send(topics.getCommandTopic(), commandMessage);
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
index e56828f..f9d047b 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
@@ -81,8 +81,9 @@
}
private void handleClearCommand(long offset) {
- updateClearOffsetIfLarger(offset);
- LOG.info("Handled clear command for offset {}", offset);
+ long oldOffset = clearOffset.get();
+ long newOffset = updateClearOffsetIfLarger(offset);
+ LOG.info("Handled clear command for offset {}. Old clear offset was {}, new clear offset is {}.", offset, oldOffset, newOffset);
}
private long updateClearOffsetIfLarger(long offset) {
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueTest.java
index 25c15bf..9872ad9 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueTest.java
@@ -18,134 +18,130 @@
*/
package org.apache.sling.distribution.journal.impl.queue.impl;
+import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_OFFSET;
+import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_PARTITION;
+import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_TIMESTAMP;
+import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_TOPIC;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
import org.apache.sling.distribution.queue.DistributionQueueEntry;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.apache.sling.distribution.queue.DistributionQueueType;
-import org.apache.sling.distribution.queue.spi.DistributionQueue;
+import org.junit.Before;
import org.junit.Test;
-
-import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.*;
-import static java.util.Collections.*;
-import static org.junit.Assert.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
@SuppressWarnings("serial")
public class PubQueueTest {
-
private static final String TOPIC = "topic";
-
private static final String PARTITION = "0";
-
private static final String QUEUE_NAME = "queueName";
+ private static final String PACKAGE_ID_PREFIX = "package-";
+ private final Logger log = LoggerFactory.getLogger(this.getClass());
+ private Semaphore invoked = new Semaphore(0);
+ private long lastClearOffset = 0l;
+ private OffsetQueue<DistributionQueueItem> offsetQueue;
+ private PubQueue queue;
- private static final String PACKAGE_ID_1 = "package-1";
-
- private static final String PACKAGE_ID_2 = "package-2";
-
- private static final String PACKAGE_ID_3 = "package-3";
-
- private static final ClearCallback NO_OP = (offset) -> {};
-
- private static final OffsetQueue<DistributionQueueItem> EMPTY_QUEUE = new OffsetQueueImpl<>();
-
- private static final OffsetQueue<DistributionQueueItem> THREE_ENTRY_QUEUE = new OffsetQueueImpl<>();
-
- static {
-
- THREE_ENTRY_QUEUE.putItem(100, new DistributionQueueItem(PACKAGE_ID_1, new HashMap<String, Object>(){{
- put(RECORD_TOPIC, TOPIC);
- put(RECORD_PARTITION, PARTITION);
- put(RECORD_OFFSET, 100);
- put(RECORD_TIMESTAMP, 1541538150582L);
- }}));
-
- THREE_ENTRY_QUEUE.putItem(200, new DistributionQueueItem(PACKAGE_ID_2, new HashMap<String, Object>(){{
- put(RECORD_TOPIC, TOPIC);
- put(RECORD_PARTITION, PARTITION);
- put(RECORD_OFFSET, 200);
- put(RECORD_TIMESTAMP, 1541538150584L);
- }}));
-
- THREE_ENTRY_QUEUE.putItem(300, new DistributionQueueItem(PACKAGE_ID_3, new HashMap<String, Object>(){{
- put(RECORD_TOPIC, TOPIC);
- put(RECORD_PARTITION, PARTITION);
- put(RECORD_OFFSET, 300);
- put(RECORD_TIMESTAMP, 1541538150586L);
- }}));
+ @Before
+ public void before () {
+ offsetQueue = new OffsetQueueImpl<>();
+ queue = pubQueue(offsetQueue);
+ addEntries();
}
@Test
public void testGetName() throws Exception {
- assertEquals(QUEUE_NAME, new PubQueue(QUEUE_NAME, new OffsetQueueImpl<>(), 0, NO_OP).getName());
+ assertEquals(QUEUE_NAME, queue.getName());
}
@Test(expected = UnsupportedOperationException.class)
public void testAdd() throws Exception {
- DistributionQueueItem queueItem = new DistributionQueueItem(PACKAGE_ID_1, emptyMap());
- new PubQueue(QUEUE_NAME, new OffsetQueueImpl<>(), 0, NO_OP).add(queueItem);
+ queue.add(queueItem(1));
+ }
+
+ @Test
+ public void testGetHeadEmpty() throws Exception {
+ assertNull(queue.getHead());
}
@Test
public void testGetHead() throws Exception {
- assertNull(pubQueue(EMPTY_QUEUE).getHead());
- DistributionQueueEntry headEntry = pubQueue(THREE_ENTRY_QUEUE).getHead();
+ addEntries();
+
+ DistributionQueueEntry headEntry = queue.getHead();
+
assertNotNull(headEntry);
- assertEquals(PACKAGE_ID_1, headEntry.getItem().getPackageId());
+ assertEquals(packageId(1), headEntry.getItem().getPackageId());
}
@Test
public void testGetItems() throws Exception {
- Iterator<DistributionQueueEntry> entries = pubQueue(THREE_ENTRY_QUEUE).getEntries(1, 2).iterator();
+ addEntries();
+
+ Iterator<DistributionQueueEntry> entries = queue.getEntries(1, 2).iterator();
+
assertNotNull(entries);
DistributionQueueEntry entry1 = entries.next();
assertNotNull(entry1);
- assertEquals(PACKAGE_ID_2, entry1.getItem().getPackageId());
+ assertEquals(packageId(2), entry1.getItem().getPackageId());
DistributionQueueEntry entry2 = entries.next();
- assertEquals(PACKAGE_ID_3, entry2.getItem().getPackageId());
+ assertEquals(packageId(3), entry2.getItem().getPackageId());
}
@Test
public void testGetItem() throws Exception {
+ addEntries();
+
String entryId = TOPIC + "-" + PARTITION + "@" + 200;
- DistributionQueueEntry queueEntry = pubQueue(THREE_ENTRY_QUEUE).getEntry(entryId);
+ DistributionQueueEntry queueEntry = queue.getEntry(entryId);
+
assertNotNull(queueEntry);
- assertEquals(PACKAGE_ID_2, queueEntry.getItem().getPackageId());
+ assertEquals(packageId(2), queueEntry.getItem().getPackageId());
}
@Test
public void testRemoveHead() throws Exception {
- final Semaphore invoked = new Semaphore(1);
- DistributionQueue queue = pubQueue(THREE_ENTRY_QUEUE, (offset) -> invoked.release());
- String headEntryId = EntryUtil.entryId(THREE_ENTRY_QUEUE.getHeadItem());
+ addEntries();
+
+ String headEntryId = EntryUtil.entryId(offsetQueue.getHeadItem());
DistributionQueueEntry removed = queue.remove(headEntryId);
- assertTrue(invoked.tryAcquire(5, TimeUnit.MILLISECONDS));
+
+ assertClearCallbackInvoked();
assertEquals(headEntryId, removed.getId());
}
@Test(expected = UnsupportedOperationException.class)
public void testRemoveRandomItemFails() throws Exception {
- DistributionQueue queue = pubQueue(THREE_ENTRY_QUEUE);
- String randomEntryId = EntryUtil.entryId(THREE_ENTRY_QUEUE.getItem(200));
+ addEntries();
+
+ String randomEntryId = EntryUtil.entryId(offsetQueue.getItem(200));
queue.remove(randomEntryId);
}
@Test
public void testRemoveSetOfRandomItemsWillClear() throws Exception {
- final Semaphore invoked = new Semaphore(2);
- DistributionQueue queue = pubQueue(THREE_ENTRY_QUEUE, (offset) -> invoked.release());
- String headEntryId = EntryUtil.entryId(THREE_ENTRY_QUEUE.getHeadItem());
- String randomEntryId = EntryUtil.entryId(THREE_ENTRY_QUEUE.getItem(200));
+ addEntries();
+ String headEntryId = EntryUtil.entryId(offsetQueue.getHeadItem());
+ String randomEntryId = EntryUtil.entryId(offsetQueue.getItem(offset(2)));
Iterator<DistributionQueueEntry> removed = queue.remove(Collections.singleton(randomEntryId)).iterator();
- assertTrue(invoked.tryAcquire(5, TimeUnit.MILLISECONDS));
- assertTrue(invoked.tryAcquire(5, TimeUnit.MILLISECONDS));
+
+ assertClearCallbackInvoked();
assertEquals(headEntryId, removed.next().getId());
assertEquals(randomEntryId, removed.next().getId());
assertFalse(removed.hasNext());
@@ -153,47 +149,79 @@
@Test
public void testRemoveSetOfNonExistingItem() throws Exception {
- DistributionQueue queue = pubQueue(THREE_ENTRY_QUEUE);
-
+ addEntries();
+
Iterable<DistributionQueueEntry> removed = queue.remove(Collections.singleton("nonexisting-0@99999"));
+
assertFalse(removed.iterator().hasNext());
}
@Test
public void testClearAll() throws Exception {
- final Semaphore invoked = new Semaphore(3);
- DistributionQueue queue = pubQueue(THREE_ENTRY_QUEUE, (offset) -> invoked.release());
+ addEntries();
Iterable<DistributionQueueEntry> removed = queue.clear(-1);
- assertTrue(invoked.tryAcquire(5, TimeUnit.MILLISECONDS));
- assertTrue(invoked.tryAcquire(5, TimeUnit.MILLISECONDS));
- assertTrue(invoked.tryAcquire(5, TimeUnit.MILLISECONDS));
- assertEquals(3, StreamSupport.stream(removed.spliterator(), false).count());
+
+ assertClearCallbackInvoked();
+ assertEquals(3, streamOf(removed).count());
+ assertEquals(offset(3), lastClearOffset);
}
@Test
public void testClearPartial() throws Exception {
- final Semaphore invoked = new Semaphore(2);
- DistributionQueue queue = pubQueue(THREE_ENTRY_QUEUE, (offset) -> invoked.release());
-
+ addEntries();
+
Iterable<DistributionQueueEntry> removed = queue.clear(2);
- assertTrue(invoked.tryAcquire(5, TimeUnit.MILLISECONDS));
- assertTrue(invoked.tryAcquire(5, TimeUnit.MILLISECONDS));
- assertEquals(2, StreamSupport.stream(removed.spliterator(), false).count());
+
+ assertClearCallbackInvoked();
+ assertEquals(2, streamOf(removed).count());
+ assertEquals(offset(2), lastClearOffset);
}
@Test
public void testGetType() throws Exception {
- assertEquals(DistributionQueueType.ORDERED, new PubQueue(QUEUE_NAME, new OffsetQueueImpl<>(), 0, NO_OP).getType());
+ assertEquals(DistributionQueueType.ORDERED, queue.getType());
+ }
+
+ private void assertClearCallbackInvoked() throws InterruptedException {
+ assertTrue(invoked.tryAcquire(5, TimeUnit.MILLISECONDS));
+ }
+
+ private void addEntries() {
+ offsetQueue.putItem(offset(1), queueItem(1));
+ offsetQueue.putItem(offset(2), queueItem(2));
+ offsetQueue.putItem(offset(3), queueItem(3));
+ }
+
+ private DistributionQueueItem queueItem(int nr) {
+ HashMap<String, Object> data = new HashMap<String, Object>(){{
+ put(RECORD_TOPIC, TOPIC);
+ put(RECORD_PARTITION, PARTITION);
+ put(RECORD_OFFSET, offset(nr));
+ put(RECORD_TIMESTAMP, 1541538150580L + nr * 2);
+ }};
+ return new DistributionQueueItem(packageId(nr), data);
+ }
+
+ private long offset(int nr) {
+ return nr * 100;
+ };
+
+ private static String packageId(int nr) {
+ return PACKAGE_ID_PREFIX + new Integer(nr).toString();
+ }
+
+ private Stream<DistributionQueueEntry> streamOf(Iterable<DistributionQueueEntry> entries) {
+ return StreamSupport.stream(entries.spliterator(), false);
}
private PubQueue pubQueue(OffsetQueue<DistributionQueueItem> offsetQueue) {
- return pubQueue(offsetQueue, NO_OP);
+ return new PubQueue(QUEUE_NAME, offsetQueue, 0, this::clearCallback);
}
- private PubQueue pubQueue(OffsetQueue<DistributionQueueItem> offsetQueue, ClearCallback clearCallback) {
- return new PubQueue(QUEUE_NAME, offsetQueue, 0, clearCallback);
+ private void clearCallback(long offset) {
+ log.info("Clearcallback with offset {}", offset);
+ lastClearOffset = offset;
+ invoked.release();
}
-
-
}
\ No newline at end of file