SLING-8944 - Register systemready check programmatically to make sure it
only happens for the receiving side.
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
index c5e25e2..3a40fbf 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/DistributionSubscriber.java
@@ -136,7 +136,6 @@
@Reference
private Packaging packaging;
- @Reference
private SubscriberIdle subscriberIdle;
private ServiceRegistration<DistributionAgent> componentReg;
@@ -176,7 +175,9 @@
requireNonNull(topics);
requireNonNull(eventAdmin);
requireNonNull(precondition);
-
+
+ subscriberIdle = new SubscriberIdle(context, SubscriberIdle.DEFAULT_IDLE_TIME_MILLIS);
+
queueNames = getNotEmpty(config.agentNames());
int maxRetries = config.maxRetries();
boolean editable = config.editable();
@@ -236,11 +237,9 @@
@Deactivate
public void deactivate() {
- IOUtils.closeQuietly(announcer);
- IOUtils.closeQuietly(bookKeeper);
componentReg.unregister();
- IOUtils.closeQuietly(packagePoller);
- IOUtils.closeQuietly(commandPoller);
+ IOUtils.closeQuietly(subscriberIdle, announcer, bookKeeper,
+ packagePoller, commandPoller);
running = false;
Thread interrupter = this.queueProcessor;
if (interrupter != null) {
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
index 3bf5745..784c33d 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdle.java
@@ -19,6 +19,7 @@
package org.apache.sling.distribution.journal.impl.subscriber;
import java.io.Closeable;
+import java.util.Hashtable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
@@ -29,28 +30,27 @@
import org.apache.felix.systemready.CheckStatus.State;
import org.apache.felix.systemready.StateType;
import org.apache.felix.systemready.SystemReadyCheck;
-import org.osgi.service.component.annotations.Component;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceRegistration;
/**
* A DistributionSubscriber is considered ready only when it is idle for more than
* the READY_IDLE_TIME_SECONDS at least once.
*/
-@Component(service = {SubscriberIdle.class, SystemReadyCheck.class})
public class SubscriberIdle implements SystemReadyCheck, Closeable {
- private static final int DEFAULT_IDLE_TIME_MILLIS = 10000;
+ public static final int DEFAULT_IDLE_TIME_MILLIS = 10000;
private final int idleMillis;
private final AtomicBoolean isReady = new AtomicBoolean();
private final ScheduledExecutorService executor;
private ScheduledFuture<?> schedule;
-
- public SubscriberIdle() {
- this(DEFAULT_IDLE_TIME_MILLIS);
- }
- public SubscriberIdle(int idleMillis) {
+ private ServiceRegistration<SystemReadyCheck> reg;
+
+ public SubscriberIdle(BundleContext context, int idleMillis) {
this.idleMillis = idleMillis;
executor = Executors.newScheduledThreadPool(1);
+ this.reg = context.registerService(SystemReadyCheck.class, this, new Hashtable<>());
}
@Override
@@ -90,6 +90,9 @@
@Override
public void close() {
executor.shutdownNow();
+ if (reg != null) {
+ reg.unregister();
+ }
}
}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java
index 76ae4bc..a424671 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/subscriber/SubscriberIdleTest.java
@@ -23,14 +23,18 @@
import org.apache.felix.systemready.CheckStatus.State;
import org.junit.Test;
+import org.mockito.Mockito;
+import org.osgi.framework.BundleContext;
public class SubscriberIdleTest {
+ private static final int IDLE_MILLIES = 40;
private SubscriberIdle idle;
-
+
@Test
public void testIdle() throws InterruptedException {
- idle = new SubscriberIdle(40);
+ BundleContext context = Mockito.mock(BundleContext.class);
+ idle = new SubscriberIdle(context , IDLE_MILLIES);
assertState("Initial state", State.RED);
idle.busy();
idle.idle();
@@ -52,5 +56,4 @@
assertThat(message, idle.getStatus().getState(), equalTo(expectedState));
}
-
}