Merge pull request #38 from Buuhuu/SLING-9495

SLING-9495: send the request type as distribution.type
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
index b2f0135..ec811d2 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
@@ -106,7 +106,7 @@
      * Holds the last known seed offset stored to the
      * seed store.
      */
-    private volatile long seedOffset = 0L;
+    private volatile long seedOffset = -1L;
 
     private final Set<JMXRegistration> jmxRegs = new HashSet<>();
 
@@ -116,36 +116,42 @@
 
     private volatile Closeable tailPoller;
 
+    private final QueueCacheSeeder seeder;
+
     private final String topic;
 
     private final LocalStore seedStore;
 
     private final DistributionMetricsService distributionMetricsService;
     
-    private volatile boolean closed;
-
-    public PubQueueCache(MessagingProvider messagingProvider, EventAdmin eventAdmin, DistributionMetricsService distributionMetricsService, String topic, LocalStore seedStore) {
+    public PubQueueCache(MessagingProvider messagingProvider, EventAdmin eventAdmin, DistributionMetricsService distributionMetricsService, String topic, LocalStore seedStore, QueueCacheSeeder seeder) {
         this.messagingProvider = messagingProvider;
         this.eventAdmin = eventAdmin;
         this.distributionMetricsService = distributionMetricsService;
         this.topic = topic;
         this.seedStore = seedStore;
-        Long offset = seedStore.load().get("offset", Long.class);
+        this.seeder = seeder;
+        Long offset = seedStore.load("offset", Long.class);
         if (offset != null) {
             seedOffset = offset;
+            startPoller(seedOffset);
+            /*
+             * We need at least one seeding message
+             * for cases where the seedOffset is no
+             * longer on the journal.
+             */
+            seeder.seedOne();
         } else {
             /*
              * Fallback to seeding messages when
              * no offset could be found in the
              * repository.
              */
-            seedOffset = messagingProvider.retrieveOffset(topic, Reset.latest);
-            storeSeed(seedOffset);
+            seeder.seed(this::startPoller);
         }
-        seed(seedOffset);
     }
 
-    private void seed(long offset) {
+    private void startPoller(long offset) {
         LOG.info("Seed with offset: {}", offset);
         String assignTo = messagingProvider.assignTo(offset);
         tailPoller = messagingProvider.createPoller(
@@ -157,14 +163,10 @@
 
     @Nonnull
     public OffsetQueue<DistributionQueueItem> getOffsetQueue(String pubAgentName, long minOffset) throws InterruptedException {
-        if (minOffset > 0) {
-            /*
-             * we fetch data only when requested
-             * at least one processed offset
-             */
-            waitSeeded();
-            fetchIfNeeded(minOffset);
+        if (!isSeeded()) {
+            throw new RuntimeException("Gave up waiting for seeded cache");
         }
+        fetchIfNeeded(minOffset);
         return agentQueues.getOrDefault(pubAgentName, new OffsetQueueImpl<>());
     }
 
@@ -190,8 +192,8 @@
     }
 
     public void close() {
-        closed = true;
         IOUtils.closeQuietly(tailPoller);
+        IOUtils.closeQuietly(seeder);
         jmxRegs.forEach(IOUtils::closeQuietly);
     }
 
@@ -253,15 +255,8 @@
         updateMinOffset(requestedMinOffset);
     }
 
-    private void waitSeeded() throws InterruptedException {
-        long start = System.currentTimeMillis();
-        while (getMinOffset() == Long.MAX_VALUE) {
-            LOG.debug("Waiting for seeded cache");
-            if (closed || System.currentTimeMillis() - start > MAX_FETCH_WAIT_MS) {
-                throw new RuntimeException("Gave up waiting for seeded cache");
-            }
-            Thread.sleep(1000);
-        }
+    private boolean isSeeded() {
+        return getMinOffset() != Long.MAX_VALUE;
     }
 
     protected long getMinOffset() {
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
index 304bfb4..2eddc6f 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
@@ -155,6 +155,8 @@
 
     private PubQueueCache newCache() {
         LocalStore seedStore = new LocalStore(resolverFactory, "seeds", pubSlingId);
-        return new PubQueueCache(messagingProvider, eventAdmin, distributionMetricsService, topics.getPackageTopic(), seedStore);
+        String topic = topics.getPackageTopic();
+        QueueCacheSeeder seeder = new QueueCacheSeeder(messagingProvider, topic);
+        return new PubQueueCache(messagingProvider, eventAdmin, distributionMetricsService, topic, seedStore, seeder);
     }
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeeder.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeeder.java
new file mode 100644
index 0000000..79f6063
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeeder.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.queue.impl;
+
+import java.io.Closeable;
+import java.util.UUID;
+import java.util.function.LongConsumer;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.sling.distribution.journal.MessageSender;
+import org.apache.sling.distribution.journal.MessagingException;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.sling.distribution.journal.HandlerAdapter.create;
+import static org.apache.sling.distribution.journal.RunnableUtil.startBackgroundThread;
+
+public class QueueCacheSeeder implements Closeable {
+
+
+    private static final Logger LOG = LoggerFactory.getLogger(QueueCacheSeeder.class);
+
+    /**
+     * Interval in millisecond between two seeding messages to seed the cache.
+     */
+    private static final long CACHE_SEEDING_DELAY_MS = 10_000;
+
+    private final String topic;
+
+    private final MessagingProvider provider;
+
+    private volatile Closeable poller;
+
+    private volatile boolean closed;
+
+    public QueueCacheSeeder(MessagingProvider provider, String topic) {
+        this.provider = provider;
+        this.topic = topic;
+    }
+
+    public void seedOne() {
+        startBackgroundThread(this::sendSeedingMessage, "Seeder thread - one seed");
+    }
+
+    public void seed(LongConsumer callback) {
+        poller = provider.createPoller(topic, Reset.latest,
+                create(PackageMessage.class, (info, msg) -> {
+                    close();
+                    callback.accept(info.getOffset());
+                }));
+        startBackgroundThread(this::sendSeedingMessages, "Seeder thread");
+    }
+
+    @Override
+    public void close() {
+        closed = true;
+        IOUtils.closeQuietly(poller);
+    }
+
+    private void sendSeedingMessages() {
+        LOG.info("Start message seeder");
+        try {
+            MessageSender<PackageMessage> sender = provider.createSender();
+            while (!closed) {
+                sendSeedingMessage(sender);
+                delay(CACHE_SEEDING_DELAY_MS);
+            }
+        } finally {
+            LOG.info("Stop message seeder");
+        }
+    }
+
+    private void sendSeedingMessage() {
+        sendSeedingMessage(provider.createSender());
+    }
+
+    private void sendSeedingMessage(MessageSender<PackageMessage> sender) {
+        PackageMessage pkgMsg = createTestMessage();
+        LOG.info("Send seeding message");
+        try {
+            sender.send(topic, pkgMsg);
+        } catch (MessagingException e) {
+            LOG.warn(e.getMessage(), e);
+            delay(CACHE_SEEDING_DELAY_MS * 10);
+        }
+    }
+
+    private static void delay(long sleepMs) {
+        try {
+            Thread.sleep(sleepMs);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    protected PackageMessage createTestMessage() {
+        String pkgId = UUID.randomUUID().toString();
+        return PackageMessage.newBuilder()
+                .setPubSlingId("seeder")
+                .setPkgId(pkgId)
+                .setPkgType("seeder")
+                .setReqType(PackageMessage.ReqType.TEST)
+                .build();
+    }
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStore.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStore.java
index b59fbde..878c831 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStore.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStore.java
@@ -91,6 +91,10 @@
         LOG.debug(String.format("Stored data %s for storeId %s", map.toString(), storeId));
     }
 
+    public <T> T load(String key, Class<T> clazz) {
+        return load().get(key, clazz);
+    }
+
     public <T> T load(String key, T defaultValue) {
         LOG.debug(String.format("Loading key %s for storeId %s with default value %s", key, storeId, defaultValue));
         return load().get(key, defaultValue);
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
index 828ed35..3f0dc5e 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
@@ -24,25 +24,22 @@
 import static org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType.DELETE;
 import static org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType.TEST;
 import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.when;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Random;
 import java.util.UUID;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeoutException;
 import java.util.stream.LongStream;
 
 import org.apache.sling.api.resource.ResourceResolverFactory;
@@ -60,7 +57,6 @@
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory;
 import org.awaitility.Awaitility;
-import org.awaitility.Duration;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -89,9 +85,6 @@
     private static final String PUB_AGENT_NAME_3 = "pubAgentName3";
 
     private static final Random RAND = new Random();
-    
-    @Captor
-    private ArgumentCaptor<PackageMessage> seedingMessageCaptor;
 
     @Captor
     private ArgumentCaptor<HandlerAdapter<PackageMessage>> handlerCaptor;
@@ -106,12 +99,18 @@
     private MessagingProvider clientProvider;
 
     @Mock
+    private QueueCacheSeeder cacheSeeder;
+
+    @Mock
     private DistributionMetricsService distributionMetricsService;
 
     @Mock
     private Counter counter;
 
     @Mock
+    private LocalStore seedStore;
+
+    @Mock
     private Closeable poller;
 
     private ResourceResolverFactory resolverFactory = new MockResourceResolverFactory();
@@ -136,8 +135,9 @@
         when(distributionMetricsService.getQueueCacheFetchCount())
                 .thenReturn(counter);
 
-        LocalStore seedStore = new LocalStore(resolverFactory, "seeds", PUB_SLING_ID);
-        cache = new PubQueueCache(clientProvider, eventAdmin, distributionMetricsService, TOPIC, seedStore);
+        when(seedStore.load(anyString(), any())).thenReturn(0L);
+
+        cache = new PubQueueCache(clientProvider, eventAdmin, distributionMetricsService, TOPIC, seedStore, cacheSeeder);
         cache.storeSeed();
 
         executor = Executors.newFixedThreadPool(10);
@@ -149,36 +149,24 @@
         executor.shutdownNow();
         cache.close();
     }
-
-    @Test
-    public void testSeedingFromNewPackageMessage() throws Exception {
-        Future<OffsetQueue<DistributionQueueItem>> consumer = consumer(PUB_AGENT_NAME_1, 0);
-        // The consumer is blocked until the cache is seeded
-        assertFalse(consumer.isDone());
-        // sending any package message seeds the cache
-        simulateMessage(tailHandler, 0);
-        consumer.get(15, SECONDS);
+    
+    @Test(expected = RuntimeException.class)
+    public void testUnseededThrows() throws Exception {
+        cache.getOffsetQueue(PUB_AGENT_NAME_1, 0);
     }
 
     @Test
-    public void testSeedingConcurrentConsumers() throws Exception {
-        List<Future<OffsetQueue<DistributionQueueItem>>> consumers = new ArrayList<>();
-        // minOffset > 0 to leverage the cache
-        consumers.add(consumer(PUB_AGENT_NAME_1, 1));
-        consumers.add(consumer(PUB_AGENT_NAME_2, 1));
-        consumers.add(consumer(PUB_AGENT_NAME_3, 1));
-        // All consumers are blocked until the cache is seeded
-        consumers.forEach(future -> assertFalse(future.isDone()));
-        simulateMessage(tailHandler, 1);
-        consumers.forEach(future -> assertNotNull(get(future)));
+    public void testSeedingFromNewPackageMessage() throws Exception {
+        simulateMessage(tailHandler, PUB_AGENT_NAME_1, ADD, 0);
+        OffsetQueue<DistributionQueueItem> queue = cache.getOffsetQueue(PUB_AGENT_NAME_1, 0);
+        assertThat(queue.getSize(), greaterThan(0));
     }
 
     @Test
     public void testFetchWithSingleConsumer() throws Exception {
-        // build a consumer form offset 100
+        simulateMessage(tailHandler, 200);
         Future<OffsetQueue<DistributionQueueItem>> consumer = consumer(PUB_AGENT_NAME_1, 100);
         // seeding the cache with a message at offset 200
-        simulateMessage(tailHandler, 200);
         // wait that the consumer has started fetching the offsets from 100 to 200
         awaitHeadHandler();
         // simulate messages for the fetched offsets
@@ -195,11 +183,11 @@
 
 	@Test
     public void testFetchWithConcurrentConsumer() throws Exception {
+	    simulateMessage(tailHandler, 200);
         // build two consumers for same agent queue, from offset 100
         Future<OffsetQueue<DistributionQueueItem>> consumer1 = consumer(PUB_AGENT_NAME_1, 100);
         Future<OffsetQueue<DistributionQueueItem>> consumer2 = consumer(PUB_AGENT_NAME_1, 100);
         // seeding the cache with a message at offset 200
-        simulateMessage(tailHandler, 200);
         // wait that one consumer has started fetching the offsets from 100 to 200
         MessageHandler<PackageMessage> headHandler = awaitHeadHandler();
         // simulate messages for the fetched offsets
@@ -216,26 +204,15 @@
 
     @Test
     public void testCacheSize() throws Exception {
-        Future<OffsetQueue<DistributionQueueItem>> consumer = consumer(PUB_AGENT_NAME_1, 0);
         simulateMessage(tailHandler, PUB_AGENT_NAME_3, ADD, 0);
         simulateMessage(tailHandler, PUB_AGENT_NAME_3, DELETE, 1);
         simulateMessage(tailHandler, PUB_AGENT_NAME_1, ADD, 2);
         simulateMessage(tailHandler, PUB_AGENT_NAME_3, TEST, 3);    // TEST message does not increase the cache size
         simulateMessage(tailHandler, PUB_AGENT_NAME_2, TEST, 4);    // TEST message does not increase the cache size
         simulateMessage(tailHandler, PUB_AGENT_NAME_3, ADD, 5);
-        consumer.get(15, SECONDS);
         assertEquals(4, cache.size());
     }
 
-    @Test(expected = ExecutionException.class)
-    public void testCloseUnseededPoller() throws Throwable {
-        Future<OffsetQueue<DistributionQueueItem>> task = consumer(PUB_AGENT_NAME_1, 1); // minOffset > 0 to leverage the cache
-        Awaitility.setDefaultPollDelay(Duration.ZERO);
-        cache.close();
-        task.get();
-    }
-    
-
     private void simulateMessages(MessageHandler<PackageMessage> handler, long fromOffset, long toOffset) {
         LongStream.rangeClosed(fromOffset, toOffset).forEach(offset -> simulateMessage(handler, offset));
     }
@@ -266,14 +243,6 @@
         return executor.submit(() -> cache.getOffsetQueue(pubAgentName, minOffset));
     }
 
-    private OffsetQueue<DistributionQueueItem> get(Future<OffsetQueue<DistributionQueueItem>> future) {
-        try {
-            return future.get(1, SECONDS);
-        } catch (TimeoutException | InterruptedException | ExecutionException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
     @SafeVarargs
     private final <T> T pickAny(T... c) {
         if (c == null || c.length == 0) {
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
index b93003a..009235c 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
@@ -38,10 +38,13 @@
 import javax.management.ObjectName;
 import javax.management.ReflectionException;
 
+import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.api.resource.ResourceResolverFactory;
 import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.MessageSender;
 import com.google.protobuf.GeneratedMessage;
+
+import org.apache.sling.distribution.journal.impl.subscriber.LocalStore;
 import org.apache.sling.distribution.queue.DistributionQueueEntry;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
@@ -113,7 +116,7 @@
     private MBeanServer mbeanServer;
     
     @Before
-    public void before() {
+    public void before() throws PersistenceException {
         MockitoAnnotations.initMocks(this);
         when(clientProvider.createPoller(
                 Mockito.eq(Topics.PACKAGE_TOPIC),
@@ -131,11 +134,12 @@
         Topics topics = new Topics();
         String slingId = UUID.randomUUID().toString();
         when(slingSettings.getSlingId()).thenReturn(slingId);
+        LocalStore seedStore = new LocalStore(resolverFactory, "seeds", slingId);
+        seedStore.store("offset", 1L);
         pubQueueCacheService = new PubQueueCacheService(clientProvider, topics, eventAdmin, slingSettings, resolverFactory, slingId);
         pubQueueCacheService.activate();
         queueProvider = new PubQueueProviderImpl(pubQueueCacheService, clientProvider, topics);
         queueProvider.activate();
-        pubQueueCacheService.storeSeed();
         handler = handlerCaptor.getValue().getHandler();
         statHandler = statHandlerCaptor.getValue().getHandler();
     }
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTest.java
new file mode 100644
index 0000000..eeebb7a
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTest.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.queue.impl;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.function.LongConsumer;
+
+import org.apache.sling.distribution.journal.HandlerAdapter;
+import org.apache.sling.distribution.journal.MessageSender;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import static java.lang.System.currentTimeMillis;
+import static org.apache.sling.distribution.journal.impl.shared.Topics.PACKAGE_TOPIC;
+import static org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType.TEST;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class QueueCacheSeederTest {
+
+    @Mock
+    private MessagingProvider clientProvider;
+
+    @Captor
+    private ArgumentCaptor<HandlerAdapter<PackageMessage>> pkgHandlerCaptor;
+
+    @Captor
+    private ArgumentCaptor<PackageMessage> pkgMsgCaptor;
+
+    @Mock
+    private Closeable poller;
+
+    @Mock
+    private MessageSender<PackageMessage> sender;
+
+    @Mock
+    private LongConsumer callback;
+
+    private QueueCacheSeeder seeder;
+
+    @Before
+    public void before() {
+        MockitoAnnotations.initMocks(this);
+        when(clientProvider.createPoller(
+                eq(PACKAGE_TOPIC),
+                any(Reset.class),
+                pkgHandlerCaptor.capture()))
+                .thenReturn(poller);
+        doNothing().when(sender).send(eq(PACKAGE_TOPIC), pkgMsgCaptor.capture());
+        when(clientProvider.<PackageMessage>createSender())
+                .thenReturn(sender);
+        seeder = new QueueCacheSeeder(clientProvider, PACKAGE_TOPIC);
+    }
+
+    @Test
+    public void testSeededCallback() throws IOException {
+        seeder.seed(callback);
+        long offset = 15L;
+        simulateSeedingMsg(offset);
+        verify(callback).accept(offset);
+        verify(poller).close();
+    }
+
+    @Test
+    public void testSendingSeeds() {
+        seeder.seed(callback);
+        verify(sender, timeout(5000).atLeastOnce()).send(eq(PACKAGE_TOPIC), pkgMsgCaptor.capture());
+        PackageMessage seedMsg = pkgMsgCaptor.getValue();
+        assertNotNull(seedMsg);
+        assertEquals(TEST, seedMsg.getReqType());
+    }
+
+    @After
+    public void after() {
+        seeder.close();
+    }
+
+    private void simulateSeedingMsg(long offset) {
+        PackageMessage msg = seeder.createTestMessage();
+        pkgHandlerCaptor.getValue().getHandler().handle(
+                new TestMessageInfo(PACKAGE_TOPIC, 0, offset, currentTimeMillis()),
+                msg);
+    }
+}
\ No newline at end of file
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStoreTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStoreTest.java
index 4bc8761..c989b4a 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStoreTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStoreTest.java
@@ -54,9 +54,9 @@
         ResourceResolver resolver = resolverFactory.getServiceResourceResolver(null);
         LocalStore offsetStore = new LocalStore(resolverFactory, "packages", "store3");
         offsetStore.store(resolver, "key1", "value1");
-        assertNull(offsetStore.load("key1", null));
+        assertNull(offsetStore.load("key1", String.class));
         resolver.commit();
-        assertEquals("value1", offsetStore.load("key1", null));
+        assertEquals("value1", offsetStore.load("key1", String.class));
     }
 
     @Test
@@ -72,8 +72,8 @@
         statusStore.store(resolver, map);
         resolver.commit();
 
-        assertEquals("value1", statusStore.load("key1", null));
-        assertEquals(false, statusStore.load("key2", null));
+        assertEquals("value1", statusStore.load("key1", String.class));
+        assertEquals(false, statusStore.load("key2", Boolean.class));
     }
 
     @Test
@@ -92,7 +92,7 @@
         statusStore.store(resolver, "key2", true);
         resolver.commit();
 
-        assertEquals("value1", statusStore.load("key1", null));
-        assertEquals(true, statusStore.load("key2", null));
+        assertEquals("value1", statusStore.load("key1", String.class));
+        assertEquals(true, statusStore.load("key2", Boolean.class));
     }
 }