SLING-9716 - Subscriber idle check should turn green when failing with the same package
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 44bb98f..9c6f002 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -337,7 +337,10 @@
PackageMessage pkgMsg = item.getMessage();
boolean skip = shouldSkip(info.getOffset());
try {
- subscriberIdle.ifPresent(SubscriberIdle::busy);
+ subscriberIdle.ifPresent((idle) -> {
+ int retries = bookKeeper.getRetries(pkgMsg.getPubAgentName());
+ idle.busy(retries);
+ });
if (skip) {
bookKeeper.removePackage(pkgMsg, info.getOffset());
} else {
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
index 3220a31..4f8949b 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
@@ -34,12 +34,15 @@
import org.osgi.framework.ServiceRegistration;
/**
- * A DistributionSubscriber is considered ready only when it is idle for more than
- * the READY_IDLE_TIME_SECONDS at least once.
+ * A DistributionSubscriber is considered ready when it is idle for more than
+ * the READY_IDLE_TIME_SECONDS at least once ; or when it is busy processing
+ * the same package for more than MAX_RETRIES times.
*/
public class SubscriberIdle implements SystemReadyCheck, Closeable {
public static final int DEFAULT_IDLE_TIME_MILLIS = 10000;
+ public static final int MAX_RETRIES = 10;
+
private final int idleMillis;
private final AtomicBoolean isReady;
private final ScheduledExecutorService executor;
@@ -68,9 +71,14 @@
/**
* Called when processing of a message starts
+ *
+ * @param retries the number of retries to process the message
*/
- public synchronized void busy() {
+ public synchronized void busy(int retries) {
cancelSchedule();
+ if (retries > MAX_RETRIES) {
+ ready();
+ }
}
/**
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java
index ac97fee..ed481c7 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java
@@ -18,12 +18,15 @@
*/
package org.apache.sling.distribution.journal.impl.subscriber;
+import static org.apache.sling.distribution.journal.impl.subscriber.SubscriberIdle.MAX_RETRIES;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.felix.systemready.CheckStatus.State;
+import org.junit.After;
+import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.osgi.framework.BundleContext;
@@ -33,27 +36,50 @@
private static final int IDLE_MILLIES = 40;
private SubscriberIdle idle;
private AtomicBoolean readyHolder;
-
- @Test
- public void testIdle() throws InterruptedException {
+
+ @Before
+ public void before() {
BundleContext context = Mockito.mock(BundleContext.class);
readyHolder = new AtomicBoolean();
idle = new SubscriberIdle(context , IDLE_MILLIES, readyHolder);
+ }
+
+ @After
+ public void after() {
+ idle.close();
+ }
+
+ @Test
+ public void testIdle() throws InterruptedException {
assertState("Initial state", State.RED);
- idle.busy();
+ idle.busy(0);
idle.idle();
assertState("State after reset", State.RED);
Thread.sleep(30);
assertState("State after time below idle limit", State.RED);
- idle.busy();
+ idle.busy(0);
Thread.sleep(80);
idle.idle();
assertState("State after long processing", State.RED);
Thread.sleep(80);
assertState("State after time over idle limit", State.GREEN);
- idle.busy();
+ idle.busy(0);
assertState("State should not be reset once it reached GREEN", State.GREEN);
- idle.close();
+ }
+
+ @Test
+ public void testMaxRetries() {
+ idle.busy(0);
+ idle.idle();
+ assertState("State with no retries", State.RED);
+ idle.busy(MAX_RETRIES);
+ idle.idle();
+ assertState("State with retries <= MAX_RETRIES", State.RED);
+ idle.busy(MAX_RETRIES + 1);
+ idle.idle();
+ assertState("State with retries > MAX_RETRIES", State.GREEN);
+ idle.busy(0);
+ assertState("State should not be reset once it reached GREEN", State.GREEN);
}
@Test