Add support for repair coordinator to retry messages that timeout (#68)

patch by David Capwell; reviewed by Caleb Rackliffe for CASSANDRA-18816
diff --git a/accord-core/src/main/java/accord/utils/DefaultRandom.java b/accord-core/src/main/java/accord/utils/DefaultRandom.java
index 8efff22..5d00da3 100644
--- a/accord-core/src/main/java/accord/utils/DefaultRandom.java
+++ b/accord-core/src/main/java/accord/utils/DefaultRandom.java
@@ -20,25 +20,20 @@
 
 import java.util.Random;
 
-public class DefaultRandom extends Random implements RandomSource
+public class DefaultRandom extends WrappedRandomSource
 {
     public DefaultRandom()
     {
+        super(new Random());
     }
 
     public DefaultRandom(long seed)
     {
-        super(seed);
+        super(new Random(seed));
     }
 
     @Override
     public DefaultRandom fork() {
         return new DefaultRandom(nextLong());
     }
-
-    @Override
-    public Random asJdkRandom()
-    {
-        return this;
-    }
 }
diff --git a/accord-core/src/main/java/accord/utils/RandomSource.java b/accord-core/src/main/java/accord/utils/RandomSource.java
index da5033a..3d4861e 100644
--- a/accord-core/src/main/java/accord/utils/RandomSource.java
+++ b/accord-core/src/main/java/accord/utils/RandomSource.java
@@ -248,6 +248,11 @@
         }
     }
 
+    default <T> T pick(T[] array)
+    {
+        return array[nextInt(array.length)];
+    }
+
     default <T> T pick(List<T> values)
     {
         return pick(values, 0, values.size());
diff --git a/accord-core/src/test/java/accord/burn/random/FrequentLargeRange.java b/accord-core/src/test/java/accord/burn/random/FrequentLargeRange.java
index 41cae46..7c77b97 100644
--- a/accord-core/src/test/java/accord/burn/random/FrequentLargeRange.java
+++ b/accord-core/src/test/java/accord/burn/random/FrequentLargeRange.java
@@ -31,11 +31,11 @@
     private final LongGen small, large;
     private final Gen<Boolean> runs;
 
-    public FrequentLargeRange(LongGen small, LongGen large, double ratio)
+    public FrequentLargeRange(LongGen small, LongGen large, double ratio, int maxRuns)
     {
         this.small = small;
         this.large = large;
-        this.runs = Gens.bools().biasedRepeatingRuns(ratio);
+        this.runs = Gens.bools().biasedRepeatingRuns(ratio, maxRuns);
     }
 
     @Override
@@ -54,6 +54,7 @@
     {
         private final RandomSource random;
         private Double ratio;
+        private Integer maxRuns;
         private LongGen small, large;
 
         public Builder(RandomSource random)
@@ -73,6 +74,18 @@
             return this;
         }
 
+        public Builder maxRuns(int maxRuns)
+        {
+            this.maxRuns = maxRuns;
+            return this;
+        }
+
+        public Builder maxRuns(int min, int max)
+        {
+            this.maxRuns = random.nextInt(min, max);
+            return this;
+        }
+
         public Builder small(Duration min, Duration max)
         {
             small = create(min, max);
@@ -132,7 +145,9 @@
                 throw new IllegalStateException("Large range undefined");
             if (ratio == null)
                 ratio(1, 11);
-            return new FrequentLargeRange(small, large, ratio);
+            if (maxRuns == null)
+                maxRuns(3, 15);
+            return new FrequentLargeRange(small, large, ratio, maxRuns);
         }
     }
 }
diff --git a/accord-core/src/test/java/accord/burn/random/SegmentedRandomRangeTest.java b/accord-core/src/test/java/accord/burn/random/FrequentLargeRangeTest.java
similarity index 97%
rename from accord-core/src/test/java/accord/burn/random/SegmentedRandomRangeTest.java
rename to accord-core/src/test/java/accord/burn/random/FrequentLargeRangeTest.java
index b16cdec..ed838bb 100644
--- a/accord-core/src/test/java/accord/burn/random/SegmentedRandomRangeTest.java
+++ b/accord-core/src/test/java/accord/burn/random/FrequentLargeRangeTest.java
@@ -30,7 +30,7 @@
 
 import static accord.utils.Property.qt;
 
