SLING-9482 - Seed the cache from offset persisted in the source repository
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java
index 1d20a55..496ff81 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryService.java
@@ -26,11 +26,9 @@
import java.io.Closeable;
import java.util.Dictionary;
import java.util.Hashtable;
-import java.util.Optional;
import javax.annotation.ParametersAreNonnullByDefault;
-import org.apache.sling.distribution.journal.impl.queue.impl.PubQueueCacheService;
import org.apache.sling.distribution.journal.impl.shared.PublisherConfigurationAvailable;
import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.messages.Messages;
@@ -83,9 +81,6 @@
@Reference
private TopologyChangeHandler topologyChangeHandler;
-
- @Reference
- private PubQueueCacheService pubQueueCacheService;
private volatile ServiceRegistration<?> reg;
@@ -99,12 +94,10 @@
public DiscoveryService(
MessagingProvider messagingProvider,
TopologyChangeHandler topologyChangeHandler,
- Topics topics,
- PubQueueCacheService pubQueueCacheService) {
+ Topics topics) {
this.messagingProvider = messagingProvider;
this.topologyChangeHandler = topologyChangeHandler;
this.topics = topics;
- this.pubQueueCacheService = pubQueueCacheService;
}
@Activate
@@ -135,7 +128,6 @@
TopologyView oldView = viewManager.updateView();
TopologyView newView = viewManager.getCurrentView();
handleChanges(newView, oldView);
- seedCache(newView);
}
private void handleChanges(TopologyView newView, TopologyView oldView) {
@@ -151,23 +143,6 @@
}
}
- private void seedCache(TopologyView newView) {
- /*
- * Seeding the cache requires an offset
- * corresponding to a message that has
- * already been produced.
- *
- * We don't consider states with an
- * offset smaller than 0 as those offsets
- * do not correspond to already processed
- * messages.
- */
- Optional<Long> minProcOffset = newView.offsets()
- .filter(offset -> offset != -1)
- .reduce(Long::min);
- minProcOffset.ifPresent(aLong -> pubQueueCacheService.seed(aLong));
- }
-
private void startTopologyViewUpdaterTask(BundleContext context) {
// Register periodic task to update the topology view
Dictionary<String, Object> props = new Hashtable<>();
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
index 001f745..2b04a4a 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
@@ -37,10 +37,12 @@
import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
+import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.distribution.journal.impl.event.DistributionEvent;
import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
import org.apache.sling.distribution.journal.impl.shared.JMXRegistration;
import org.apache.commons.io.IOUtils;
+import org.apache.sling.distribution.journal.impl.subscriber.LocalStore;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
@@ -92,6 +94,20 @@
*/
private final AtomicLong minOffset = new AtomicLong(Long.MAX_VALUE);
+ /**
+ * Holds the max offset of package handled by the cache.
+ * Given that the cache does not store all package messages
+ * (i.e. TEST packages are not cached), the max offset does not
+ * necessarily match the offset of the newest message cached.
+ */
+ private final AtomicLong maxOffset = new AtomicLong(-1L);
+
+ /**
+ * Holds the last known seed offset stored to the
+ * seed store.
+ */
+ private volatile long seedOffset = 0L;
+
private final Set<JMXRegistration> jmxRegs = new HashSet<>();
private final MessagingProvider messagingProvider;
@@ -102,27 +118,30 @@
private final String topic;
+ private final LocalStore seedStore;
+
private final DistributionMetricsService distributionMetricsService;
private volatile boolean closed;
- public PubQueueCache(MessagingProvider messagingProvider, EventAdmin eventAdmin, DistributionMetricsService distributionMetricsService, String topic) {
+ public PubQueueCache(MessagingProvider messagingProvider, EventAdmin eventAdmin, DistributionMetricsService distributionMetricsService, String topic, LocalStore seedStore) {
this.messagingProvider = messagingProvider;
this.eventAdmin = eventAdmin;
this.distributionMetricsService = distributionMetricsService;
this.topic = topic;
+ this.seedStore = seedStore;
+ seedOffset = seedStore.load("offset", 0);
+ seed(seedOffset);
}
- public void seed(long offset) {
- if (tailPoller == null) {
- LOG.info("Seed with offset: {}", offset);
- String assignTo = messagingProvider.assignTo(offset);
- tailPoller = messagingProvider.createPoller(
+ private void seed(long offset) {
+ LOG.info("Seed with offset: {}", offset);
+ String assignTo = messagingProvider.assignTo(offset);
+ tailPoller = messagingProvider.createPoller(
this.topic,
- Reset.latest,
+ Reset.earliest,
assignTo,
create(PackageMessage.class, this::handlePackage));
- }
}
@Nonnull
@@ -138,6 +157,19 @@
return agentQueues.getOrDefault(pubAgentName, new OffsetQueueImpl<>());
}
+ public void storeSeed() {
+ long newSeed = maxOffset.longValue();
+ if (newSeed > seedOffset) {
+ LOG.info("Store seed offset {}", newSeed);
+ try {
+ seedStore.store("offset", newSeed);
+ seedOffset = newSeed;
+ } catch (PersistenceException e) {
+ LOG.warn("Failed to persist seed offset", e);
+ }
+ }
+ }
+
public int size() {
return agentQueues.values().stream().mapToInt(OffsetQueue::getSize).sum();
}
@@ -223,11 +255,14 @@
private void updateMinOffset(long offset) {
// atomically compare and set minOffset
- // as the min between the provided offset
- // and the current minOffset
minOffset.accumulateAndGet(offset, Math::min);
}
+ private void updateMaxOffset(long offset) {
+ // atomically compare and set maxOffset
+ maxOffset.accumulateAndGet(offset, Math::max);
+ }
+
private void merge(List<FullMessage<PackageMessage>> messages) {
LOG.debug("Merging fetched offsets");
messages.stream()
@@ -275,5 +310,6 @@
private void handlePackage(final MessageInfo info, final PackageMessage message) {
merge(singletonList(new FullMessage<>(info, message)));
+ updateMaxOffset(info.getOffset());
}
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
index 5f794e4..304bfb4 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
@@ -21,6 +21,7 @@
import javax.annotation.Nonnull;
import javax.annotation.ParametersAreNonnullByDefault;
+import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
import org.apache.sling.distribution.journal.impl.shared.PublisherConfigurationAvailable;
import org.apache.sling.distribution.journal.impl.shared.Topics;
@@ -28,7 +29,9 @@
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.JournalAvailable;
+import org.apache.sling.distribution.journal.impl.subscriber.LocalStore;
import org.apache.sling.distribution.queue.DistributionQueueItem;
+import org.apache.sling.settings.SlingSettingsService;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
@@ -37,17 +40,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_CONCURRENT;
-import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_PERIOD;
-
-@Component(
- service = {PubQueueCacheService.class, Runnable.class},
- property = {
- PROPERTY_SCHEDULER_CONCURRENT + ":Boolean=false",
- PROPERTY_SCHEDULER_PERIOD + ":Long=" + 12 * 60 * 60 // 12 hours
- })
+@Component(immediate = true, service = PubQueueCacheService.class)
@ParametersAreNonnullByDefault
-public class PubQueueCacheService implements Runnable {
+public class PubQueueCacheService {
private static final Logger LOG = LoggerFactory.getLogger(PubQueueCacheService.class);
@@ -81,28 +76,44 @@
@Reference
private DistributionMetricsService distributionMetricsService;
+ @Reference
+ private SlingSettingsService slingSettings;
+
+ @Reference
+ private ResourceResolverFactory resolverFactory;
+
private volatile PubQueueCache cache;
+ private String pubSlingId;
+
public PubQueueCacheService() {}
public PubQueueCacheService(MessagingProvider messagingProvider,
Topics topics,
- EventAdmin eventAdmin) {
+ EventAdmin eventAdmin,
+ SlingSettingsService slingSettingsService,
+ ResourceResolverFactory resolverFactory,
+ String pubSlingId) {
this.messagingProvider = messagingProvider;
this.topics = topics;
this.eventAdmin = eventAdmin;
+ this.slingSettings = slingSettingsService;
+ this.resolverFactory = resolverFactory;
+ this.pubSlingId = pubSlingId;
}
@Activate
public void activate() {
+ pubSlingId = slingSettings.getSlingId();
cache = newCache();
LOG.info("Started Publisher queue cache service");
}
@Deactivate
public void deactivate() {
- if (cache != null) {
- cache.close();
+ PubQueueCache queueCache = this.cache;
+ if (queueCache != null) {
+ queueCache.close();
}
LOG.info("Stopped Publisher queue cache service");
}
@@ -117,18 +128,17 @@
}
}
- public void seed(long offset) {
- if (cache != null) {
- cache.seed(offset);
- }
- }
-
- private void cleanup() {
- if (cache != null) {
- int size = cache.size();
+ /**
+ * The cleanup renew the cache when
+ * a capacity threshold has been reached.
+ */
+ public void cleanup() {
+ PubQueueCache queueCache = this.cache;
+ if (queueCache != null) {
+ int size = queueCache.size();
if (size > CLEANUP_THRESHOLD) {
LOG.info("Cleanup package cache (size={}/{})", size, CLEANUP_THRESHOLD);
- cache.close();
+ queueCache.close();
cache = newCache();
} else {
LOG.info("No cleanup required for package cache (size={}/{})", size, CLEANUP_THRESHOLD);
@@ -136,14 +146,15 @@
}
}
- private PubQueueCache newCache() {
- return new PubQueueCache(messagingProvider, eventAdmin, distributionMetricsService, topics.getPackageTopic());
+ public void storeSeed() {
+ PubQueueCache queueCache = this.cache;
+ if (queueCache != null) {
+ queueCache.storeSeed();
+ }
}
- @Override
- public void run() {
- LOG.info("Starting package cache cleanup task");
- cleanup();
- LOG.info("Stopping package cache cleanup task");
+ private PubQueueCache newCache() {
+ LocalStore seedStore = new LocalStore(resolverFactory, "seeds", pubSlingId);
+ return new PubQueueCache(messagingProvider, eventAdmin, distributionMetricsService, topics.getPackageTopic(), seedStore);
}
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheCleanupTask.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheCleanupTask.java
new file mode 100644
index 0000000..b2bd673
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheCleanupTask.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.queue.impl;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Reference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_CONCURRENT;
+import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_PERIOD;
+
+/**
+ * Periodical task to cleanup the resources
+ * used by the cache.
+ */
+@Component(
+ service = Runnable.class,
+ property = {
+ PROPERTY_SCHEDULER_CONCURRENT + ":Boolean=false",
+ PROPERTY_SCHEDULER_PERIOD + ":Long=" + 12 * 60 * 60 // 12 hours
+ })
+@ParametersAreNonnullByDefault
+public class QueueCacheCleanupTask implements Runnable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(QueueCacheCleanupTask.class);
+
+ @Reference
+ private PubQueueCacheService queueCacheService;
+
+ @Override
+ public void run() {
+ LOG.info("Starting package cache cleanup task");
+ queueCacheService.cleanup();
+ LOG.info("Stopping package cache cleanup task");
+ }
+}
+
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTask.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTask.java
new file mode 100644
index 0000000..b18ee55
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTask.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.queue.impl;
+
+import javax.annotation.ParametersAreNonnullByDefault;
+
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Reference;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_CONCURRENT;
+import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_PERIOD;
+import static org.apache.sling.commons.scheduler.Scheduler.PROPERTY_SCHEDULER_RUN_ON;
+import static org.apache.sling.commons.scheduler.Scheduler.VALUE_RUN_ON_LEADER;
+
+/**
+ * Periodical task to persist a cache seed
+ * to the repository. The task must run only
+ * on the leader instance to avoid concurrent
+ * writes and reduce write operations in
+ * clustered deployments.
+ */
+@Component(
+ service = Runnable.class,
+ property = {
+ PROPERTY_SCHEDULER_CONCURRENT + ":Boolean=false",
+ PROPERTY_SCHEDULER_RUN_ON + "=" + VALUE_RUN_ON_LEADER,
+ PROPERTY_SCHEDULER_PERIOD + ":Long=" + 15 * 60 // 15 minutes
+ })
+@ParametersAreNonnullByDefault
+public class QueueCacheSeederTask implements Runnable {
+
+ private static final Logger LOG = LoggerFactory.getLogger(QueueCacheSeederTask.class);
+
+ @Reference
+ private PubQueueCacheService queueCacheService;
+
+ @Override
+ public void run() {
+ LOG.debug("Starting package cache seeder task");
+ queueCacheService.storeSeed();
+ LOG.debug("Stopping package cache seeder task");
+ }
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStore.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStore.java
index c619d79..b59fbde 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStore.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStore.java
@@ -64,6 +64,16 @@
createParent();
}
+ public synchronized void store(String key, Object value)
+ throws PersistenceException {
+ try (ResourceResolver resolver = requireNonNull(getBookKeeperServiceResolver())) {
+ store(resolver, key, value);
+ resolver.commit();
+ } catch (LoginException e) {
+ throw new RuntimeException("Failed to load data from the repository." + e.getMessage(), e);
+ }
+ }
+
public synchronized void store(ResourceResolver serviceResolver, String key, Object value)
throws PersistenceException {
store(serviceResolver, Collections.singletonMap(key, value));
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java
index bd8637b..d39f280 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/publisher/DiscoveryServiceTest.java
@@ -34,7 +34,6 @@
import org.apache.sling.distribution.journal.MessageInfo;
import org.apache.sling.distribution.journal.MessagingProvider;
import org.apache.sling.distribution.journal.Reset;
-import org.apache.sling.distribution.journal.impl.queue.impl.PubQueueCacheService;
import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.messages.Messages.DiscoveryMessage;
@@ -76,9 +75,6 @@
@Mock
TopologyChangeHandler topologyChangeHandler;
-
- @Mock
- private PubQueueCacheService pubQueueCacheService;
private MessageHandler<DiscoveryMessage> discoveryHandler;
@@ -89,7 +85,7 @@
public void before() {
discoveryService = new DiscoveryService(
clientProvider, topologyChangeHandler,
- topics, pubQueueCacheService);
+ topics);
when(clientProvider.createPoller(
Mockito.anyString(),
Mockito.any(Reset.class),
@@ -110,19 +106,6 @@
discoveryService.run();
assertThat(discoveryService.getTopologyView().getState(subAgentId, PUB1_AGENT_NAME).getOffset(), equalTo(10L));
}
-
- @Test
- public void testPubQueueCacheSeed() throws IOException {
- DiscoveryMessage message = discoveryMessage(
- SUB1_SLING_ID,
- SUB1_AGENT,
- subscriberState(PUB1_AGENT_NAME, 20),
- subscriberState(PUB1_AGENT_NAME, 10)
- );
- discoveryHandler.handle(messageInfo(0), message);
- discoveryService.run();
- verify(pubQueueCacheService).seed(Mockito.eq(10l));
- }
@After
public void after() throws IOException {
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
index 4d730c3..828ed35 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
@@ -45,6 +45,7 @@
import java.util.concurrent.TimeoutException;
import java.util.stream.LongStream;
+import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.commons.metrics.Counter;
import org.apache.sling.distribution.journal.HandlerAdapter;
import org.apache.sling.distribution.journal.MessageHandler;
@@ -53,9 +54,11 @@
import org.apache.sling.distribution.journal.impl.queue.OffsetQueue;
import org.apache.sling.distribution.journal.impl.shared.DistributionMetricsService;
import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
+import org.apache.sling.distribution.journal.impl.subscriber.LocalStore;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
import org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType;
import org.apache.sling.distribution.queue.DistributionQueueItem;
+import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory;
import org.awaitility.Awaitility;
import org.awaitility.Duration;
import org.junit.After;
@@ -77,6 +80,8 @@
private static final String TOPIC = "package_topic";
+ private static final String PUB_SLING_ID = "79fd948e-9435-4128-b42f-32327ba21df3";
+
private static final String PUB_AGENT_NAME_1 = "pubAgentName1";
private static final String PUB_AGENT_NAME_2 = "pubAgentName2";
@@ -109,6 +114,8 @@
@Mock
private Closeable poller;
+ private ResourceResolverFactory resolverFactory = new MockResourceResolverFactory();
+
private PubQueueCache cache;
private ExecutorService executor;
@@ -129,8 +136,9 @@
when(distributionMetricsService.getQueueCacheFetchCount())
.thenReturn(counter);
- cache = new PubQueueCache(clientProvider, eventAdmin, distributionMetricsService, TOPIC);
- cache.seed(0);
+ LocalStore seedStore = new LocalStore(resolverFactory, "seeds", PUB_SLING_ID);
+ cache = new PubQueueCache(clientProvider, eventAdmin, distributionMetricsService, TOPIC, seedStore);
+ cache.storeSeed();
executor = Executors.newFixedThreadPool(10);
tailHandler = handlerCaptor.getValue().getHandler();
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
index 3a1a27a..b93003a 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
@@ -38,6 +38,7 @@
import javax.management.ObjectName;
import javax.management.ReflectionException;
+import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.MessageSender;
import com.google.protobuf.GeneratedMessage;
@@ -45,6 +46,7 @@
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.apache.sling.distribution.queue.spi.DistributionQueue;
import org.apache.sling.settings.SlingSettingsService;
+import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -100,6 +102,8 @@
@Mock
private MessageSender<GeneratedMessage> sender;
+ private ResourceResolverFactory resolverFactory = new MockResourceResolverFactory();
+
private PubQueueCacheService pubQueueCacheService;
private MessageHandler<Messages.PackageMessage> handler;
@@ -125,12 +129,13 @@
when(clientProvider.createSender())
.thenReturn(sender);
Topics topics = new Topics();
- when(slingSettings.getSlingId()).thenReturn(UUID.randomUUID().toString());
- pubQueueCacheService = new PubQueueCacheService(clientProvider, topics, eventAdmin);
+ String slingId = UUID.randomUUID().toString();
+ when(slingSettings.getSlingId()).thenReturn(slingId);
+ pubQueueCacheService = new PubQueueCacheService(clientProvider, topics, eventAdmin, slingSettings, resolverFactory, slingId);
pubQueueCacheService.activate();
queueProvider = new PubQueueProviderImpl(pubQueueCacheService, clientProvider, topics);
queueProvider.activate();
- pubQueueCacheService.seed(0);
+ pubQueueCacheService.storeSeed();
handler = handlerCaptor.getValue().getHandler();
statHandler = statHandlerCaptor.getValue().getHandler();
}