SLING-9716 - Subscriber idle check should turn green when failing with the same package
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 44bb98f..9c6f002 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -337,7 +337,10 @@
         PackageMessage pkgMsg = item.getMessage();
         boolean skip = shouldSkip(info.getOffset());
         try {
-            subscriberIdle.ifPresent(SubscriberIdle::busy);
+            subscriberIdle.ifPresent((idle) -> {
+                int retries = bookKeeper.getRetries(pkgMsg.getPubAgentName());
+                idle.busy(retries);
+            });
             if (skip) {
                 bookKeeper.removePackage(pkgMsg, info.getOffset());
             } else {
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
index 3220a31..4f8949b 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
@@ -34,12 +34,15 @@
 import org.osgi.framework.ServiceRegistration;
 
 /**
- * A DistributionSubscriber is considered ready only when it is idle for more than 
- * the READY_IDLE_TIME_SECONDS at least once.
+ * A DistributionSubscriber is considered ready when it is idle for more than
+ * the READY_IDLE_TIME_SECONDS at least once ; or when it is busy processing
+ * the same package for more than MAX_RETRIES times.
  */
 public class SubscriberIdle implements SystemReadyCheck, Closeable {
     public static final int DEFAULT_IDLE_TIME_MILLIS = 10000;
 
+    public static final int MAX_RETRIES = 10;
+
     private final int idleMillis;
     private final AtomicBoolean isReady;
     private final ScheduledExecutorService executor;
@@ -68,9 +71,14 @@
     
     /**
      * Called when processing of a message starts
+     *
+     * @param retries the number of retries to process the message
      */
-    public synchronized void busy() {
+    public synchronized void busy(int retries) {
         cancelSchedule();
+        if (retries > MAX_RETRIES) {
+            ready();
+        }
     }
 
     /**
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java
index ac97fee..ed481c7 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java
@@ -18,12 +18,15 @@
  */
 package org.apache.sling.distribution.journal.impl.subscriber;
 
+import static org.apache.sling.distribution.journal.impl.subscriber.SubscriberIdle.MAX_RETRIES;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertThat;
 
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.felix.systemready.CheckStatus.State;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.osgi.framework.BundleContext;
@@ -33,27 +36,50 @@
     private static final int IDLE_MILLIES = 40;
     private SubscriberIdle idle;
     private AtomicBoolean readyHolder;
-    
-    @Test
-    public void testIdle() throws InterruptedException {
+
+    @Before
+    public void before() {
         BundleContext context = Mockito.mock(BundleContext.class);
         readyHolder = new AtomicBoolean();
         idle = new SubscriberIdle(context , IDLE_MILLIES, readyHolder);
+    }
+
+    @After
+    public void after() {
+        idle.close();
+    }
+    
+    @Test
+    public void testIdle() throws InterruptedException {
         assertState("Initial state", State.RED);
-        idle.busy();
+        idle.busy(0);
         idle.idle();
         assertState("State after reset", State.RED);
         Thread.sleep(30);
         assertState("State after time below idle limit", State.RED);
-        idle.busy();
+        idle.busy(0);
         Thread.sleep(80);
         idle.idle();
         assertState("State after long processing", State.RED);
         Thread.sleep(80);
         assertState("State after time over idle limit", State.GREEN);
-        idle.busy();
+        idle.busy(0);
         assertState("State should not be reset once it reached GREEN", State.GREEN);
-        idle.close();
+    }
+
+    @Test
+    public void testMaxRetries() {
+        idle.busy(0);
+        idle.idle();
+        assertState("State with no retries", State.RED);
+        idle.busy(MAX_RETRIES);
+        idle.idle();
+        assertState("State with retries <= MAX_RETRIES", State.RED);
+        idle.busy(MAX_RETRIES + 1);
+        idle.idle();
+        assertState("State with retries > MAX_RETRIES", State.GREEN);
+        idle.busy(0);
+        assertState("State should not be reset once it reached GREEN", State.GREEN);
     }
     
     @Test