SLING-8415 - throw upon catching interrupted exception
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
index 89b90ca..89a001f 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
@@ -124,6 +124,8 @@
      */
     private volatile boolean closed;
 
+    private Thread seeder;
+
 
     public PubQueueCache(MessagingProvider messagingProvider, EventAdmin eventAdmin, DistributionMetricsService distributionMetricsService, String topic) {
         this.messagingProvider = messagingProvider;
@@ -136,11 +138,11 @@
                 Reset.latest,
                 create(PackageMessage.class, this::handlePackage));
 
-        RunnableUtil.startBackgroundThread(this::sendSeedingMessages, "queue seeding");
+        seeder = RunnableUtil.startBackgroundThread(this::sendSeedingMessages, "queue seeding");
     }
 
     @Nonnull
-    public OffsetQueue<DistributionQueueItem> getOffsetQueue(String pubAgentName, long minOffset) {
+    public OffsetQueue<DistributionQueueItem> getOffsetQueue(String pubAgentName, long minOffset) throws InterruptedException {
         fetchIfNeeded(minOffset);
         return agentQueues.getOrDefault(pubAgentName, new OffsetQueueImpl<>());
     }
@@ -151,29 +153,29 @@
 
     public void close() {
         closed = true;
+        seeder.interrupt();
         IOUtils.closeQuietly(tailPoller);
         jmxRegs.stream().forEach(IOUtils::closeQuietly);
     }
     
     private void sendSeedingMessages() {
-        LOG.info("Send seeding messages");
+        LOG.info("Start message seeder");
         MessageSender<PackageMessage> sender = messagingProvider.createSender();
-        while (! closed) {
+        while (! Thread.interrupted()) {
             PackageMessage pkgMsg = createTestMessage();
+            LOG.debug("Send seeding message");
             try {
-                LOG.debug("Send seeding message");
                 sender.send(topic, pkgMsg);
-                if (seeded.await(SEEDING_DELAY_MS, MILLISECONDS)) {
-                    LOG.info("Cache has been seeded");
-                    return;
-                }
             } catch (MessagingException e) {
-                LOG.info(e.getMessage());
-                sleep(1000);
-            } catch (InterruptedException ignore) {
-                // ignore
+                LOG.warn(e.getMessage(), e);
+            }
+            try {
+                Thread.sleep(SEEDING_DELAY_MS);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
             }
         }
+        LOG.info("Stop message seeder");
     }
 
     private PackageMessage createTestMessage() {
@@ -192,7 +194,7 @@
      *
      * @param requestedMinOffset the min offset to start fetching data from
      */
-    private void fetchIfNeeded(long requestedMinOffset) {
+    private void fetchIfNeeded(long requestedMinOffset) throws InterruptedException {
 
         // We wait on the cache to be seeded (at least one message handled)
         // before computing potential missing offsets.
@@ -227,8 +229,6 @@
 
                     fetch(requestedMinOffset, cachedMinOffset);
                 }
-            } catch (Exception e) {
-                throw new RuntimeException(String.format("Failed to fetch offsets [%s,%s[, %s", requestedMinOffset, cachedMinOffset, e.getMessage()), e);
             } finally {
                 headPollerLock.unlock();
             }
@@ -249,19 +249,15 @@
         updateMinOffset(requestedMinOffset);
     }
 
-    private void waitSeeded() {
-        while (! closed) {
-            try {
-                if (seeded.await(SEEDING_DELAY_MS, MILLISECONDS)) {
-                    return;
-                } else {
-                    LOG.debug("Waiting for seeded cache");
-                }
-            } catch (InterruptedException ignore) {
-                // ignore
+    private void waitSeeded() throws InterruptedException {
+        while (!closed) {
+            if (seeded.await(SEEDING_DELAY_MS, MILLISECONDS)) {
+                return;
+            } else {
+                LOG.debug("Waiting for seeded cache");
             }
         }
-        throw new RuntimeException();
+        throw new InterruptedException("Cache is closed");
     }
 
     protected long getMinOffset() {
@@ -320,16 +316,12 @@
         return agentQueue;
     }
 
-    private void sleep(long delay) {
-        try {
-            Thread.sleep(delay);
-        } catch (InterruptedException ignore) {
-            // ignore
-        }
-    }
-
     private void handlePackage(final MessageInfo info, final PackageMessage message) {
         merge(singletonList(new FullMessage<>(info, message)));
+        if (seeded.getCount() > 0) {
+            LOG.info("Cache has been seeded");
+        }
         seeded.countDown();
+        seeder.interrupt();
     }
 }
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
index 2f06b49..5ba6d6b 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
@@ -94,7 +94,12 @@
     }
 
     public OffsetQueue<DistributionQueueItem> getOffsetQueue(String pubAgentName, long minOffset) {
-        return cache.getOffsetQueue(pubAgentName, minOffset);
+        try {
+            return cache.getOffsetQueue(pubAgentName, minOffset);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
     }
 
     private void cleanup() {