SLING-9488 - Throw in case of journal not seeded (#39)

diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
index b2f0135..c6bf1de 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
@@ -122,8 +122,6 @@
 
     private final DistributionMetricsService distributionMetricsService;
     
-    private volatile boolean closed;
-
     public PubQueueCache(MessagingProvider messagingProvider, EventAdmin eventAdmin, DistributionMetricsService distributionMetricsService, String topic, LocalStore seedStore) {
         this.messagingProvider = messagingProvider;
         this.eventAdmin = eventAdmin;
@@ -157,14 +155,10 @@
 
     @Nonnull
     public OffsetQueue<DistributionQueueItem> getOffsetQueue(String pubAgentName, long minOffset) throws InterruptedException {
-        if (minOffset > 0) {
-            /*
-             * we fetch data only when requested
-             * at least one processed offset
-             */
-            waitSeeded();
-            fetchIfNeeded(minOffset);
+        if (!isSeeded()) {
+            throw new RuntimeException("Gave up waiting for seeded cache");
         }
+        fetchIfNeeded(minOffset);
         return agentQueues.getOrDefault(pubAgentName, new OffsetQueueImpl<>());
     }
 
@@ -190,7 +184,6 @@
     }
 
     public void close() {
-        closed = true;
         IOUtils.closeQuietly(tailPoller);
         jmxRegs.forEach(IOUtils::closeQuietly);
     }
@@ -253,15 +246,8 @@
         updateMinOffset(requestedMinOffset);
     }
 
-    private void waitSeeded() throws InterruptedException {
-        long start = System.currentTimeMillis();
-        while (getMinOffset() == Long.MAX_VALUE) {
-            LOG.debug("Waiting for seeded cache");
-            if (closed || System.currentTimeMillis() - start > MAX_FETCH_WAIT_MS) {
-                throw new RuntimeException("Gave up waiting for seeded cache");
-            }
-            Thread.sleep(1000);
-        }
+    private boolean isSeeded() {
+        return getMinOffset() != Long.MAX_VALUE;
     }
 
     protected long getMinOffset() {
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
index 828ed35..e8881bf 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheTest.java
@@ -24,9 +24,9 @@
 import static org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType.DELETE;
 import static org.apache.sling.distribution.journal.messages.Messages.PackageMessage.ReqType.TEST;
 import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.Matchers.greaterThan;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.eq;
@@ -34,15 +34,11 @@
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
 import java.util.Random;
 import java.util.UUID;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
-import java.util.concurrent.TimeoutException;
 import java.util.stream.LongStream;
 
 import org.apache.sling.api.resource.ResourceResolverFactory;
@@ -60,7 +56,6 @@
 import org.apache.sling.distribution.queue.DistributionQueueItem;
 import org.apache.sling.testing.resourceresolver.MockResourceResolverFactory;
 import org.awaitility.Awaitility;
-import org.awaitility.Duration;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -149,36 +144,24 @@
         executor.shutdownNow();
         cache.close();
     }
-
-    @Test
-    public void testSeedingFromNewPackageMessage() throws Exception {
-        Future<OffsetQueue<DistributionQueueItem>> consumer = consumer(PUB_AGENT_NAME_1, 0);
-        // The consumer is blocked until the cache is seeded
-        assertFalse(consumer.isDone());
-        // sending any package message seeds the cache
-        simulateMessage(tailHandler, 0);
-        consumer.get(15, SECONDS);
+    
+    @Test(expected = RuntimeException.class)
+    public void testUnseededThrows() throws Exception {
+        cache.getOffsetQueue(PUB_AGENT_NAME_1, 0);
     }
 
     @Test
-    public void testSeedingConcurrentConsumers() throws Exception {
-        List<Future<OffsetQueue<DistributionQueueItem>>> consumers = new ArrayList<>();
-        // minOffset > 0 to leverage the cache
-        consumers.add(consumer(PUB_AGENT_NAME_1, 1));
-        consumers.add(consumer(PUB_AGENT_NAME_2, 1));
-        consumers.add(consumer(PUB_AGENT_NAME_3, 1));
-        // All consumers are blocked until the cache is seeded
-        consumers.forEach(future -> assertFalse(future.isDone()));
-        simulateMessage(tailHandler, 1);
-        consumers.forEach(future -> assertNotNull(get(future)));
+    public void testSeedingFromNewPackageMessage() throws Exception {
+        simulateMessage(tailHandler, PUB_AGENT_NAME_1, ADD, 0);
+        OffsetQueue<DistributionQueueItem> queue = cache.getOffsetQueue(PUB_AGENT_NAME_1, 0);
+        assertThat(queue.getSize(), greaterThan(0));
     }
 
     @Test
     public void testFetchWithSingleConsumer() throws Exception {
-        // build a consumer form offset 100
+        simulateMessage(tailHandler, 200);
         Future<OffsetQueue<DistributionQueueItem>> consumer = consumer(PUB_AGENT_NAME_1, 100);
         // seeding the cache with a message at offset 200
-        simulateMessage(tailHandler, 200);
         // wait that the consumer has started fetching the offsets from 100 to 200
         awaitHeadHandler();
         // simulate messages for the fetched offsets
@@ -195,11 +178,11 @@
 
 	@Test
     public void testFetchWithConcurrentConsumer() throws Exception {
+	    simulateMessage(tailHandler, 200);
         // build two consumers for same agent queue, from offset 100
         Future<OffsetQueue<DistributionQueueItem>> consumer1 = consumer(PUB_AGENT_NAME_1, 100);
         Future<OffsetQueue<DistributionQueueItem>> consumer2 = consumer(PUB_AGENT_NAME_1, 100);
         // seeding the cache with a message at offset 200
-        simulateMessage(tailHandler, 200);
         // wait that one consumer has started fetching the offsets from 100 to 200
         MessageHandler<PackageMessage> headHandler = awaitHeadHandler();
         // simulate messages for the fetched offsets
@@ -216,26 +199,15 @@
 
     @Test
     public void testCacheSize() throws Exception {
-        Future<OffsetQueue<DistributionQueueItem>> consumer = consumer(PUB_AGENT_NAME_1, 0);
         simulateMessage(tailHandler, PUB_AGENT_NAME_3, ADD, 0);
         simulateMessage(tailHandler, PUB_AGENT_NAME_3, DELETE, 1);
         simulateMessage(tailHandler, PUB_AGENT_NAME_1, ADD, 2);
         simulateMessage(tailHandler, PUB_AGENT_NAME_3, TEST, 3);    // TEST message does not increase the cache size
         simulateMessage(tailHandler, PUB_AGENT_NAME_2, TEST, 4);    // TEST message does not increase the cache size
         simulateMessage(tailHandler, PUB_AGENT_NAME_3, ADD, 5);
-        consumer.get(15, SECONDS);
         assertEquals(4, cache.size());
     }
 
-    @Test(expected = ExecutionException.class)
-    public void testCloseUnseededPoller() throws Throwable {
-        Future<OffsetQueue<DistributionQueueItem>> task = consumer(PUB_AGENT_NAME_1, 1); // minOffset > 0 to leverage the cache
-        Awaitility.setDefaultPollDelay(Duration.ZERO);
-        cache.close();
-        task.get();
-    }
-    
-
     private void simulateMessages(MessageHandler<PackageMessage> handler, long fromOffset, long toOffset) {
         LongStream.rangeClosed(fromOffset, toOffset).forEach(offset -> simulateMessage(handler, offset));
     }
@@ -266,14 +238,6 @@
         return executor.submit(() -> cache.getOffsetQueue(pubAgentName, minOffset));
     }
 
-    private OffsetQueue<DistributionQueueItem> get(Future<OffsetQueue<DistributionQueueItem>> future) {
-        try {
-            return future.get(1, SECONDS);
-        } catch (TimeoutException | InterruptedException | ExecutionException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
     @SafeVarargs
     private final <T> T pickAny(T... c) {
         if (c == null || c.length == 0) {