-class SegmentedRandomRangeTest
+class FrequentLargeRangeTest
 {
     enum Type
     {
@@ -107,13 +107,13 @@
             int largeRatio = ratio.next(rs);
             return new TestCase(0, ints[0], ints[1], ints[2], largeRatio, typeGen.next(rs));
         };
-        qt().forAll(Gens.random(), test).check(SegmentedRandomRangeTest::test);
+        qt().forAll(Gens.random(), test).check(FrequentLargeRangeTest::test);
     }
 
     private static void test(RandomSource rs, TestCase tc)
     {
         double ratio = tc.ratio();
-        FrequentLargeRange period = new FrequentLargeRange(tc.min(rs), tc.max(rs), ratio);
+        FrequentLargeRange period = new FrequentLargeRange(tc.min(rs), tc.max(rs), ratio, tc.largeRatio);
         int numSamples = 1000;
         int maxResamples = 1000;
 
diff --git a/accord-core/src/test/java/accord/impl/basic/NodeSink.java b/accord-core/src/test/java/accord/impl/basic/NodeSink.java
index a19d696..ccec87f 100644
--- a/accord-core/src/test/java/accord/impl/basic/NodeSink.java
+++ b/accord-core/src/test/java/accord/impl/basic/NodeSink.java
@@ -179,8 +179,8 @@
 
     private Supplier<Action> actions()
     {
-        Gen<Boolean> drops = Gens.bools().biasedRepeatingRuns(0.01);
-        Gen<Boolean> failures = Gens.bools().biasedRepeatingRuns(0.01);
+        Gen<Boolean> drops = Gens.bools().biasedRepeatingRuns(0.01, random.nextInt(3, 15));
+        Gen<Boolean> failures = Gens.bools().biasedRepeatingRuns(0.01, random.nextInt(3, 15));
         Gen<Action> actionGen = rs -> {
             if (drops.next(rs))
                 return Action.DROP;
diff --git a/accord-core/src/test/java/accord/utils/GenTest.java b/accord-core/src/test/java/accord/utils/GenTest.java
index 7a04669..8d4ebd7 100644
--- a/accord-core/src/test/java/accord/utils/GenTest.java
+++ b/accord-core/src/test/java/accord/utils/GenTest.java
@@ -141,7 +141,7 @@
     {
         double ratio = 0.0625;
         int samples = 1000;
-        Gen<Runs> gen = Gens.lists(Gens.bools().biasedRepeatingRuns(ratio)).ofSize(samples).map(Runs::new);
+        Gen<Runs> gen = Gens.lists(Gens.bools().biasedRepeatingRuns(ratio, 15)).ofSize(samples).map(Runs::new);
         qt().forAll(gen).check(runs -> {
             assertThat(IntStream.of(runs.runs).filter(i -> i > 5).toArray()).isNotEmpty();
             assertThat(runs.counts.get(true) / 1000.0).isBetween(ratio * .5, 0.1);
diff --git a/accord-core/src/test/java/accord/utils/Gens.java b/accord-core/src/test/java/accord/utils/Gens.java
index 244cd64..741efd1 100644
--- a/accord-core/src/test/java/accord/utils/Gens.java
+++ b/accord-core/src/test/java/accord/utils/Gens.java
@@ -34,7 +34,6 @@
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
-
 public class Gens {
     private Gens() {
     }
@@ -170,10 +169,9 @@
             return RandomSource::nextBoolean;
         }
 
-        public Gen<Boolean> biasedRepeatingRuns(double ratio)
+        public Gen<Boolean> biasedRepeatingRuns(double ratio, int maxRuns)
         {
             Invariants.checkArgument(ratio > 0 && ratio <= 1, "Expected %d to be larger than 0 and <= 1", ratio);
-            int steps = (int) (1 / ratio);
             double lower = ratio * .8;
             double upper = ratio * 1.2;
             return new Gen<Boolean>() {
@@ -204,7 +202,7 @@
                     }
                     if (rs.decide(ratio))
                     {
-                        run = rs.nextInt(steps);
+                        run = rs.nextInt(maxRuns);
                         run--;
                         trueCount++;
                         return true;
@@ -281,7 +279,7 @@
             return pick(values);
         }
     }
-    
+
     public static class StringDSL
     {
         public Gen<String> of(Gen.IntGen sizes, char[] domain)
diff --git a/accord-core/src/test/java/accord/utils/Property.java b/accord-core/src/test/java/accord/utils/Property.java
index 0173568..9c81375 100644
--- a/accord-core/src/test/java/accord/utils/Property.java
+++ b/accord-core/src/test/java/accord/utils/Property.java
@@ -18,9 +18,18 @@
 
 package accord.utils;
 
+import accord.utils.async.AsyncChains;
+import accord.utils.async.AsyncResult;
+import accord.utils.async.AsyncResults;
+
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Objects;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import javax.annotation.Nullable;
 
 public class Property
 {
@@ -30,6 +39,8 @@
         protected int examples = 1000;
 
         protected boolean pure = true;
+        @Nullable
+        protected Duration timeout = null;
 
         protected Common() {
         }
@@ -38,6 +49,7 @@
             this.seed = other.seed;
             this.examples = other.examples;
             this.pure = other.pure;
+            this.timeout = other.timeout;
         }
 
         public T withSeed(long seed)
@@ -59,6 +71,51 @@
             this.pure = pure;
             return (T) this;
         }
+
+        public T withTimeout(Duration timeout)
+        {
+            this.timeout = timeout;
+            return (T) this;
+        }
+
+        protected void checkWithTimeout(Runnable fn)
+        {
+            AsyncResult.Settable<?> promise = AsyncResults.settable();
+            Thread t = new Thread(() -> {
+                try
+                {
+                    fn.run();
+                    promise.setSuccess(null);
+                }
+                catch (Throwable e)
+                {
+                    promise.setFailure(e);
+                }
+            });
+            t.setName("property with timeout");
+            t.setDaemon(true);
+            try
+            {
+                t.start();
+                AsyncChains.getBlocking(promise, timeout.toNanos(), TimeUnit.NANOSECONDS);
+            }
+            catch (ExecutionException e)
+            {
+                throw new PropertyError(propertyError(this, e.getCause()));
+            }
+            catch (InterruptedException e)
+            {
+                t.interrupt();
+                throw new PropertyError(propertyError(this, e));
+            }
+            catch (TimeoutException e)
+            {
+                t.interrupt();
+                TimeoutException override = new TimeoutException("property test did not complete within " + this.timeout);
+                override.setStackTrace(new StackTraceElement[0]);
+                throw new PropertyError(propertyError(this, override));
+            }
+        }
     }
 
     public static class ForBuilder extends Common<ForBuilder>
@@ -162,6 +219,16 @@
 
         public void check(FailingConsumer<T> fn)
         {
+            if (timeout != null)
+            {
+                checkWithTimeout(() -> checkInternal(fn));
+                return;
+            }
+            checkInternal(fn);
+        }
+
+        private void checkInternal(FailingConsumer<T> fn)
+        {
             RandomSource random = new DefaultRandom(seed);
             for (int i = 0; i < examples; i++)
             {
@@ -202,6 +269,16 @@
 
         public void check(FailingBiConsumer<A, B> fn)
         {
+            if (timeout != null)
+            {
+                checkWithTimeout(() -> checkInternal(fn));
+                return;
+            }
+            checkInternal(fn);
+        }
+
+        private void checkInternal(FailingBiConsumer<A, B> fn)
+        {
             RandomSource random = new DefaultRandom(seed);
             for (int i = 0; i < examples; i++)
             {
@@ -246,6 +323,16 @@
 
         public void check(FailingTriConsumer<A, B, C> fn)
         {
+            if (timeout != null)
+            {
+                checkWithTimeout(() -> checkInternal(fn));
+                return;
+            }
+            checkInternal(fn);
+        }
+
+        private void checkInternal(FailingTriConsumer<A, B, C> fn)
+        {
             RandomSource random = new DefaultRandom(seed);
             for (int i = 0; i < examples; i++)
             {
@@ -270,16 +357,23 @@
         }
     }
 
-    private static void checkInterrupted() throws InterruptedException {
+    private static void checkInterrupted() throws InterruptedException
+    {
         if (Thread.currentThread().isInterrupted())
             throw new InterruptedException();
     }
 
     public static class PropertyError extends AssertionError
     {
-        public PropertyError(String message, Throwable cause) {
+        public PropertyError(String message, Throwable cause)
+        {
             super(message, cause);
         }
+
+        public PropertyError(String message)
+        {
+            super(message);
+        }
     }
 
     public static ForBuilder qt()