Merge pull request #38 from Buuhuu/SLING-9495
SLING-9495: send the request type as distribution.type
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
index b2f0135..ec811d2 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
@@ -106,7 +106,7 @@
* Holds the last known seed offset stored to the
* seed store.
*/
- private volatile long seedOffset = 0L;
+ private volatile long seedOffset = -1L;
private final Set<JMXRegistration> jmxRegs = new HashSet<>();
@@ -116,36 +116,42 @@
private volatile Closeable tailPoller;
+ private final QueueCacheSeeder seeder;
+
private final String topic;
private final LocalStore seedStore;
private final DistributionMetricsService distributionMetricsService;
- private volatile boolean closed;
-
- public PubQueueCache(MessagingProvider messagingProvider, EventAdmin eventAdmin, DistributionMetricsService distributionMetricsService, String topic, LocalStore seedStore) {
+ public PubQueueCache(MessagingProvider messagingProvider, EventAdmin eventAdmin, DistributionMetricsService distributionMetricsService, String topic, LocalStore seedStore, QueueCacheSeeder seeder) {
this.messagingProvider = messagingProvider;
this.eventAdmin = eventAdmin;
this.distributionMetricsService = distributionMetricsService;
this.topic = topic;
this.seedStore = seedStore;
- Long offset = seedStore.load().get("offset", Long.class);
+ this.seeder = seeder;
+ Long offset = seedStore.load("offset", Long.class);
if (offset != null) {
seedOffset = offset;
+ startPoller(seedOffset);
+ /*
+ * We need at least one seeding message
+ * for cases where the seedOffset is no
+ * longer on the journal.
+ */
+ seeder.seedOne();
} else {
/*
* Fallback to seeding messages when
* no offset could be found in the
* repository.
*/
- seedOffset = messagingProvider.retrieveOffset(topic, Reset.latest);
- storeSeed(seedOffset);
+ seeder.seed(this::startPoller);
}
- seed(seedOffset);
}
- private void seed(long offset) {
+ private void startPoller(long offset) {
LOG.info("Seed with offset: {}", offset);
String assignTo = messagingProvider.assignTo(offset);
tailPoller = messagingProvider.createPoller(
@@ -157,14 +163,10 @@
@Nonnull
public OffsetQueue<DistributionQueueItem> getOffsetQueue(String pubAgentName, long minOffset) throws InterruptedException {
- if (minOffset > 0) {
- /*
- * we fetch data only when requested
- * at least one processed offset
- */
- waitSeeded();
- fetchIfNeeded(minOffset);
+ if (!isSeeded()) {
+ throw new RuntimeException("Gave up waiting for seeded cache");
}
+ fetchIfNeeded(minOffset);
return agentQueues.getOrDefault(pubAgentName, new OffsetQueueImpl<>());
}
@@ -190,8 +192,8 @@
}
public void close() {
- closed = true;
IOUtils.closeQuietly(tailPoller);
+ IOUtils.closeQuietly(seeder);
jmxRegs.forEach(IOUtils::closeQuietly);
}
@@ -253,15 +255,8 @@
updateMinOffset(requestedMinOffset);
}
- private void waitSeeded() throws InterruptedException {
- long start = System.currentTimeMillis();
- while (getMinOffset() == Long.MAX_VALUE) {
- LOG.debug("Waiting for seeded cache");
- if (closed || System.currentTimeMillis() - start > MAX_FETCH_WAIT_MS) {
- throw new RuntimeException("Gave up waiting for seeded cache");
- }
- Thread.sleep(1000);
- }
+ private boolean isSeeded() {
+ return getMinOffset() != Long.MAX_VALUE;
}
protected long getMinOffset() {
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
index 304bfb4..2eddc6f 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
@@ -155,6 +155,8 @@
private PubQueueCache newCache() {
LocalStore seedStore = new LocalStore(resolverFactory, "seeds", pubSlingId);
- return new PubQueueCache(messagingProvider, eventAdmin, distributionMetricsService, topics.getPackageTopic(), seedStore);
+ String topic = topics.getPackageTopic();
+ QueueCacheSeeder seeder = new QueueCacheSeeder(messagingProvider, topic);
+ return new PubQueueCache(messagingProvider, eventAdmin, distributionMetricsService, topic, seedStore, seeder);
}
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeeder.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeeder.java
new file mode 100644
index 0000000..79f6063
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeeder.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.queue.impl;
+
+import java.io.Closeable;
+import java.util.UUID;
+import java.util.function.LongConsumer;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.sling.distribution.journal.MessageSender;
+import org.apache.sling.distribution.journal.MessagingException;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.sling.distribution.journal.HandlerAdapter.create;
+import static org.apache.sling.distribution.journal.RunnableUtil.startBackgroundThread;
+
+/**
+ * Seeds the publisher queue cache: sends TEST package messages to the topic
+ * and hands the offset of a package message polled from that topic to a
+ * callback.
+ */
+public class QueueCacheSeeder implements Closeable {
+
+    private static final Logger LOG = LoggerFactory.getLogger(QueueCacheSeeder.class);
+
+    /**
+     * Interval in milliseconds between two seeding messages to seed the cache.
+     */
+    private static final long CACHE_SEEDING_DELAY_MS = 10_000;
+
+    private final String topic;
+
+    private final MessagingProvider provider;
+
+    /** Poller created by seed(); stays null when only seedOne() is used. */
+    private volatile Closeable poller;
+
+    /** Set by close(); ends the sending loop. */
+    private volatile boolean closed;
+
+    public QueueCacheSeeder(MessagingProvider provider, String topic) {
+        this.provider = provider;
+        this.topic = topic;
+    }
+
+    /**
+     * Sends one seeding message on a thread started with startBackgroundThread.
+     */
+    public void seedOne() {
+        startBackgroundThread(this::sendSeedingMessage, "Seeder thread - one seed");
+    }
+
+    /**
+     * Creates a poller on the topic (Reset.latest) and starts the loop that
+     * sends seeding messages. The first package message handled by the poller
+     * closes this seeder and passes its offset to the callback.
+     *
+     * @param callback receives the offset of the handled package message
+     */
+    public void seed(LongConsumer callback) {
+        poller = provider.createPoller(topic, Reset.latest,
+                create(PackageMessage.class, (info, msg) -> {
+                    if (closed) {
+                        // Seeding is over: a message handled after close()
+                        // must not report another offset.
+                        return;
+                    }
+                    close();
+                    callback.accept(info.getOffset());
+                }));
+        startBackgroundThread(this::sendSeedingMessages, "Seeder thread");
+    }
+
+    /**
+     * Stops the sending loop and closes the poller, if one was created.
+     */
+    @Override
+    public void close() {
+        closed = true;
+        IOUtils.closeQuietly(poller);
+    }
+
+    /**
+     * Sends a seeding message every CACHE_SEEDING_DELAY_MS until this seeder
+     * is closed or the sending thread is interrupted.
+     */
+    private void sendSeedingMessages() {
+        LOG.info("Start message seeder");
+        try {
+            MessageSender<PackageMessage> sender = provider.createSender();
+            // delay() only restores the interrupt flag. Without this check an
+            // interrupted thread would keep sending with no pause in between.
+            while (!closed && !Thread.currentThread().isInterrupted()) {
+                sendSeedingMessage(sender);
+                delay(CACHE_SEEDING_DELAY_MS);
+            }
+        } finally {
+            LOG.info("Stop message seeder");
+        }
+    }
+
+    private void sendSeedingMessage() {
+        sendSeedingMessage(provider.createSender());
+    }
+
+    /**
+     * Sends one TEST message to the topic. A MessagingException is logged and
+     * followed by a delay of ten seeding intervals.
+     */
+    private void sendSeedingMessage(MessageSender<PackageMessage> sender) {
+        PackageMessage pkgMsg = createTestMessage();
+        LOG.info("Send seeding message");
+        try {
+            sender.send(topic, pkgMsg);
+        } catch (MessagingException e) {
+            LOG.warn(e.getMessage(), e);
+            delay(CACHE_SEEDING_DELAY_MS * 10);
+        }
+    }
+
+    /**
+     * Sleeps for the given time; an interruption ends the sleep early and the
+     * interrupt flag is restored for the caller to check.
+     */
+    private static void delay(long sleepMs) {
+        try {
+            Thread.sleep(sleepMs);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+        }
+    }
+
+    /**
+     * Builds a TEST package message with a random package id.
+     */
+    protected PackageMessage createTestMessage() {
+        String pkgId = UUID.randomUUID().toString();
+        return PackageMessage.newBuilder()
+                .setPubSlingId("seeder")
+                .setPkgId(pkgId)
+                .setPkgType("seeder")
+                .setReqType(PackageMessage.ReqType.TEST)
+                .build();
+    }
+}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStore.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStore.java
index b59fbde..878c831 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStore.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStore.java
@@ -91,6 +91,10 @@
LOG.debug(String.format("Stored data %s for storeId %s", map.toString(), storeId));
}
+ public <T> T load(String key, Class<T> clazz) { // typed lookup of a single key, without a default value
+ return load().get(key, clazz); // NOTE(review): presumably null when the key is absent - confirm against the type returned by load()
+ }
+
public <T> T load(String key, T defaultValue) {
LOG.debug(String.format("Loading key %s for storeId %s with default value %s", key, storeId, defaultValue));
return load().get(key, defaultValue);
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
index 828ed35..3f0dc5e 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
@@ -24,25 +24,22 @@
import static org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType.DELETE;
import static org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType.TEST;
import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.when;
import java.io.Closeable;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Random;
import java.util.UUID;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.TimeoutException;
import java.util.stream.LongStream;
import org.apache.sling.api.resource.ResourceResolverFactory;
@@ -60,7 +57,6 @@
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory;
import org.awaitility.Awaitility;
-import org.awaitility.Duration;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -89,9 +85,6 @@
private static final String PUB_AGENT_NAME_3 = "pubAgentName3";
private static final Random RAND = new Random();
-
- @Captor
- private ArgumentCaptor<PackageMessage> seedingMessageCaptor;
@Captor
private ArgumentCaptor<HandlerAdapter<PackageMessage>> handlerCaptor;
@@ -106,12 +99,18 @@
private MessagingProvider clientProvider;
@Mock
+ private QueueCacheSeeder cacheSeeder;
+
+ @Mock
private DistributionMetricsService distributionMetricsService;
@Mock
private Counter counter;
@Mock
+ private LocalStore seedStore;
+
+ @Mock
private Closeable poller;
private ResourceResolverFactory resolverFactory = new MockResourceResolverFactory();
@@ -136,8 +135,9 @@
when(distributionMetricsService.getQueueCacheFetchCount())
.thenReturn(counter);
- LocalStore seedStore = new LocalStore(resolverFactory, "seeds", PUB_SLING_ID);
- cache = new PubQueueCache(clientProvider, eventAdmin, distributionMetricsService, TOPIC, seedStore);
+ when(seedStore.load(anyString(), any())).thenReturn(0L);
+
+ cache = new PubQueueCache(clientProvider, eventAdmin, distributionMetricsService, TOPIC, seedStore, cacheSeeder);
cache.storeSeed();
executor = Executors.newFixedThreadPool(10);
@@ -149,36 +149,24 @@
executor.shutdownNow();
cache.close();
}
-
- @Test
- public void testSeedingFromNewPackageMessage() throws Exception {
- Future<OffsetQueue<DistributionQueueItem>> consumer = consumer(PUB_AGENT_NAME_1, 0);
- // The consumer is blocked until the cache is seeded
- assertFalse(consumer.isDone());
- // sending any package message seeds the cache
- simulateMessage(tailHandler, 0);
- consumer.get(15, SECONDS);
+
+ @Test(expected = RuntimeException.class)
+ public void testUnseededThrows() throws Exception {
+ cache.getOffsetQueue(PUB_AGENT_NAME_1, 0);
}
@Test
- public void testSeedingConcurrentConsumers() throws Exception {
- List<Future<OffsetQueue<DistributionQueueItem>>> consumers = new ArrayList<>();
- // minOffset > 0 to leverage the cache
- consumers.add(consumer(PUB_AGENT_NAME_1, 1));
- consumers.add(consumer(PUB_AGENT_NAME_2, 1));
- consumers.add(consumer(PUB_AGENT_NAME_3, 1));
- // All consumers are blocked until the cache is seeded
- consumers.forEach(future -> assertFalse(future.isDone()));
- simulateMessage(tailHandler, 1);
- consumers.forEach(future -> assertNotNull(get(future)));
+ public void testSeedingFromNewPackageMessage() throws Exception {
+ simulateMessage(tailHandler, PUB_AGENT_NAME_1, ADD, 0);
+ OffsetQueue<DistributionQueueItem> queue = cache.getOffsetQueue(PUB_AGENT_NAME_1, 0);
+ assertThat(queue.getSize(), greaterThan(0));
}
@Test
public void testFetchWithSingleConsumer() throws Exception {
- // build a consumer form offset 100
+ simulateMessage(tailHandler, 200);
Future<OffsetQueue<DistributionQueueItem>> consumer = consumer(PUB_AGENT_NAME_1, 100);
// seeding the cache with a message at offset 200
- simulateMessage(tailHandler, 200);
// wait that the consumer has started fetching the offsets from 100 to 200
awaitHeadHandler();
// simulate messages for the fetched offsets
@@ -195,11 +183,11 @@
@Test
public void testFetchWithConcurrentConsumer() throws Exception {
+ simulateMessage(tailHandler, 200);
// build two consumers for same agent queue, from offset 100
Future<OffsetQueue<DistributionQueueItem>> consumer1 = consumer(PUB_AGENT_NAME_1, 100);
Future<OffsetQueue<DistributionQueueItem>> consumer2 = consumer(PUB_AGENT_NAME_1, 100);
// seeding the cache with a message at offset 200
- simulateMessage(tailHandler, 200);
// wait that one consumer has started fetching the offsets from 100 to 200
MessageHandler<PackageMessage> headHandler = awaitHeadHandler();
// simulate messages for the fetched offsets
@@ -216,26 +204,15 @@
@Test
public void testCacheSize() throws Exception {
- Future<OffsetQueue<DistributionQueueItem>> consumer = consumer(PUB_AGENT_NAME_1, 0);
simulateMessage(tailHandler, PUB_AGENT_NAME_3, ADD, 0);
simulateMessage(tailHandler, PUB_AGENT_NAME_3, DELETE, 1);
simulateMessage(tailHandler, PUB_AGENT_NAME_1, ADD, 2);
simulateMessage(tailHandler, PUB_AGENT_NAME_3, TEST, 3); // TEST message does not increase the cache size
simulateMessage(tailHandler, PUB_AGENT_NAME_2, TEST, 4); // TEST message does not increase the cache size
simulateMessage(tailHandler, PUB_AGENT_NAME_3, ADD, 5);
- consumer.get(15, SECONDS);
assertEquals(4, cache.size());
}
- @Test(expected = ExecutionException.class)
- public void testCloseUnseededPoller() throws Throwable {
- Future<OffsetQueue<DistributionQueueItem>> task = consumer(PUB_AGENT_NAME_1, 1); // minOffset > 0 to leverage the cache
- Awaitility.setDefaultPollDelay(Duration.ZERO);
- cache.close();
- task.get();
- }
-
-
private void simulateMessages(MessageHandler<PackageMessage> handler, long fromOffset, long toOffset) {
LongStream.rangeClosed(fromOffset, toOffset).forEach(offset -> simulateMessage(handler, offset));
}
@@ -266,14 +243,6 @@
return executor.submit(() -> cache.getOffsetQueue(pubAgentName, minOffset));
}
- private OffsetQueue<DistributionQueueItem> get(Future<OffsetQueue<DistributionQueueItem>> future) {
- try {
- return future.get(1, SECONDS);
- } catch (TimeoutException | InterruptedException | ExecutionException e) {
- throw new RuntimeException(e);
- }
- }
-
@SafeVarargs
private final <T> T pickAny(T... c) {
if (c == null || c.length == 0) {
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
index b93003a..009235c 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueProviderTest.java
@@ -38,10 +38,13 @@
import javax.management.ObjectName;
import javax.management.ReflectionException;
+import org.apache.sling.api.resource.PersistenceException;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.distribution.journal.impl.shared.Topics;
import org.apache.sling.distribution.journal.MessageSender;
import com.google.protobuf.GeneratedMessage;
+
+import org.apache.sling.distribution.journal.impl.subscriber.LocalStore;
import org.apache.sling.distribution.queue.DistributionQueueEntry;
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.apache.sling.distribution.queue.spi.DistributionQueue;
@@ -113,7 +116,7 @@
private MBeanServer mbeanServer;
@Before
- public void before() {
+ public void before() throws PersistenceException {
MockitoAnnotations.initMocks(this);
when(clientProvider.createPoller(
Mockito.eq(Topics.PACKAGE_TOPIC),
@@ -131,11 +134,12 @@
Topics topics = new Topics();
String slingId = UUID.randomUUID().toString();
when(slingSettings.getSlingId()).thenReturn(slingId);
+ LocalStore seedStore = new LocalStore(resolverFactory, "seeds", slingId);
+ seedStore.store("offset", 1L);
pubQueueCacheService = new PubQueueCacheService(clientProvider, topics, eventAdmin, slingSettings, resolverFactory, slingId);
pubQueueCacheService.activate();
queueProvider = new PubQueueProviderImpl(pubQueueCacheService, clientProvider, topics);
queueProvider.activate();
- pubQueueCacheService.storeSeed();
handler = handlerCaptor.getValue().getHandler();
statHandler = statHandlerCaptor.getValue().getHandler();
}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTest.java
new file mode 100644
index 0000000..eeebb7a
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/QueueCacheSeederTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.queue.impl;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.function.LongConsumer;
+
+import org.apache.sling.distribution.journal.HandlerAdapter;
+import org.apache.sling.distribution.journal.MessageSender;
+import org.apache.sling.distribution.journal.MessagingProvider;
+import org.apache.sling.distribution.journal.Reset;
+import org.apache.sling.distribution.journal.impl.shared.TestMessageInfo;
+import org.apache.sling.distribution.journal.messages.Messages.PackageMessage;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import static java.lang.System.currentTimeMillis;
+import static org.apache.sling.distribution.journal.impl.shared.Topics.PACKAGE_TOPIC;
+import static org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType.TEST;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link QueueCacheSeeder} against a mocked {@link MessagingProvider}.
+ */
+@RunWith(MockitoJUnitRunner.class)
+public class QueueCacheSeederTest {
+
+    @Mock
+    private MessagingProvider clientProvider;
+
+    @Mock
+    private MessageSender<PackageMessage> sender;
+
+    @Mock
+    private Closeable poller;
+
+    @Mock
+    private LongConsumer callback;
+
+    @Captor
+    private ArgumentCaptor<HandlerAdapter<PackageMessage>> handlerCaptor;
+
+    @Captor
+    private ArgumentCaptor<PackageMessage> messageCaptor;
+
+    private QueueCacheSeeder seeder;
+
+    /**
+     * Stubs the provider to return the mocked poller and sender, capturing
+     * the package handler and the sent messages, then creates the seeder.
+     */
+    @Before
+    public void before() {
+        MockitoAnnotations.initMocks(this);
+        when(clientProvider.createPoller(
+                eq(PACKAGE_TOPIC),
+                any(Reset.class),
+                handlerCaptor.capture()))
+                .thenReturn(poller);
+        doNothing().when(sender).send(eq(PACKAGE_TOPIC), messageCaptor.capture());
+        when(clientProvider.<PackageMessage>createSender())
+                .thenReturn(sender);
+        seeder = new QueueCacheSeeder(clientProvider, PACKAGE_TOPIC);
+    }
+
+    @After
+    public void after() {
+        seeder.close();
+    }
+
+    /**
+     * A package message handled by the poller passes its offset to the
+     * callback and closes the poller.
+     */
+    @Test
+    public void testSeededCallback() throws IOException {
+        final long seededOffset = 15L;
+        seeder.seed(callback);
+        simulateSeedingMsg(seededOffset);
+        verify(callback).accept(seededOffset);
+        verify(poller).close();
+    }
+
+    /**
+     * Seeding sends at least one message of type TEST to the package topic.
+     */
+    @Test
+    public void testSendingSeeds() {
+        seeder.seed(callback);
+        verify(sender, timeout(5000).atLeastOnce()).send(eq(PACKAGE_TOPIC), messageCaptor.capture());
+        PackageMessage sent = messageCaptor.getValue();
+        assertNotNull(sent);
+        assertEquals(TEST, sent.getReqType());
+    }
+
+    /**
+     * Feeds a test message with the given offset to the captured handler.
+     */
+    private void simulateSeedingMsg(long offset) {
+        TestMessageInfo info = new TestMessageInfo(PACKAGE_TOPIC, 0, offset, currentTimeMillis());
+        PackageMessage seedingMsg = seeder.createTestMessage();
+        handlerCaptor.getValue().getHandler().handle(info, seedingMsg);
+    }
+}
\ No newline at end of file
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStoreTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStoreTest.java
index 4bc8761..c989b4a 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStoreTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/LocalStoreTest.java
@@ -54,9 +54,9 @@
ResourceResolver resolver = resolverFactory.getServiceResourceResolver(null);
LocalStore offsetStore = new LocalStore(resolverFactory, "packages", "store3");
offsetStore.store(resolver, "key1", "value1");
- assertNull(offsetStore.load("key1", null));
+ assertNull(offsetStore.load("key1", String.class));
resolver.commit();
- assertEquals("value1", offsetStore.load("key1", null));
+ assertEquals("value1", offsetStore.load("key1", String.class));
}
@Test
@@ -72,8 +72,8 @@
statusStore.store(resolver, map);
resolver.commit();
- assertEquals("value1", statusStore.load("key1", null));
- assertEquals(false, statusStore.load("key2", null));
+ assertEquals("value1", statusStore.load("key1", String.class));
+ assertEquals(false, statusStore.load("key2", Boolean.class));
}
@Test
@@ -92,7 +92,7 @@
statusStore.store(resolver, "key2", true);
resolver.commit();
- assertEquals("value1", statusStore.load("key1", null));
- assertEquals(true, statusStore.load("key2", null));
+ assertEquals("value1", statusStore.load("key1", String.class));
+ assertEquals(true, statusStore.load("key2", Boolean.class));
}
}