SLING-9488 - Throw in case of journal not seeded (#39)
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
index b2f0135..c6bf1de 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
@@ -122,8 +122,6 @@
private final DistributionMetricsService distributionMetricsService;
- private volatile boolean closed;
-
public PubQueueCache(MessagingProvider messagingProvider, EventAdmin eventAdmin, DistributionMetricsService distributionMetricsService, String topic, LocalStore seedStore) {
this.messagingProvider = messagingProvider;
this.eventAdmin = eventAdmin;
@@ -157,14 +155,10 @@
@Nonnull
public OffsetQueue<DistributionQueueItem> getOffsetQueue(String pubAgentName, long minOffset) throws InterruptedException {
- if (minOffset > 0) {
- /*
- * we fetch data only when requested
- * at least one processed offset
- */
- waitSeeded();
- fetchIfNeeded(minOffset);
+ if (!isSeeded()) {
+ throw new RuntimeException("Gave up waiting for seeded cache");
}
+ fetchIfNeeded(minOffset);
return agentQueues.getOrDefault(pubAgentName, new OffsetQueueImpl<>());
}
@@ -190,7 +184,6 @@
}
public void close() {
- closed = true;
IOUtils.closeQuietly(tailPoller);
jmxRegs.forEach(IOUtils::closeQuietly);
}
@@ -253,15 +246,8 @@
updateMinOffset(requestedMinOffset);
}
- private void waitSeeded() throws InterruptedException {
- long start = System.currentTimeMillis();
- while (getMinOffset() == Long.MAX_VALUE) {
- LOG.debug("Waiting for seeded cache");
- if (closed || System.currentTimeMillis() - start > MAX_FETCH_WAIT_MS) {
- throw new RuntimeException("Gave up waiting for seeded cache");
- }
- Thread.sleep(1000);
- }
+ private boolean isSeeded() {
+ return getMinOffset() != Long.MAX_VALUE;
}
protected long getMinOffset() {
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
index 828ed35..e8881bf 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
@@ -24,9 +24,9 @@
import static org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType.DELETE;
import static org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType.TEST;
import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.eq;
@@ -34,15 +34,11 @@
import java.io.Closeable;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
import java.util.Random;
import java.util.UUID;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import java.util.concurrent.TimeoutException;
import java.util.stream.LongStream;
import org.apache.sling.api.resource.ResourceResolverFactory;
@@ -60,7 +56,6 @@
import org.apache.sling.distribution.queue.DistributionQueueItem;
import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory;
import org.awaitility.Awaitility;
-import org.awaitility.Duration;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -149,36 +144,24 @@
executor.shutdownNow();
cache.close();
}
-
- @Test
- public void testSeedingFromNewPackageMessage() throws Exception {
- Future<OffsetQueue<DistributionQueueItem>> consumer = consumer(PUB_AGENT_NAME_1, 0);
- // The consumer is blocked until the cache is seeded
- assertFalse(consumer.isDone());
- // sending any package message seeds the cache
- simulateMessage(tailHandler, 0);
- consumer.get(15, SECONDS);
+
+ @Test(expected = RuntimeException.class)
+ public void testUnseededThrows() throws Exception {
+ cache.getOffsetQueue(PUB_AGENT_NAME_1, 0);
}
@Test
- public void testSeedingConcurrentConsumers() throws Exception {
- List<Future<OffsetQueue<DistributionQueueItem>>> consumers = new ArrayList<>();
- // minOffset > 0 to leverage the cache
- consumers.add(consumer(PUB_AGENT_NAME_1, 1));
- consumers.add(consumer(PUB_AGENT_NAME_2, 1));
- consumers.add(consumer(PUB_AGENT_NAME_3, 1));
- // All consumers are blocked until the cache is seeded
- consumers.forEach(future -> assertFalse(future.isDone()));
- simulateMessage(tailHandler, 1);
- consumers.forEach(future -> assertNotNull(get(future)));
+ public void testSeedingFromNewPackageMessage() throws Exception {
+ simulateMessage(tailHandler, PUB_AGENT_NAME_1, ADD, 0);
+ OffsetQueue<DistributionQueueItem> queue = cache.getOffsetQueue(PUB_AGENT_NAME_1, 0);
+ assertThat(queue.getSize(), greaterThan(0));
}
@Test
public void testFetchWithSingleConsumer() throws Exception {
- // build a consumer form offset 100
+ simulateMessage(tailHandler, 200);
Future<OffsetQueue<DistributionQueueItem>> consumer = consumer(PUB_AGENT_NAME_1, 100);
// seeding the cache with a message at offset 200
- simulateMessage(tailHandler, 200);
// wait that the consumer has started fetching the offsets from 100 to 200
awaitHeadHandler();
// simulate messages for the fetched offsets
@@ -195,11 +178,11 @@
@Test
public void testFetchWithConcurrentConsumer() throws Exception {
+ simulateMessage(tailHandler, 200);
// build two consumers for same agent queue, from offset 100
Future<OffsetQueue<DistributionQueueItem>> consumer1 = consumer(PUB_AGENT_NAME_1, 100);
Future<OffsetQueue<DistributionQueueItem>> consumer2 = consumer(PUB_AGENT_NAME_1, 100);
// seeding the cache with a message at offset 200
- simulateMessage(tailHandler, 200);
// wait that one consumer has started fetching the offsets from 100 to 200
MessageHandler<PackageMessage> headHandler = awaitHeadHandler();
// simulate messages for the fetched offsets
@@ -216,26 +199,15 @@
@Test
public void testCacheSize() throws Exception {
- Future<OffsetQueue<DistributionQueueItem>> consumer = consumer(PUB_AGENT_NAME_1, 0);
simulateMessage(tailHandler, PUB_AGENT_NAME_3, ADD, 0);
simulateMessage(tailHandler, PUB_AGENT_NAME_3, DELETE, 1);
simulateMessage(tailHandler, PUB_AGENT_NAME_1, ADD, 2);
simulateMessage(tailHandler, PUB_AGENT_NAME_3, TEST, 3); // TEST message does not increase the cache size
simulateMessage(tailHandler, PUB_AGENT_NAME_2, TEST, 4); // TEST message does not increase the cache size
simulateMessage(tailHandler, PUB_AGENT_NAME_3, ADD, 5);
- consumer.get(15, SECONDS);
assertEquals(4, cache.size());
}
- @Test(expected = ExecutionException.class)
- public void testCloseUnseededPoller() throws Throwable {
- Future<OffsetQueue<DistributionQueueItem>> task = consumer(PUB_AGENT_NAME_1, 1); // minOffset > 0 to leverage the cache
- Awaitility.setDefaultPollDelay(Duration.ZERO);
- cache.close();
- task.get();
- }
-
-
private void simulateMessages(MessageHandler<PackageMessage> handler, long fromOffset, long toOffset) {
LongStream.rangeClosed(fromOffset, toOffset).forEach(offset -> simulateMessage(handler, offset));
}
@@ -266,14 +238,6 @@
return executor.submit(() -> cache.getOffsetQueue(pubAgentName, minOffset));
}
- private OffsetQueue<DistributionQueueItem> get(Future<OffsetQueue<DistributionQueueItem>> future) {
- try {
- return future.get(1, SECONDS);
- } catch (TimeoutException | InterruptedException | ExecutionException e) {
- throw new RuntimeException(e);
- }
- }
-
@SafeVarargs
private final <T> T pickAny(T... c) {
if (c == null || c.length == 0) {