SLING-8944 - Systemready check for idle subscriber (#19)
* SLING-8944 - Systemready check for idle subscriber
* SLING-8944 - Readyness status based on busy and idle consumer
* SLING-8944 - Only schedule tasks if not already ready
diff --git a/pom.xml b/pom.xml
index 7669d24..b7af823 100644
--- a/pom.xml
+++ b/pom.xml
@@ -84,6 +84,11 @@
</dependency>
<dependency>
<groupId>org.apache.felix</groupId>
+ <artifactId>org.apache.felix.systemready</artifactId>
+ <version>0.4.2</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.felix</groupId>
<artifactId>org.apache.felix.converter</artifactId>
<version>1.0.0</version>
<scope>test</scope>
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index 8ac9e56..9dd0d70 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -132,7 +132,10 @@
@Reference
private Packaging packaging;
-
+
+ @Reference
+ private SubscriberIdle subscriberIdle;
+
private ServiceRegistration<DistributionAgent> componentReg;
private Closeable packagePoller;
@@ -346,8 +349,11 @@
// block until an item is available
DistributionQueueItem item = blockingPeekQueueItem();
// and then process it
+ subscriberIdle.busy();
try (Timer.Context context = distributionMetricsService.getProcessQueueItemDuration().time()) {
processQueueItem(item);
+ } finally {
+ subscriberIdle.idle();
}
} catch (IllegalStateException e) {
/**
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
new file mode 100644
index 0000000..7dac19c
--- /dev/null
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.subscriber;
+
+import java.io.Closeable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.felix.systemready.CheckStatus;
+import org.apache.felix.systemready.CheckStatus.State;
+import org.apache.felix.systemready.StateType;
+import org.apache.felix.systemready.SystemReadyCheck;
+import org.osgi.service.component.annotations.Component;
+
+/**
+ * A DistributionSubscriber is considered ready only when it is idle for more than
+ * the READY_IDLE_TIME_SECONDS at least once.
+ */
+@Component
+public class SubscriberIdle implements SystemReadyCheck, Closeable {
+ private static final int DEFAULT_IDLE_TIME_MILLIS = 10000;
+
+ private final int idleMillis;
+ private final AtomicBoolean isReady = new AtomicBoolean();
+ private final ScheduledExecutorService executor;
+ private ScheduledFuture<?> schedule;
+
+ public SubscriberIdle() {
+ this(DEFAULT_IDLE_TIME_MILLIS);
+ }
+
+ public SubscriberIdle(int idleMillis) {
+ this.idleMillis = idleMillis;
+ executor = Executors.newScheduledThreadPool(1);
+ }
+
+ @Override
+ public String getName() {
+ return "DistributionSubscriber idle";
+ }
+
+ @Override
+ public CheckStatus getStatus() {
+ State state = isReady.get() ? State.GREEN : State.RED;
+ return new CheckStatus(getName(), StateType.READY, state, "DistributionSubscriber idle");
+ }
+
+ /**
+ * Called when processing of a message starts
+ */
+ public synchronized void busy() {
+ if (schedule != null) {
+ schedule.cancel(false);
+ }
+ }
+
+ /**
+ * Called when processing of a message has finished
+ */
+ public synchronized void idle() {
+ if (!isReady.get()) {
+ busy();
+ schedule = executor.schedule(this::ready, idleMillis, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ private void ready() {
+ isReady.set(true);
+ }
+
+ @Override
+ public void close() {
+ executor.shutdownNow();
+ }
+
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java
new file mode 100644
index 0000000..76ae4bc
--- /dev/null
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.sling.distribution.journal.impl.subscriber;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import org.apache.felix.systemready.CheckStatus.State;
+import org.junit.Test;
+
+public class SubscriberIdleTest {
+
+ private SubscriberIdle idle;
+
+ @Test
+ public void testIdle() throws InterruptedException {
+ idle = new SubscriberIdle(40);
+ assertState("Initial state", State.RED);
+ idle.busy();
+ idle.idle();
+ assertState("State after reset", State.RED);
+ Thread.sleep(30);
+ assertState("State after time below idle limit", State.RED);
+ idle.busy();
+ Thread.sleep(80);
+ idle.idle();
+ assertState("State after long processing", State.RED);
+ Thread.sleep(80);
+ assertState("State after time over idle limit", State.GREEN);
+ idle.busy();
+ assertState("State should not be reset once it reached GREEN", State.GREEN);
+ idle.close();
+ }
+
+ private void assertState(String message, State expectedState) {
+ assertThat(message, idle.getStatus().getState(), equalTo(expectedState));
+ }
+
+
+}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
index cf06393..6337c60 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberTest.java
@@ -188,6 +188,9 @@
@Mock
private Timer.Context timerContext;
+
+ @Mock
+ private SubscriberIdle subscriberIdle;
@InjectMocks
DistributionSubscriber subscriber;
@@ -200,7 +203,7 @@
@Mock
private ServiceRegistration<DistributionAgent> reg;
-
+
private MessageHandler<PackageMessage> packageHandler;