Add support for repair coordinator to retry messages that timeout (#68)
patch by David Capwell; reviewed by Caleb Rackliffe for CASSANDRA-18816
diff --git a/accord-core/src/main/java/accord/utils/DefaultRandom.java b/accord-core/src/main/java/accord/utils/DefaultRandom.java
index 8efff22..5d00da3 100644
--- a/accord-core/src/main/java/accord/utils/DefaultRandom.java
+++ b/accord-core/src/main/java/accord/utils/DefaultRandom.java
@@ -20,25 +20,20 @@
import java.util.Random;
-public class DefaultRandom extends Random implements RandomSource
+public class DefaultRandom extends WrappedRandomSource
{
public DefaultRandom()
{
+ super(new Random());
}
public DefaultRandom(long seed)
{
- super(seed);
+ super(new Random(seed));
}
@Override
public DefaultRandom fork() {
return new DefaultRandom(nextLong());
}
-
- @Override
- public Random asJdkRandom()
- {
- return this;
- }
}
diff --git a/accord-core/src/main/java/accord/utils/RandomSource.java b/accord-core/src/main/java/accord/utils/RandomSource.java
index da5033a..3d4861e 100644
--- a/accord-core/src/main/java/accord/utils/RandomSource.java
+++ b/accord-core/src/main/java/accord/utils/RandomSource.java
@@ -248,6 +248,11 @@
}
}
+ default <T> T pick(T[] array)
+ {
+ return array[nextInt(array.length)];
+ }
+
default <T> T pick(List<T> values)
{
return pick(values, 0, values.size());
diff --git a/accord-core/src/test/java/accord/burn/random/FrequentLargeRange.java b/accord-core/src/test/java/accord/burn/random/FrequentLargeRange.java
index 41cae46..7c77b97 100644
--- a/accord-core/src/test/java/accord/burn/random/FrequentLargeRange.java
+++ b/accord-core/src/test/java/accord/burn/random/FrequentLargeRange.java
@@ -31,11 +31,11 @@
private final LongGen small, large;
private final Gen<Boolean> runs;
- public FrequentLargeRange(LongGen small, LongGen large, double ratio)
+ public FrequentLargeRange(LongGen small, LongGen large, double ratio, int maxRuns)
{
this.small = small;
this.large = large;
- this.runs = Gens.bools().biasedRepeatingRuns(ratio);
+ this.runs = Gens.bools().biasedRepeatingRuns(ratio, maxRuns);
}
@Override
@@ -54,6 +54,7 @@
{
private final RandomSource random;
private Double ratio;
+ private Integer maxRuns;
private LongGen small, large;
public Builder(RandomSource random)
@@ -73,6 +74,18 @@
return this;
}
+ public Builder maxRuns(int maxRuns)
+ {
+ this.maxRuns = maxRuns;
+ return this;
+ }
+
+ public Builder maxRuns(int min, int max)
+ {
+ this.maxRuns = random.nextInt(min, max);
+ return this;
+ }
+
public Builder small(Duration min, Duration max)
{
small = create(min, max);
@@ -132,7 +145,9 @@
throw new IllegalStateException("Large range undefined");
if (ratio == null)
ratio(1, 11);
- return new FrequentLargeRange(small, large, ratio);
+ if (maxRuns == null)
+ maxRuns(3, 15);
+ return new FrequentLargeRange(small, large, ratio, maxRuns);
}
}
}
diff --git a/accord-core/src/test/java/accord/burn/random/SegmentedRandomRangeTest.java b/accord-core/src/test/java/accord/burn/random/FrequentLargeRangeTest.java
similarity index 97%
rename from accord-core/src/test/java/accord/burn/random/SegmentedRandomRangeTest.java
rename to accord-core/src/test/java/accord/burn/random/FrequentLargeRangeTest.java
index b16cdec..ed838bb 100644
--- a/accord-core/src/test/java/accord/burn/random/SegmentedRandomRangeTest.java
+++ b/accord-core/src/test/java/accord/burn/random/FrequentLargeRangeTest.java
@@ -30,7 +30,7 @@
import static accord.utils.Property.qt;
-class SegmentedRandomRangeTest
+class FrequentLargeRangeTest
{
enum Type
{
@@ -107,13 +107,13 @@
int largeRatio = ratio.next(rs);
return new TestCase(0, ints[0], ints[1], ints[2], largeRatio, typeGen.next(rs));
};
- qt().forAll(Gens.random(), test).check(SegmentedRandomRangeTest::test);
+ qt().forAll(Gens.random(), test).check(FrequentLargeRangeTest::test);
}
private static void test(RandomSource rs, TestCase tc)
{
double ratio = tc.ratio();
- FrequentLargeRange period = new FrequentLargeRange(tc.min(rs), tc.max(rs), ratio);
+ FrequentLargeRange period = new FrequentLargeRange(tc.min(rs), tc.max(rs), ratio, tc.largeRatio);
int numSamples = 1000;
int maxResamples = 1000;
diff --git a/accord-core/src/test/java/accord/impl/basic/NodeSink.java b/accord-core/src/test/java/accord/impl/basic/NodeSink.java
index a19d696..ccec87f 100644
--- a/accord-core/src/test/java/accord/impl/basic/NodeSink.java
+++ b/accord-core/src/test/java/accord/impl/basic/NodeSink.java
@@ -179,8 +179,8 @@
private Supplier<Action> actions()
{
- Gen<Boolean> drops = Gens.bools().biasedRepeatingRuns(0.01);
- Gen<Boolean> failures = Gens.bools().biasedRepeatingRuns(0.01);
+ Gen<Boolean> drops = Gens.bools().biasedRepeatingRuns(0.01, random.nextInt(3, 15));
+ Gen<Boolean> failures = Gens.bools().biasedRepeatingRuns(0.01, random.nextInt(3, 15));
Gen<Action> actionGen = rs -> {
if (drops.next(rs))
return Action.DROP;
diff --git a/accord-core/src/test/java/accord/utils/GenTest.java b/accord-core/src/test/java/accord/utils/GenTest.java
index 7a04669..8d4ebd7 100644
--- a/accord-core/src/test/java/accord/utils/GenTest.java
+++ b/accord-core/src/test/java/accord/utils/GenTest.java
@@ -141,7 +141,7 @@
{
double ratio = 0.0625;
int samples = 1000;
- Gen<Runs> gen = Gens.lists(Gens.bools().biasedRepeatingRuns(ratio)).ofSize(samples).map(Runs::new);
+ Gen<Runs> gen = Gens.lists(Gens.bools().biasedRepeatingRuns(ratio, 15)).ofSize(samples).map(Runs::new);
qt().forAll(gen).check(runs -> {
assertThat(IntStream.of(runs.runs).filter(i -> i > 5).toArray()).isNotEmpty();
assertThat(runs.counts.get(true) / 1000.0).isBetween(ratio * .5, 0.1);
diff --git a/accord-core/src/test/java/accord/utils/Gens.java b/accord-core/src/test/java/accord/utils/Gens.java
index 244cd64..741efd1 100644
--- a/accord-core/src/test/java/accord/utils/Gens.java
+++ b/accord-core/src/test/java/accord/utils/Gens.java
@@ -34,7 +34,6 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;
-
public class Gens {
private Gens() {
}
@@ -170,10 +169,9 @@
return RandomSource::nextBoolean;
}
- public Gen<Boolean> biasedRepeatingRuns(double ratio)
+ public Gen<Boolean> biasedRepeatingRuns(double ratio, int maxRuns)
{
Invariants.checkArgument(ratio > 0 && ratio <= 1, "Expected %d to be larger than 0 and <= 1", ratio);
- int steps = (int) (1 / ratio);
double lower = ratio * .8;
double upper = ratio * 1.2;
return new Gen<Boolean>() {
@@ -204,7 +202,7 @@
}
if (rs.decide(ratio))
{
- run = rs.nextInt(steps);
+ run = rs.nextInt(maxRuns);
run--;
trueCount++;
return true;
@@ -281,7 +279,7 @@
return pick(values);
}
}
-
+
public static class StringDSL
{
public Gen<String> of(Gen.IntGen sizes, char[] domain)
diff --git a/accord-core/src/test/java/accord/utils/Property.java b/accord-core/src/test/java/accord/utils/Property.java
index 0173568..9c81375 100644
--- a/accord-core/src/test/java/accord/utils/Property.java
+++ b/accord-core/src/test/java/accord/utils/Property.java
@@ -18,9 +18,18 @@
package accord.utils;
+import accord.utils.async.AsyncChains;
+import accord.utils.async.AsyncResult;
+import accord.utils.async.AsyncResults;
+
+import java.time.Duration;
import java.util.Arrays;
import java.util.Objects;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import javax.annotation.Nullable;
public class Property
{
@@ -30,6 +39,8 @@
protected int examples = 1000;
protected boolean pure = true;
+ @Nullable
+ protected Duration timeout = null;
protected Common() {
}
@@ -38,6 +49,7 @@
this.seed = other.seed;
this.examples = other.examples;
this.pure = other.pure;
+ this.timeout = other.timeout;
}
public T withSeed(long seed)
@@ -59,6 +71,51 @@
this.pure = pure;
return (T) this;
}
+
+ public T withTimeout(Duration timeout)
+ {
+ this.timeout = timeout;
+ return (T) this;
+ }
+
+ protected void checkWithTimeout(Runnable fn)
+ {
+ AsyncResult.Settable<?> promise = AsyncResults.settable();
+ Thread t = new Thread(() -> {
+ try
+ {
+ fn.run();
+ promise.setSuccess(null);
+ }
+ catch (Throwable e)
+ {
+ promise.setFailure(e);
+ }
+ });
+ t.setName("property with timeout");
+ t.setDaemon(true);
+ try
+ {
+ t.start();
+ AsyncChains.getBlocking(promise, timeout.toNanos(), TimeUnit.NANOSECONDS);
+ }
+ catch (ExecutionException e)
+ {
+ throw new PropertyError(propertyError(this, e.getCause()));
+ }
+ catch (InterruptedException e)
+ {
+ t.interrupt();
+ throw new PropertyError(propertyError(this, e));
+ }
+ catch (TimeoutException e)
+ {
+ t.interrupt();
+ TimeoutException override = new TimeoutException("property test did not complete within " + this.timeout);
+ override.setStackTrace(new StackTraceElement[0]);
+ throw new PropertyError(propertyError(this, override));
+ }
+ }
}
public static class ForBuilder extends Common<ForBuilder>
@@ -162,6 +219,16 @@
public void check(FailingConsumer<T> fn)
{
+ if (timeout != null)
+ {
+ checkWithTimeout(() -> checkInternal(fn));
+ return;
+ }
+ checkInternal(fn);
+ }
+
+ private void checkInternal(FailingConsumer<T> fn)
+ {
RandomSource random = new DefaultRandom(seed);
for (int i = 0; i < examples; i++)
{
@@ -202,6 +269,16 @@
public void check(FailingBiConsumer<A, B> fn)
{
+ if (timeout != null)
+ {
+ checkWithTimeout(() -> checkInternal(fn));
+ return;
+ }
+ checkInternal(fn);
+ }
+
+ private void checkInternal(FailingBiConsumer<A, B> fn)
+ {
RandomSource random = new DefaultRandom(seed);
for (int i = 0; i < examples; i++)
{
@@ -246,6 +323,16 @@
public void check(FailingTriConsumer<A, B, C> fn)
{
+ if (timeout != null)
+ {
+ checkWithTimeout(() -> checkInternal(fn));
+ return;
+ }
+ checkInternal(fn);
+ }
+
+ private void checkInternal(FailingTriConsumer<A, B, C> fn)
+ {
RandomSource random = new DefaultRandom(seed);
for (int i = 0; i < examples; i++)
{
@@ -270,16 +357,23 @@
}
}
- private static void checkInterrupted() throws InterruptedException {
+ private static void checkInterrupted() throws InterruptedException
+ {
if (Thread.currentThread().isInterrupted())
throw new InterruptedException();
}
public static class PropertyError extends AssertionError
{
- public PropertyError(String message, Throwable cause) {
+ public PropertyError(String message, Throwable cause)
+ {
super(message, cause);
}
+
+ public PropertyError(String message)
+ {
+ super(message);
+ }
}
public static ForBuilder qt()