SLING-8996 - Fix error in clear callback. Improve test and logging
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java
index 93f0dca..9bfd4fd 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueue.java
@@ -47,9 +47,13 @@
 import org.apache.sling.distribution.queue.DistributionQueueStatus;
 import org.apache.sling.distribution.queue.DistributionQueueType;
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @ParametersAreNonnullByDefault
 public class PubQueue implements DistributionQueue {
+    
+    private final Logger log = LoggerFactory.getLogger(this.getClass());
 
     private final String queueName;
 
@@ -134,6 +138,7 @@
          * Until then, the REMOVABLE capability is provided
          * but only allows to remove the head of the queue.
          */
+        log.info("Removing queue entry {}", entryId);
         DistributionQueueEntry headEntry = getHead();
         if (headEntry != null) {
             if (headEntry.getId().equals(entryId)) {
@@ -159,6 +164,7 @@
          * which clears from the head entry to the entry
          * provided with the max offset (tailEntry).
          */
+        log.info("Removing queue entries {}", entryIds);
         Optional<String> tailEntryId = entryIds.stream()
                 .max((e1, e2) -> compare(EntryUtil.entryOffset(e1), EntryUtil.entryOffset(e2)));
         return (tailEntryId.isPresent())
@@ -214,6 +220,7 @@
     }
 
     private Iterable<DistributionQueueEntry> clear(String tailEntryId) {
+        log.info("Clearing up to tail queue entry {}", tailEntryId);
         List<DistributionQueueEntry> removed = new ArrayList<>();
         for (DistributionQueueEntry entry : getEntries(0, -1)) {
             removed.add(entry);
@@ -229,7 +236,8 @@
         if (clearCallback == null) {
             throw new UnsupportedOperationException();
         }
-        clearCallback.clear(EntryUtil.entryOffset(tailEntry.getId()));
+        long offset = EntryUtil.entryOffset(tailEntry.getId());
+        clearCallback.clear(offset);
     }
 
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
index 8b253b7..3b7feab 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderImpl.java
@@ -108,7 +108,7 @@
     @Override
     public DistributionQueue getQueue(String pubAgentName, String subSlingId, String subAgentName, String queueName, long minOffset, int headRetries, boolean editable) {
         OffsetQueue<DistributionQueueItem> agentQueue = pubQueueCacheService.getOffsetQueue(pubAgentName, minOffset);
-        ClearCallback editableCallback = offset -> sendClearCommand(subSlingId, subAgentName, minOffset);
+        ClearCallback editableCallback = offset -> sendClearCommand(subSlingId, subAgentName, offset);
         ClearCallback callback = editable ? editableCallback : null;
         return new PubQueue(queueName, agentQueue.getMinOffsetQueue(minOffset), headRetries, callback);
     }
@@ -152,6 +152,7 @@
                 .setSubAgentName(subAgentName)
                 .setClearCommand(clearCommand)
                 .build();
+        LOG.info("Sending clear command to subSlingId: {}, subAgentName: {} with offset {}.", subSlingId, subAgentName, offset);
         sender.send(topics.getCommandTopic(), commandMessage);
     }
 
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
index e56828f..f9d047b 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/CommandPoller.java
@@ -81,8 +81,9 @@
     }
 
     private void handleClearCommand(long offset) {
-        updateClearOffsetIfLarger(offset);
-        LOG.info("Handled clear command for offset {}", offset);
+        long oldOffset = clearOffset.get();
+        long newOffset = updateClearOffsetIfLarger(offset);
+        LOG.info("Handled clear command for offset {}. Old clear offset was {}, new clear offset is {}.", offset, oldOffset, newOffset);
     }
 
     private long updateClearOffsetIfLarger(long offset) {
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueTest.java
index 25c15bf..9872ad9 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueTest.java
@@ -18,134 +18,130 @@
  */
 package org.apache.sling.distribution.journal.impl.queue.impl;
 
+import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_OFFSET;
+import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_PARTITION;
+import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_TIMESTAMP;
+import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.RECORD_TOPIC;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
 import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.apache.sling.distribution.queue.DistributionQueueType;
-import org.apache.sling.distribution.queue.spi.DistributionQueue;
+import org.junit.Before;
 import org.junit.Test;
-
-import static org.apache.sling.distribution.journal.impl.queue.QueueItemFactory.*;
-import static java.util.Collections.*;
-import static org.junit.Assert.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 @SuppressWarnings("serial")
 public class PubQueueTest {
-
     private static final String TOPIC = "topic";
-
     private static final String PARTITION = "0";
-
     private static final String QUEUE_NAME = "queueName";
+    private static final String PACKAGE_ID_PREFIX = "package-";
+    private final Logger log = LoggerFactory.getLogger(this.getClass());
+    private Semaphore invoked = new Semaphore(0);
+    private long lastClearOffset = 0l;
+    private OffsetQueue<DistributionQueueItem> offsetQueue;
+    private PubQueue queue;
 
-    private static final String PACKAGE_ID_1 = "package-1";
-
-    private static final String PACKAGE_ID_2 = "package-2";
-
-    private static final String PACKAGE_ID_3 = "package-3";
-
-    private static final ClearCallback NO_OP = (offset) -> {};
-
-    private static final OffsetQueue<DistributionQueueItem> EMPTY_QUEUE = new OffsetQueueImpl<>();
-
-    private static final OffsetQueue<DistributionQueueItem> THREE_ENTRY_QUEUE = new OffsetQueueImpl<>();
-
-    static {
-
-        THREE_ENTRY_QUEUE.putItem(100, new DistributionQueueItem(PACKAGE_ID_1, new HashMap<String, Object>(){{
-            put(RECORD_TOPIC, TOPIC);
-            put(RECORD_PARTITION, PARTITION);
-            put(RECORD_OFFSET, 100);
-            put(RECORD_TIMESTAMP, 1541538150582L);
-        }}));
-
-        THREE_ENTRY_QUEUE.putItem(200, new DistributionQueueItem(PACKAGE_ID_2, new HashMap<String, Object>(){{
-            put(RECORD_TOPIC, TOPIC);
-            put(RECORD_PARTITION, PARTITION);
-            put(RECORD_OFFSET, 200);
-            put(RECORD_TIMESTAMP, 1541538150584L);
-        }}));
-
-        THREE_ENTRY_QUEUE.putItem(300, new DistributionQueueItem(PACKAGE_ID_3, new HashMap<String, Object>(){{
-            put(RECORD_TOPIC, TOPIC);
-            put(RECORD_PARTITION, PARTITION);
-            put(RECORD_OFFSET, 300);
-            put(RECORD_TIMESTAMP, 1541538150586L);
-        }}));
+    @Before
+    public void before () {
+        offsetQueue = new OffsetQueueImpl<>();
+        queue = pubQueue(offsetQueue);
+        addEntries();
     }
 
     @Test
     public void testGetName() throws Exception {
-        assertEquals(QUEUE_NAME, new PubQueue(QUEUE_NAME, new OffsetQueueImpl<>(), 0, NO_OP).getName());
+        assertEquals(QUEUE_NAME, queue.getName());
     }
 
     @Test(expected = UnsupportedOperationException.class)
     public void testAdd() throws Exception {
-        DistributionQueueItem queueItem = new DistributionQueueItem(PACKAGE_ID_1, emptyMap());
-        new PubQueue(QUEUE_NAME, new OffsetQueueImpl<>(), 0, NO_OP).add(queueItem);
+        queue.add(queueItem(1));
+    }
+    
+    @Test
+    public void testGetHeadEmpty() throws Exception {
+        assertNull(queue.getHead());
     }
 
     @Test
     public void testGetHead() throws Exception {
-        assertNull(pubQueue(EMPTY_QUEUE).getHead());
-        DistributionQueueEntry headEntry = pubQueue(THREE_ENTRY_QUEUE).getHead();
+        addEntries();
+        
+        DistributionQueueEntry headEntry = queue.getHead();
+
         assertNotNull(headEntry);
-        assertEquals(PACKAGE_ID_1, headEntry.getItem().getPackageId());
+        assertEquals(packageId(1), headEntry.getItem().getPackageId());
     }
 
     @Test
     public void testGetItems() throws Exception {
-        Iterator<DistributionQueueEntry> entries = pubQueue(THREE_ENTRY_QUEUE).getEntries(1, 2).iterator();
+        addEntries();
+        
+        Iterator<DistributionQueueEntry> entries = queue.getEntries(1, 2).iterator();
+        
         assertNotNull(entries);
         DistributionQueueEntry entry1 = entries.next();
         assertNotNull(entry1);
-        assertEquals(PACKAGE_ID_2, entry1.getItem().getPackageId());
+        assertEquals(packageId(2), entry1.getItem().getPackageId());
         DistributionQueueEntry entry2 = entries.next();
-        assertEquals(PACKAGE_ID_3, entry2.getItem().getPackageId());
+        assertEquals(packageId(3), entry2.getItem().getPackageId());
     }
 
     @Test
     public void testGetItem() throws Exception {
+        addEntries();
+        
         String entryId = TOPIC + "-" + PARTITION + "@" + 200;
-        DistributionQueueEntry queueEntry = pubQueue(THREE_ENTRY_QUEUE).getEntry(entryId);
+        DistributionQueueEntry queueEntry = queue.getEntry(entryId);
+        
         assertNotNull(queueEntry);
-        assertEquals(PACKAGE_ID_2, queueEntry.getItem().getPackageId());
+        assertEquals(packageId(2), queueEntry.getItem().getPackageId());
     }
 
     @Test
     public void testRemoveHead() throws Exception {
-        final Semaphore invoked = new Semaphore(1);
-        DistributionQueue queue = pubQueue(THREE_ENTRY_QUEUE, (offset) -> invoked.release());
-        String headEntryId = EntryUtil.entryId(THREE_ENTRY_QUEUE.getHeadItem());
+        addEntries();
+        
+        String headEntryId = EntryUtil.entryId(offsetQueue.getHeadItem());
         DistributionQueueEntry removed = queue.remove(headEntryId);
-        assertTrue(invoked.tryAcquire(5, TimeUnit.MILLISECONDS));
+        
+        assertClearCallbackInvoked();
         assertEquals(headEntryId, removed.getId());
     }
 
     @Test(expected = UnsupportedOperationException.class)
     public void testRemoveRandomItemFails() throws Exception {
-        DistributionQueue queue = pubQueue(THREE_ENTRY_QUEUE);
-        String randomEntryId = EntryUtil.entryId(THREE_ENTRY_QUEUE.getItem(200));
+        addEntries();
+        
+        String randomEntryId = EntryUtil.entryId(offsetQueue.getItem(200));
         queue.remove(randomEntryId);
     }
 
     @Test
     public void testRemoveSetOfRandomItemsWillClear() throws Exception {
-        final Semaphore invoked = new Semaphore(2);
-        DistributionQueue queue = pubQueue(THREE_ENTRY_QUEUE, (offset) -> invoked.release());
-        String headEntryId = EntryUtil.entryId(THREE_ENTRY_QUEUE.getHeadItem());
-        String randomEntryId = EntryUtil.entryId(THREE_ENTRY_QUEUE.getItem(200));
+        addEntries();
+        String headEntryId = EntryUtil.entryId(offsetQueue.getHeadItem());
+        String randomEntryId = EntryUtil.entryId(offsetQueue.getItem(offset(2)));
 
         Iterator<DistributionQueueEntry> removed = queue.remove(Collections.singleton(randomEntryId)).iterator();
-        assertTrue(invoked.tryAcquire(5, TimeUnit.MILLISECONDS));
-        assertTrue(invoked.tryAcquire(5, TimeUnit.MILLISECONDS));
+        
+        assertClearCallbackInvoked();
         assertEquals(headEntryId, removed.next().getId());
         assertEquals(randomEntryId, removed.next().getId());
         assertFalse(removed.hasNext());
@@ -153,47 +149,79 @@
 
     @Test
     public void testRemoveSetOfNonExistingItem() throws Exception {
-        DistributionQueue queue = pubQueue(THREE_ENTRY_QUEUE);
-
+        addEntries();
+        
         Iterable<DistributionQueueEntry> removed = queue.remove(Collections.singleton("nonexisting-0@99999"));
+        
         assertFalse(removed.iterator().hasNext());
     }
 
     @Test
     public void testClearAll() throws Exception {
-        final Semaphore invoked = new Semaphore(3);
-        DistributionQueue queue = pubQueue(THREE_ENTRY_QUEUE, (offset) -> invoked.release());
+        addEntries();
 
         Iterable<DistributionQueueEntry> removed = queue.clear(-1);
-        assertTrue(invoked.tryAcquire(5, TimeUnit.MILLISECONDS));
-        assertTrue(invoked.tryAcquire(5, TimeUnit.MILLISECONDS));
-        assertTrue(invoked.tryAcquire(5, TimeUnit.MILLISECONDS));
-        assertEquals(3, StreamSupport.stream(removed.spliterator(), false).count());
+        
+        assertClearCallbackInvoked();
+        assertEquals(3, streamOf(removed).count());
+        assertEquals(offset(3), lastClearOffset);
     }
 
     @Test
     public void testClearPartial() throws Exception {
-        final Semaphore invoked = new Semaphore(2);
-        DistributionQueue queue = pubQueue(THREE_ENTRY_QUEUE, (offset) -> invoked.release());
-
+        addEntries();
+        
         Iterable<DistributionQueueEntry> removed = queue.clear(2);
-        assertTrue(invoked.tryAcquire(5, TimeUnit.MILLISECONDS));
-        assertTrue(invoked.tryAcquire(5, TimeUnit.MILLISECONDS));
-        assertEquals(2, StreamSupport.stream(removed.spliterator(), false).count());
+        
+        assertClearCallbackInvoked();
+        assertEquals(2, streamOf(removed).count());
+        assertEquals(offset(2), lastClearOffset);
     }
 
     @Test
     public void testGetType() throws Exception {
-        assertEquals(DistributionQueueType.ORDERED, new PubQueue(QUEUE_NAME, new OffsetQueueImpl<>(), 0, NO_OP).getType());
+        assertEquals(DistributionQueueType.ORDERED, queue.getType());
+    }
+
+    private void assertClearCallbackInvoked() throws InterruptedException {
+        assertTrue(invoked.tryAcquire(5, TimeUnit.MILLISECONDS));
+    }
+
+    private void addEntries() {
+        offsetQueue.putItem(offset(1), queueItem(1));
+        offsetQueue.putItem(offset(2), queueItem(2));
+        offsetQueue.putItem(offset(3), queueItem(3));
+    }
+
+    private DistributionQueueItem queueItem(int nr) {
+        HashMap<String, Object> data = new HashMap<String, Object>(){{
+            put(RECORD_TOPIC, TOPIC);
+            put(RECORD_PARTITION, PARTITION);
+            put(RECORD_OFFSET, offset(nr));
+            put(RECORD_TIMESTAMP, 1541538150580L + nr * 2);
+        }};
+        return new DistributionQueueItem(packageId(nr), data);
+    }
+
+    private long offset(int nr) {
+        return nr * 100;
+    };
+    
+    private static String packageId(int nr) {
+        return PACKAGE_ID_PREFIX + new Integer(nr).toString();
+    }
+
+    private Stream<DistributionQueueEntry> streamOf(Iterable<DistributionQueueEntry> entries) {
+        return StreamSupport.stream(entries.spliterator(), false);
     }
 
     private PubQueue pubQueue(OffsetQueue<DistributionQueueItem> offsetQueue) {
-        return pubQueue(offsetQueue, NO_OP);
+        return new PubQueue(QUEUE_NAME, offsetQueue, 0, this::clearCallback);
     }
 
-    private PubQueue pubQueue(OffsetQueue<DistributionQueueItem> offsetQueue, ClearCallback clearCallback) {
-        return new PubQueue(QUEUE_NAME, offsetQueue, 0, clearCallback);
+    private void clearCallback(long offset) {
+        log.info("Clearcallback with offset {}", offset);
+        lastClearOffset = offset; 
+        invoked.release();
     }
-
-
 }
\ No newline at end of file