SLING-8944 - Systemready check for idle subscriber (#19)

* SLING-8944 - Systemready check for idle subscriber

* SLING-8944 - Readyness status based on busy and idle consumer

* SLING-8944 - Only schedule tasks if not already ready
diff --git a/pom.xml b/pom.xml
index 7669d24..b7af823 100644
--- a/pom.xml
+++ b/pom.xml
@@ -84,6 +84,11 @@
         </dependency>
         <dependency>
             <groupId>org.apache.felix</groupId>
+            <artifactId>org.apache.felix.systemready</artifactId>
+            <version>0.4.2</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.felix</groupId>
             <artifactId>org.apache.felix.converter</artifactId>
             <version>1.0.0</version>
             <scope>test</scope>
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 8ac9e56..9dd0d70 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -132,7 +132,10 @@
 
     @Reference
     private Packaging packaging;
-
+    
+    @Reference
+    private SubscriberIdle subscriberIdle;
+    
     private ServiceRegistration<DistributionAgent> componentReg;
 
     private Closeable packagePoller;
@@ -346,8 +349,11 @@
             // block until an item is available
             DistributionQueueItem item = blockingPeekQueueItem();
             // and then process it
+            subscriberIdle.busy();
             try (Timer.Context context = distributionMetricsService.getProcessQueueItemDuration().time()) {
                 processQueueItem(item);
+            } finally {
+                subscriberIdle.idle();
             }
         } catch (IllegalStateException e) {
             /**
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
new file mode 100644
index 0000000..7dac19c
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.subscriber;
+
+import java.io.Closeable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.felix.systemready.CheckStatus;
+import org.apache.felix.systemready.CheckStatus.State;
+import org.apache.felix.systemready.StateType;
+import org.apache.felix.systemready.SystemReadyCheck;
+import org.osgi.service.component.annotations.Component;
+
+/**
+ * A DistributionSubscriber is considered ready only when it is idle for more than 
+ * the READY_IDLE_TIME_SECONDS at least once.
+ */
+@Component
+public class SubscriberIdle implements SystemReadyCheck, Closeable {
+    private static final int DEFAULT_IDLE_TIME_MILLIS = 10000;
+
+    private final int idleMillis;
+    private final AtomicBoolean isReady = new AtomicBoolean();
+    private final ScheduledExecutorService executor;
+    private ScheduledFuture<?> schedule;
+    
+    public SubscriberIdle() {
+        this(DEFAULT_IDLE_TIME_MILLIS);
+    }
+
+    public SubscriberIdle(int idleMillis) {
+        this.idleMillis = idleMillis;
+        executor = Executors.newScheduledThreadPool(1);
+    }
+    
+    @Override
+    public String getName() {
+        return "DistributionSubscriber idle";
+    }
+
+    @Override
+    public CheckStatus getStatus() {
+        State state = isReady.get() ? State.GREEN : State.RED; 
+        return new CheckStatus(getName(), StateType.READY, state, "DistributionSubscriber idle");
+    }
+    
+    /**
+     * Called when processing of a message starts
+     */
+    public synchronized void busy() {
+        if (schedule != null) {
+            schedule.cancel(false);
+        }
+    }
+
+    /**
+     * Called when processing of a message has finished
+     */
+    public synchronized void idle() {
+        if (!isReady.get()) {
+            busy();
+            schedule = executor.schedule(this::ready, idleMillis, TimeUnit.MILLISECONDS);
+        }
+    }
+    
+    private void ready() {
+        isReady.set(true);
+    }
+
+    @Override
+    public void close() {
+        executor.shutdownNow();
+    }
+
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java
new file mode 100644
index 0000000..76ae4bc
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.subscriber;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import org.apache.felix.systemready.CheckStatus.State;
+import org.junit.Test;
+
+public class SubscriberIdleTest {
+
+    private SubscriberIdle idle;
+
+    @Test
+    public void testIdle() throws InterruptedException {
+        idle = new SubscriberIdle(40);
+        assertState("Initial state", State.RED);
+        idle.busy();
+        idle.idle();
+        assertState("State after reset", State.RED);
+        Thread.sleep(30);
+        assertState("State after time below idle limit", State.RED);
+        idle.busy();
+        Thread.sleep(80);
+        idle.idle();
+        assertState("State after long processing", State.RED);
+        Thread.sleep(80);
+        assertState("State after time over idle limit", State.GREEN);
+        idle.busy();
+        assertState("State should not be reset once it reached GREEN", State.GREEN);
+        idle.close();
+    }
+
+    private void assertState(String message, State expectedState) {
+        assertThat(message, idle.getStatus().getState(), equalTo(expectedState));
+    }
+    
+    
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
index cf06393..6337c60 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
@@ -188,6 +188,9 @@
 
     @Mock
     private Timer.Context timerContext;
+    
+    @Mock
+    private SubscriberIdle subscriberIdle;
 
     @InjectMocks
     DistributionSubscriber subscriber;
@@ -200,7 +203,7 @@
     
     @Mock
     private ServiceRegistration<DistributionAgent> reg;
-
+    
     private MessageHandler<PackageMessage> packageHandler;