SLING-9401 - Make subscribeIdle check configurable
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 663dce7..4aae01d 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -36,6 +36,7 @@
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
@@ -142,7 +143,7 @@
@Reference
private Packaging packaging;
- SubscriberIdle subscriberIdle;
+ Optional<SubscriberIdle> subscriberIdle;
private ServiceRegistration<DistributionAgent> componentReg;
@@ -180,9 +181,13 @@
requireNonNull(eventAdmin);
requireNonNull(precondition);
- // Unofficial config (currently just for test)
- Integer idleMillies = (Integer) properties.getOrDefault("idleMillies", SubscriberIdle.DEFAULT_IDLE_TIME_MILLIS);
- subscriberIdle = new SubscriberIdle(context, idleMillies);
+ if (config.subscriberIdleCheck()) {
+ // Unofficial config (currently just for test)
+ Integer idleMillies = (Integer) properties.getOrDefault("idleMillies", SubscriberIdle.DEFAULT_IDLE_TIME_MILLIS);
+ subscriberIdle = Optional.of(new SubscriberIdle(context, idleMillies));
+ } else {
+ subscriberIdle = Optional.empty();
+ }
queueNames = getNotEmpty(config.agentNames());
int maxRetries = config.maxRetries();
@@ -252,8 +257,9 @@
*/
componentReg.unregister();
- IOUtils.closeQuietly(subscriberIdle, announcer, bookKeeper,
+ IOUtils.closeQuietly(announcer, bookKeeper,
packagePoller, commandPoller);
+ subscriberIdle.ifPresent(IOUtils::closeQuietly);
running = false;
String msg = String.format(
"Stopped Subscriber agent %s, subscribed to Publisher agent names %s with package builder %s",
@@ -378,7 +384,7 @@
try (Timer.Context context = distributionMetricsService.getProcessQueueItemDuration().time()) {
processQueueItem(item);
} finally {
- subscriberIdle.idle();
+ subscriberIdle.ifPresent(SubscriberIdle::idle);
}
} catch (TimeoutException e) {
@@ -425,7 +431,7 @@
long offset = queueItem.get(RECORD_OFFSET, Long.class);
PackageMessage pkgMsg = queueItem.get(PACKAGE_MSG, PackageMessage.class);
boolean skip = shouldSkip(offset);
- subscriberIdle.busy();
+ subscriberIdle.ifPresent(SubscriberIdle::busy);
if (skip) {
bookKeeper.removePackage(pkgMsg, offset);
} else {
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java
index 937c30a..91ce8f8 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberConfiguration.java
@@ -52,4 +52,6 @@
@AttributeDefinition(name = "packageHandling", description = "Defines if content packages in /etc/packages should be processed (Extract, Install, Off).")
PackageHandling packageHandling() default PackageHandling.Off;
+ @AttributeDefinition(name = "subscriberIdleCheck", description = "Defines if we register a subscriber idle health check.")
+ boolean subscriberIdleCheck() default false;
}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
index 1d47520..6b093eb 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
@@ -351,7 +351,7 @@
packageHandler.handle(info, message);
waitSubscriber(RUNNING);
- await("Should report ready").until(subscriber.subscriberIdle::isReady);
+ await("Should report ready").until(() -> subscriber.subscriberIdle.get().isReady());
sem.release();
}
@@ -363,7 +363,8 @@
Map<String, Object> basicProps = ImmutableMap.of(
"name", SUB1_AGENT_NAME,
"agentNames", PUB1_AGENT_NAME,
- "idleMillies", 1000);
+ "idleMillies", 1000,
+ "subscriberIdleCheck", true);
Map<String, Object> props = new HashMap<>();
props.putAll(basicProps);
props.putAll(overrides);