SLING-8415 - throw upon catching interrupted exception
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
index 89b90ca..89a001f 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCache.java
@@ -124,6 +124,8 @@
*/
private volatile boolean closed;
+ private Thread seeder;
+
public PubQueueCache(MessagingProvider messagingProvider, EventAdmin eventAdmin, DistributionMetricsService distributionMetricsService, String topic) {
this.messagingProvider = messagingProvider;
@@ -136,11 +138,11 @@
Reset.latest,
create(PackageMessage.class, this::handlePackage));
- RunnableUtil.startBackgroundThread(this::sendSeedingMessages, "queue seeding");
+ seeder = RunnableUtil.startBackgroundThread(this::sendSeedingMessages, "queue seeding");
}
@Nonnull
- public OffsetQueue<DistributionQueueItem> getOffsetQueue(String pubAgentName, long minOffset) {
+ public OffsetQueue<DistributionQueueItem> getOffsetQueue(String pubAgentName, long minOffset) throws InterruptedException {
fetchIfNeeded(minOffset);
return agentQueues.getOrDefault(pubAgentName, new OffsetQueueImpl<>());
}
@@ -151,29 +153,29 @@
public void close() {
closed = true;
+ seeder.interrupt();
IOUtils.closeQuietly(tailPoller);
jmxRegs.stream().forEach(IOUtils::closeQuietly);
}
private void sendSeedingMessages() {
- LOG.info("Send seeding messages");
+ LOG.info("Start message seeder");
MessageSender<PackageMessage> sender = messagingProvider.createSender();
- while (! closed) {
+ while (! Thread.interrupted()) {
PackageMessage pkgMsg = createTestMessage();
+ LOG.debug("Send seeding message");
try {
- LOG.debug("Send seeding message");
sender.send(topic, pkgMsg);
- if (seeded.await(SEEDING_DELAY_MS, MILLISECONDS)) {
- LOG.info("Cache has been seeded");
- return;
- }
} catch (MessagingException e) {
- LOG.info(e.getMessage());
- sleep(1000);
- } catch (InterruptedException ignore) {
- // ignore
+ LOG.warn(e.getMessage(), e);
+ }
+ try {
+ Thread.sleep(SEEDING_DELAY_MS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
}
}
+ LOG.info("Stop message seeder");
}
private PackageMessage createTestMessage() {
@@ -192,7 +194,7 @@
*
* @param requestedMinOffset the min offset to start fetching data from
*/
- private void fetchIfNeeded(long requestedMinOffset) {
+ private void fetchIfNeeded(long requestedMinOffset) throws InterruptedException {
// We wait on the cache to be seeded (at least one message handled)
// before computing potential missing offsets.
@@ -227,8 +229,6 @@
fetch(requestedMinOffset, cachedMinOffset);
}
- } catch (Exception e) {
- throw new RuntimeException(String.format("Failed to fetch offsets [%s,%s[, %s", requestedMinOffset, cachedMinOffset, e.getMessage()), e);
} finally {
headPollerLock.unlock();
}
@@ -249,19 +249,15 @@
updateMinOffset(requestedMinOffset);
}
- private void waitSeeded() {
- while (! closed) {
- try {
- if (seeded.await(SEEDING_DELAY_MS, MILLISECONDS)) {
- return;
- } else {
- LOG.debug("Waiting for seeded cache");
- }
- } catch (InterruptedException ignore) {
- // ignore
+ private void waitSeeded() throws InterruptedException {
+ while (!closed) {
+ if (seeded.await(SEEDING_DELAY_MS, MILLISECONDS)) {
+ return;
+ } else {
+ LOG.debug("Waiting for seeded cache");
}
}
- throw new RuntimeException();
+ throw new InterruptedException("Cache is closed");
}
protected long getMinOffset() {
@@ -320,16 +316,12 @@
return agentQueue;
}
- private void sleep(long delay) {
- try {
- Thread.sleep(delay);
- } catch (InterruptedException ignore) {
- // ignore
- }
- }
-
private void handlePackage(final MessageInfo info, final PackageMessage message) {
merge(singletonList(new FullMessage<>(info, message)));
+ if (seeded.getCount() > 0) {
+ LOG.info("Cache has been seeded");
+ }
seeded.countDown();
+ seeder.interrupt();
}
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
index 2f06b49..5ba6d6b 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/queue/impl/PubQueueCacheService.java
@@ -94,7 +94,12 @@
}
public OffsetQueue<DistributionQueueItem> getOffsetQueue(String pubAgentName, long minOffset) {
- return cache.getOffsetQueue(pubAgentName, minOffset);
+ try {
+ return cache.getOffsetQueue(pubAgentName, minOffset);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
}
private void cleanup() {