SLING-9482 - Seed the cache from offset persisted in the source repository
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java
index 1d20a55..496ff81 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java
@@ -26,11 +26,9 @@
 import java.io.Closeable;
 import java.util.Dictionary;
 import java.util.Hashtable;
-import java.util.Optional;
 
 import javax.annotation.ParametersAreNonnullByDefault;
 
-import org.apache.sling.distribution.journal.impl.queue.impl.PubQueueCacheService;
 import org.apache.sling.distribution.journal.impl.shared.PublisherConfigurationAvailable;
 import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.messages.Messages;
@@ -83,9 +81,6 @@
 
     @Reference
     private TopologyChangeHandler topologyChangeHandler;
-    
-    @Reference
-    private PubQueueCacheService pubQueueCacheService;
 
     private volatile ServiceRegistration<?> reg;
 
@@ -99,12 +94,10 @@
     public DiscoveryService(
             MessagingProvider messagingProvider,
             TopologyChangeHandler topologyChangeHandler,
-            Topics topics,
-            PubQueueCacheService pubQueueCacheService) {
+            Topics topics) {
         this.messagingProvider = messagingProvider;
         this.topologyChangeHandler = topologyChangeHandler;
         this.topics = topics;
-        this.pubQueueCacheService = pubQueueCacheService;
     }
 
     @Activate
@@ -135,7 +128,6 @@
         TopologyView oldView = viewManager.updateView();
         TopologyView newView = viewManager.getCurrentView();
         handleChanges(newView, oldView);
-        seedCache(newView);
     }
 
     private void handleChanges(TopologyView newView, TopologyView oldView) {
@@ -151,23 +143,6 @@
         }
     }
 
