SLING-8531 - Fixes from review
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackOff.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackOff.java
index 148bd2a..eb3ebb6 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackOff.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackOff.java
@@ -19,7 +19,7 @@
package org.apache.sling.distribution.journal.impl.shared;
import java.io.Closeable;
-import java.io.IOException;
+import java.time.Duration;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -31,7 +31,8 @@
/**
* Retry with exponential backoff.
*
- * Calls the checkCallback until it does not throw an Exception
+ * Calls checkCallback until it does not throw an Exception.
+ * Retries are first done with startDelay, then doubled until maxDelay is reached.
*/
public class ExponentialBackOff implements Closeable {
private final Logger log = LoggerFactory.getLogger(this.getClass());
@@ -44,17 +45,21 @@
private int currentMaxDelay;
- public ExponentialBackOff(int maxDelay, Runnable checkCallback) {
- this.currentMaxDelay = 128;
- this.maxDelay = maxDelay;
+ public ExponentialBackOff(Duration startDelay, Duration maxDelay, Runnable checkCallback) {
+ this.currentMaxDelay = asMS(startDelay);
+ this.maxDelay = asMS(maxDelay);
this.checkCallback = checkCallback;
this.executor = Executors.newScheduledThreadPool(1);
this.random = new Random();
scheduleCheck();
}
+ private int asMS(Duration startDelay) {
+ return new Long(startDelay.toMillis()).intValue();
+ }
+
@Override
- public void close() throws IOException {
+ public synchronized void close() {
this.executor.shutdown();
}
@@ -68,6 +73,7 @@
private void check() {
try {
this.checkCallback.run();
+ this.close();
} catch (RuntimeException e) {
scheduleCheck();
}
diff --git a/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java b/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java
index dcb5fd9..9f95cb7 100644
--- a/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java
+++ b/src/main/java/org/apache/sling/distribution/journal/impl/shared/JournalAvailableChecker.java
@@ -18,8 +18,11 @@
*/
package org.apache.sling.distribution.journal.impl.shared;
+import static java.time.temporal.ChronoUnit.MINUTES;
+import static java.time.temporal.ChronoUnit.SECONDS;
import static java.util.Objects.requireNonNull;
+import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.io.IOUtils;
@@ -44,14 +47,14 @@
)
public class JournalAvailableChecker implements JournalAvailable, EventHandler {
- private static final int MAX_RETGRY_DELAY_MS = 10000;
+ private static final Duration INITIAL_RETRY_DELAY = Duration.of(1, SECONDS);
+ private static final Duration MAX_RETRY_DELAY = Duration.of(5, MINUTES);
// Minimal number of errors before journal is considered unavailable
public static final int MIN_ERRORS = 2;
private static final Logger LOG = LoggerFactory.getLogger(JournalAvailableChecker.class);
-
@Reference
Topics topics;
@@ -98,10 +101,10 @@
}
private void available() {
+ LOG.info("Journal is available");
+ IOUtils.closeQuietly(this.backoffRetry);
this.backoffRetry = null;
if (this.reg == null) {
- IOUtils.closeQuietly(this.backoffRetry);
- LOG.info("Journal is available");
this.reg = context.registerService(JournalAvailable.class, this, null);
}
}
@@ -147,12 +150,12 @@
unRegister();
startChecks();
} else {
- LOG.info("Received exception event {}. {} of {} errors occured.", type, this.numErrors.get(), MIN_ERRORS);
+ LOG.info("Received exception event {}. {} of {} errors occurred.", type, curNumErrors, MIN_ERRORS);
}
}
private void startChecks() {
- this.backoffRetry = new ExponentialBackOff(MAX_RETGRY_DELAY_MS, this::run);
+ this.backoffRetry = new ExponentialBackOff(INITIAL_RETRY_DELAY, MAX_RETRY_DELAY, this::run);
this.numErrors.set(0);
}
}
diff --git a/src/test/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackoffTest.java b/src/test/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackoffTest.java
index 01bcec5..1fc07eb 100644
--- a/src/test/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackoffTest.java
+++ b/src/test/java/org/apache/sling/distribution/journal/impl/shared/ExponentialBackoffTest.java
@@ -18,9 +18,12 @@
*/
package org.apache.sling.distribution.journal.impl.shared;
+import static java.time.temporal.ChronoUnit.MILLIS;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;
+import java.time.Duration;
+import java.time.temporal.ChronoUnit;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -32,13 +35,14 @@
public class ExponentialBackoffTest {
private static final int RETRIES = 10;
- private static final int MAX_DELAY_MS = 256;
+ private static final Duration INITIAL_DELAY = Duration.of(64, MILLIS);
+ private static final Duration MAX_DELAY = Duration.of(256, MILLIS);
private CountDownLatch countDown = new CountDownLatch(RETRIES);
@Test
public void testIsAvailable() throws Exception {
- ExponentialBackOff backOff = new ExponentialBackOff(MAX_DELAY_MS, this::checkCallback);
- boolean finished = this.countDown.await(MAX_DELAY_MS * RETRIES, TimeUnit.MILLISECONDS);
+ ExponentialBackOff backOff = new ExponentialBackOff(INITIAL_DELAY, MAX_DELAY, this::checkCallback);
+ boolean finished = this.countDown.await(MAX_DELAY.toMillis() * RETRIES, TimeUnit.MILLISECONDS);
backOff.close();
assertThat("Should finish before the timeout", finished, equalTo(true));
}