-    private void seedCache(TopologyView newView) {
-        /*
-         * Seeding the cache requires an offset
-         * corresponding to a message that has
-         * already been produced.
-         *
-         * We don't consider states with an
-         * offset smaller than 0 as those offsets
-         * do not correspond to already processed
-         * messages.
-         */
-        Optional<Long> minProcOffset = newView.offsets()
-                .filter(offset -> offset != -1)
-                .reduce(Long::min);
-        minProcOffset.ifPresent(aLong -> pubQueueCacheService.seed(aLong));
-    }
-
     private void startTopologyViewUpdaterTask(BundleContext context) {
         // Register periodic task to update the topology view
         Dictionary<String, Object> props = new Hashtable<>();
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
index 001f745..2b04a4a 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
@@ -37,10 +37,12 @@
 import javax.annotation.Nonnull;
 import javax.annotation.ParametersAreNonnullByDefault;
 
+import org.apache.sling.api.resource.PersistenceException;
 import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
 import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
 import org.apache.sling.distribution.journal.impl.shared.JMXRegistration;
 import org.apache.commons.io.IOUtils;
+import org.apache.sling.distribution.journal.impl.subscriber.LocalStore;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.osgi.service.event.Event;
 import org.osgi.service.event.EventAdmin;
@@ -92,6 +94,20 @@
      */
     private final AtomicLong minOffset = new AtomicLong(Long.MAX_VALUE);
 
+    /**
+     * Holds the max offset of package handled by the cache.
+     * Given that the cache does not store all package messages
+     * (i.e. TEST packages are not cached), the max offset does not
+     * necessarily match the offset of the newest message cached.
+     */
+    private final AtomicLong maxOffset = new AtomicLong(-1L);
+
+    /**
+     * Holds the last known seed offset stored to the
+     * seed store.
+     */
+    private volatile long seedOffset = 0L;
+
     private final Set<JMXRegistration> jmxRegs = new HashSet<>();
 
     private final MessagingProvider messagingProvider;
@@ -102,27 +118,30 @@
 
     private final String topic;
 
+    private final LocalStore seedStore;
+
     private final DistributionMetricsService distributionMetricsService;
     
     private volatile boolean closed;
 
-    public PubQueueCache(MessagingProvider messagingProvider, EventAdmin eventAdmin, DistributionMetricsService distributionMetricsService, String topic) {
+    public PubQueueCache(MessagingProvider messagingProvider, EventAdmin eventAdmin, DistributionMetricsService distributionMetricsService, String topic, LocalStore seedStore) {
         this.messagingProvider = messagingProvider;
         this.eventAdmin = eventAdmin;
         this.distributionMetricsService = distributionMetricsService;
         this.topic = topic;
+        this.seedStore = seedStore;
+        seedOffset = seedStore.load("offset", 0);
+        seed(seedOffset);
     }
 
-    public void seed(long offset) {
-        if (tailPoller == null) {
-            LOG.info("Seed with offset: {}", offset);
-            String assignTo = messagingProvider.assignTo(offset);
-            tailPoller = messagingProvider.createPoller(
+    private void seed(long offset) {
+        LOG.info("Seed with offset: {}", offset);
+        String assignTo = messagingProvider.assignTo(offset);
+        tailPoller = messagingProvider.createPoller(
                 this.topic,
-                Reset.latest,
+                Reset.earliest,
                 assignTo,
                 create(PackageMessage.class, this::handlePackage));
-        }
     }
 
     @Nonnull
@@ -138,6 +157,19 @@
         return agentQueues.getOrDefault(pubAgentName, new OffsetQueueImpl<>());
     }
 
+    public void storeSeed() {
+        long newSeed = maxOffset.longValue();
+        if (newSeed > seedOffset) {
+            LOG.info("Store seed offset {}", newSeed);
+            try {
+                seedStore.store("offset", newSeed);
+                seedOffset = newSeed;
+            } catch (PersistenceException e) {
+                LOG.warn("Failed to persist seed offset", e);
+            }
+        }
+    }
+
     public int size() {
         return agentQueues.values().stream().mapToInt(OffsetQueue::getSize).sum();
     }
@@ -223,11 +255,14 @@
 
     private void updateMinOffset(long offset) {
         // atomically compare and set minOffset
-        // as the min between the provided offset
-        // and the current minOffset
         minOffset.accumulateAndGet(offset, Math::min);
     }
 
+    private void updateMaxOffset(long offset) {
+        // atomically compare and set maxOffset
+        maxOffset.accumulateAndGet(offset, Math::max);
+    }
+
     private void merge(List<FullMessage<PackageMessage>> messages) {
         LOG.debug("Merging fetched offsets");
         messages.stream()
@@ -275,5 +310,6 @@
 
     private void handlePackage(final MessageInfo info, final PackageMessage message) {
         merge(singletonList(new FullMessage<>(info, message)));
+        updateMaxOffset(info.getOffset());
     }
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
index 5f794e4..304bfb4 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
@@ -21,6 +21,7 @@
 import javax.annotation.Nonnull;
 import javax.annotation.ParametersAreNonnullByDefault;
 
+import org.apache.sling.api.resource.ResourceResolverFactory;
 import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
 import org.apache.sling.distribution.journal.impl.shared.PublisherConfigurationAvailable;
 import org.apache.sling.distribution.journal.impl.shared.Topics;
@@ -28,7 +29,9 @@
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.JournalAvailable;
 
+import org.apache.sling.distribution.journal.impl.subscriber.LocalStore;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
+import org.apache.sling.settings.SlingSettingsService;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Deactivate;
@@ -37,17 +40,9 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_CONCURRENT;
-import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_PERIOD;
-
-@Component(
-        service = {PubQueueCacheService.class, Runnable.class},
-        property = {
-                PROPERTY_SCHEDULER_CONCURRENT + ":Boolean=false",
-                PROPERTY_SCHEDULER_PERIOD + ":Long=" + 12 * 60 * 60 // 12 hours
-        })
+@Component(immediate = true, service = PubQueueCacheService.class)
 @ParametersAreNonnullByDefault
-public class PubQueueCacheService implements Runnable {
+public class PubQueueCacheService {
 
     private static final Logger LOG = LoggerFactory.getLogger(PubQueueCacheService.class);
 
@@ -81,28 +76,44 @@
     @Reference
     private DistributionMetricsService distributionMetricsService;
 
+    @Reference
+    private SlingSettingsService slingSettings;
+
+    @Reference
+    private ResourceResolverFactory resolverFactory;
+
     private volatile PubQueueCache cache;
 
+    private String pubSlingId;
+
     public PubQueueCacheService() {}
 
     public PubQueueCacheService(MessagingProvider messagingProvider,
                                 Topics topics,
-                                EventAdmin eventAdmin) {
+                                EventAdmin eventAdmin,
+                                SlingSettingsService slingSettingsService,
+                                ResourceResolverFactory resolverFactory,
+                                String pubSlingId) {
         this.messagingProvider = messagingProvider;
         this.topics = topics;
         this.eventAdmin = eventAdmin;
+        this.slingSettings = slingSettingsService;
+        this.resolverFactory = resolverFactory;
+        this.pubSlingId = pubSlingId;
     }
 
     @Activate
     public void activate() {
+        pubSlingId = slingSettings.getSlingId();
         cache = newCache();
         LOG.info("Started Publisher queue cache service");
     }
 
     @Deactivate
     public void deactivate() {
-        if (cache != null) {
-            cache.close();
+        PubQueueCache queueCache = this.cache;
+        if (queueCache != null) {
+            queueCache.close();
         }
         LOG.info("Stopped Publisher queue cache service");
     }
@@ -117,18 +128,17 @@
         }
     }
 
-    public void seed(long offset) {
-        if (cache != null) {
-            cache.seed(offset);
-        }
-    }
-
-    private void cleanup() {
-        if (cache != null) {
-            int size = cache.size();
+    /**
+     * The cleanup renew the cache when
+     * a capacity threshold has been reached.
+     */
+    public void cleanup() {
+        PubQueueCache queueCache = this.cache;
+        if (queueCache != null) {
+            int size = queueCache.size();
             if (size > CLEANUP_THRESHOLD) {
                 LOG.info("Cleanup package cache (size={}/{})", size, CLEANUP_THRESHOLD);
-                cache.close();
+                queueCache.close();
                 cache = newCache();
             } else {
                 LOG.info("No cleanup required for package cache (size={}/{})", size, CLEANUP_THRESHOLD);
@@ -136,14 +146,15 @@
         }
     }
 
-    private PubQueueCache newCache() {
-        return new PubQueueCache(messagingProvider, eventAdmin, distributionMetricsService, topics.getPackageTopic());
+    public void storeSeed() {
+        PubQueueCache queueCache = this.cache;
+        if (queueCache != null) {
+            queueCache.storeSeed();
+        }
     }
 
-    @Override
-    public void run() {
-        LOG.info("Starting package cache cleanup task");
-        cleanup();
-        LOG.info("Stopping package cache cleanup task");
+    private PubQueueCache newCache() {
+        LocalStore seedStore = new LocalStore(resolverFactory, "seeds", pubSlingId);
+        return new PubQueueCache(messagingProvider, eventAdmin, distributionMetricsService, topics.getPackageTopic(), seedStore);
     }
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheCleanupTask.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheCleanupTask.java
new file mode 100644
index 0000000..b2bd673
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheCleanupTask.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.queue.impl;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Reference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_CONCURRENT;
+import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_PERIOD;
+
+/**
+ * Periodical task to cleanup the resources
+ * used by the cache.
+ */
+@Component(
+        service = Runnable.class,
+        property = {
+                PROPERTY_SCHEDULER_CONCURRENT + ":Boolean=false",
+                PROPERTY_SCHEDULER_PERIOD + ":Long=" + 12 * 60 * 60 // 12 hours
+        })
+@ParametersAreNonnullByDefault
+public class QueueCacheCleanupTask implements Runnable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(QueueCacheCleanupTask.class);
+
+    @Reference
+    private PubQueueCacheService queueCacheService;
+
+    @Override
+    public void run() {
+        LOG.info("Starting package cache cleanup task");
+        queueCacheService.cleanup();
+        LOG.info("Stopping package cache cleanup task");
+    }
+}
+
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTask.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTask.java
new file mode 100644
index 0000000..b18ee55
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTask.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.queue.impl;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Reference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_CONCURRENT;
+import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_PERIOD;
+import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_RUN_ON;
+import static org.apache.sling.commons.scheduler.Scheduler.VALUE_RUN_ON_LEADER;
+
+/**
+ * Periodical task to persist a cache seed
+ * to the repository. The task must run only
+ * on the leader instance to avoid concurrent
+ * writes and reduce write operations in
+ * clustered deployments.
+ */
+@Component(
+        service = Runnable.class,
+        property = {
+                PROPERTY_SCHEDULER_CONCURRENT + ":Boolean=false",
+                PROPERTY_SCHEDULER_RUN_ON + "=" +  VALUE_RUN_ON_LEADER,
+                PROPERTY_SCHEDULER_PERIOD + ":Long=" + 15 * 60 // 15 minutes
+        })
+@ParametersAreNonnullByDefault
+public class QueueCacheSeederTask implements Runnable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(QueueCacheSeederTask.class);
+
+    @Reference
+    private PubQueueCacheService queueCacheService;
+
+    @Override
+    public void run() {
+        LOG.debug("Starting package cache seeder task");
+        queueCacheService.storeSeed();
+        LOG.debug("Stopping package cache seeder task");
+    }
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStore.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStore.java
index c619d79..b59fbde 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStore.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStore.java
@@ -64,6 +64,16 @@
         createParent();
     }
 
+    public synchronized void store(String key, Object value)
+            throws PersistenceException {
+        try (ResourceResolver resolver = requireNonNull(getBookKeeperServiceResolver())) {
+            store(resolver, key, value);
+            resolver.commit();
+        } catch (LoginException e) {
+            throw new RuntimeException("Failed to load data from the repository." + e.getMessage(), e);
+        }
+    }
+
     public synchronized void store(ResourceResolver serviceResolver, String key, Object value)
             throws PersistenceException {
         store(serviceResolver, Collections.singletonMap(key, value));
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java
index bd8637b..d39f280 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java
@@ -34,7 +34,6 @@
 import org.apache.sling.distribution.journal.MessageInfo;
 import org.apache.sling.distribution.journal.MessagingProvider;
 import org.apache.sling.distribution.journal.Reset;
-import org.apache.sling.distribution.journal.impl.queue.impl.PubQueueCacheService;
 import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
 import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
@@ -76,9 +75,6 @@
 
     @Mock
     TopologyChangeHandler topologyChangeHandler;
-    
-    @Mock
-    private PubQueueCacheService pubQueueCacheService;
 
     private MessageHandler<DiscoveryMessage> discoveryHandler;
     
@@ -89,7 +85,7 @@
     public void before() {
         discoveryService = new DiscoveryService(
                 clientProvider, topologyChangeHandler, 
-                topics, pubQueueCacheService);
+                topics);
         when(clientProvider.createPoller(
                 Mockito.anyString(), 
                 Mockito.any(Reset.class),
@@ -110,19 +106,6 @@
         discoveryService.run();
         assertThat(discoveryService.getTopologyView().getState(subAgentId, PUB1_AGENT_NAME).getOffset(), equalTo(10L));
     }
-
-    @Test
-    public void testPubQueueCacheSeed() throws IOException {
-        DiscoveryMessage message = discoveryMessage(
-                SUB1_SLING_ID, 
-                SUB1_AGENT, 
-                subscriberState(PUB1_AGENT_NAME, 20),
-                subscriberState(PUB1_AGENT_NAME, 10)
-                );
-        discoveryHandler.handle(messageInfo(0), message);
-        discoveryService.run();
-        verify(pubQueueCacheService).seed(Mockito.eq(10l));
-    }
     
     @After
     public void after() throws IOException {
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
index 4d730c3..828ed35 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
@@ -45,6 +45,7 @@
 import java.util.concurrent.TimeoutException;
 import java.util.stream.LongStream;
 
+import org.apache.sling.api.resource.ResourceResolverFactory;
 import org.apache.sling.commons.metrics.Counter;
 import org.apache.sling.distribution.journal.HandlerAdapter;
 import org.apache.sling.distribution.journal.MessageHandler;
@@ -53,9 +54,11 @@
 import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
 import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
 import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
+import org.apache.sling.distribution.journal.impl.subscriber.LocalStore;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
 import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
 import org.apache.sling.distribution.queue.DistributionQueueItem;
+import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory;
 import org.awaitility.Awaitility;
 import org.awaitility.Duration;
 import org.junit.After;
@@ -77,6 +80,8 @@
 
     private static final String TOPIC = "package_topic";
 
+    private static final String PUB_SLING_ID = "79fd948e-9435-4128-b42f-32327ba21df3";
+
     private static final String PUB_AGENT_NAME_1 = "pubAgentName1";
 
     private static final String PUB_AGENT_NAME_2 = "pubAgentName2";
@@ -109,6 +114,8 @@
     @Mock
     private Closeable poller;
 
+    private ResourceResolverFactory resolverFactory = new MockResourceResolverFactory();
+
     private PubQueueCache cache;
 
     private ExecutorService executor;
@@ -129,8 +136,9 @@
         when(distributionMetricsService.getQueueCacheFetchCount())
                 .thenReturn(counter);
 
-        cache = new PubQueueCache(clientProvider, eventAdmin, distributionMetricsService, TOPIC);
-        cache.seed(0);
+        LocalStore seedStore = new LocalStore(resolverFactory, "seeds", PUB_SLING_ID);
+        cache = new PubQueueCache(clientProvider, eventAdmin, distributionMetricsService, TOPIC, seedStore);
+        cache.storeSeed();
 
         executor = Executors.newFixedThreadPool(10);
         tailHandler = handlerCaptor.getValue().getHandler();
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
index 3a1a27a..b93003a 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
@@ -38,6 +38,7 @@
 import javax.management.ObjectName;
 import javax.management.ReflectionException;
 
+import org.apache.sling.api.resource.ResourceResolverFactory;
 import org.apache.sling.distribution.journal.impl.shared.Topics;
 import org.apache.sling.distribution.journal.MessageSender;
 import com.google.protobuf.GeneratedMessage;
@@ -45,6 +46,7 @@
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.apache.sling.distribution.queue.spi.DistributionQueue;
 import org.apache.sling.settings.SlingSettingsService;
+import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -100,6 +102,8 @@
     @Mock
     private MessageSender<GeneratedMessage> sender;
 
+    private ResourceResolverFactory resolverFactory = new MockResourceResolverFactory();
+
     private PubQueueCacheService pubQueueCacheService;
 
     private MessageHandler<Messages.PackageMessage> handler;
@@ -125,12 +129,13 @@
         when(clientProvider.createSender())
         .thenReturn(sender);
         Topics topics = new Topics();
-        when(slingSettings.getSlingId()).thenReturn(UUID.randomUUID().toString());
-        pubQueueCacheService = new PubQueueCacheService(clientProvider, topics, eventAdmin);
+        String slingId = UUID.randomUUID().toString();
+        when(slingSettings.getSlingId()).thenReturn(slingId);
+        pubQueueCacheService = new PubQueueCacheService(clientProvider, topics, eventAdmin, slingSettings, resolverFactory, slingId);
         pubQueueCacheService.activate();
         queueProvider = new PubQueueProviderImpl(pubQueueCacheService, clientProvider, topics);
         queueProvider.activate();
-        pubQueueCacheService.seed(0);
+        pubQueueCacheService.storeSeed();
         handler = handlerCaptor.getValue().getHandler();
         statHandler = statHandlerCaptor.getValue().getHandler();
     }