Accord: PreLoadContext must properly and consistently support ranges

patch by David Capwell; reviewed by Benedict Elliott Smith for CASSANDRA-19355
diff --git a/accord-core/src/main/java/accord/local/CommandsForKey.java b/accord-core/src/main/java/accord/local/CommandsForKey.java
index 0e24d67..9e628b9 100644
--- a/accord-core/src/main/java/accord/local/CommandsForKey.java
+++ b/accord-core/src/main/java/accord/local/CommandsForKey.java
@@ -1698,7 +1698,6 @@
         if (o == null || getClass() != o.getClass()) return false;
         CommandsForKey that = (CommandsForKey) o;
         return Objects.equals(key, that.key)
-               && Objects.equals(redundantBefore, that.redundantBefore)
                && Arrays.equals(txns, that.txns);
     }
 
diff --git a/accord-core/src/main/java/accord/utils/CheckpointIntervalArray.java b/accord-core/src/main/java/accord/utils/CheckpointIntervalArray.java
new file mode 100644
index 0000000..84063da
--- /dev/null
+++ b/accord-core/src/main/java/accord/utils/CheckpointIntervalArray.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.utils;
+
+import java.util.Arrays;
+
+import accord.utils.CheckpointIntervalArrayBuilder.Accessor;
+import net.nicoulaj.compilecommand.annotations.Inline;
+
+import static accord.utils.SortedArrays.Search.CEIL;
+
+public class CheckpointIntervalArray<Ranges, Range, Key>
+{
+    // scan distance can be kept very small as we guarantee to use at most linear extra space even with a scan distance of zero
+    static final int MAX_SCAN_DISTANCE = 255;
+    protected static final int BIT30 = 0x40000000;
+    protected static final int BIT29 = 0x20000000;
+
+    final Ranges ranges;
+
+    /**
+     * The lower bound for each checkpoint.
+     * The checkpoint {@code i} applies to all ranges (incl) starting from {@code lowerBounds[i]},
+     * but before (excl) {@code lowerBounds[i+1]}.
+     */
+    final int[] lowerBounds;
+
+    /**
+     * Logically one entry per checkpoint, mapping {@link #lowerBounds} to {@link #checkpointLists},
+     * however we also encode an additional byte per entry representing the scan distance for the
+     * ranges handled by this checkpoint. These are grouped into an integer per four mappings, i.e.
+     * we encode batches of five ints, with the first int containing the four scan distances for the
+     * next four checkpoints, and the following four ints containing the respective offsets into
+     * {@link #checkpointLists}.
+     * <p>
+     * [0.........32b.........64b.........96b........128b........160b........192b]
+     * [ d1 d2 d3 d4  mapping1    mapping2   mapping3    mapping4    d5 d6 d7 d8 ]
+     */
+    final int[] headers;
+
+    /**
+     * A list of indexes in {@link #ranges} contained by each checkpoint; checkpoints are
+     * mapped from {@link #lowerBounds} by {@link #headers}.
+     * <p>
+     * Entries are sorted in descending order by the end of the range they cover, so that
+     * a search of this collection my terminate as soon as it encounters a range that does
+     * not cover the item we are searching for.
+     * <p>
+     * This collection may contain negative values, in which case these point to other
+     * checkpoints, whose <i>direct</i> contents (i.e. the positive values of) we may
+     * search.
+     * <ul> if negative, points to an earlier checkpoint, and:
+     *   <li>if the 30th bit is set, the low 20 bits point to checkpointsList,
+     *       and the 9 bits in-between provide the length of the range</li>
+     *   <li>otherwise, if the 29th bit is set, the lower 29 bits points to checkpointsList,
+     *       and can be iterated safely without an endIndex</li>
+     *   <li>otherwise, the low 29 bits provide the length of the run, and the low 31 bits
+     *       of the following entry (which will also be negative) provide a pointer to
+     *       checkpointsList</li>
+     * </ul>
+     */
+    final int[] checkpointLists;
+
+    public final int maxScanAndCheckpointMatches;
+    private final Accessor<Ranges, Range, Key> accessor;
+
+    public CheckpointIntervalArray(Accessor<Ranges, Range, Key> accessor, Ranges ranges,
+                                   int[] lowerBounds, int[] headers, int[] checkpointLists, int maxScanAndCheckpointMatches)
+    {
+        this.accessor = accessor;
+        this.ranges = ranges;
+        this.lowerBounds = lowerBounds;
+        this.headers = headers;
+        this.checkpointLists = checkpointLists;
+        this.maxScanAndCheckpointMatches = maxScanAndCheckpointMatches;
+    }
+
+    @Inline
+    public <P1, P2, P3, P4> int forEach(Range range, IndexedQuadConsumer<P1, P2, P3, P4> forEachScanOrCheckpoint, IndexedRangeQuadConsumer<P1, P2, P3, P4> forEachRange, P1 p1, P2 p2, P3 p3, P4 p4, int minIndex)
+    {
+        return forEach(accessor.start(range), accessor.end(range), forEachScanOrCheckpoint, forEachRange, p1, p2, p3, p4, minIndex);
+    }
+
+    public <P1, P2, P3, P4> int forEach(Key startKey, Key endKey, IndexedQuadConsumer<P1, P2, P3, P4> forEachScanOrCheckpoint, IndexedRangeQuadConsumer<P1, P2, P3, P4> forEachRange, P1 p1, P2 p2, P3 p3, P4 p4, int minIndex)
+    {
+        if (accessor.size(ranges) == 0 || minIndex == accessor.size(ranges))
+            return minIndex;
+
+        var c = accessor.keyComparator();
+        int end = accessor.binarySearch(ranges, minIndex, accessor.size(ranges), endKey, (a, b) -> c.compare(a, accessor.start(b)), CEIL);
+        if (end < 0) end = -1 - end;
+        if (end <= minIndex) return minIndex;
+
+        int floor = accessor.binarySearch(ranges, minIndex, accessor.size(ranges), startKey, (a, b) -> c.compare(a, accessor.start(b)), CEIL);
+        int start = floor;
+        if (floor < 0)
+        {
+            // if there's no precise match on start, step backwards;
+            // if this range does not overlap us, step forwards again for start
+            // but retain the floor index for performing scan and checkpoint searches from
+            // as this contains all ranges that might overlap us (whereas those that end
+            // after us but before the next range's start would be missed by the next range index)
+            start = floor = -2 - floor;
+            if (start < 0)
+                start = floor = 0;
+            else if (c.compare(accessor.end(ranges, start), startKey) <= 0)
+                ++start;
+        }
+
+        // Since endInclusive() != startInclusive(), so no need to adjust start/end comparisons
+        return forEach(start, end, floor, startKey, 0, forEachScanOrCheckpoint, forEachRange, p1, p2, p3, p4, minIndex);
+    }
+
+    @Inline
+    protected <P1, P2, P3, P4> int forEach(int start, int end, int floor, Key startBound, int cmpStartBoundWithEnd,
+                                           IndexedQuadConsumer<P1, P2, P3, P4> forEachScanOrCheckpoint, IndexedRangeQuadConsumer<P1, P2, P3, P4> forEachRange,
+                                           P1 p1, P2 p2, P3 p3, P4 p4, int minIndex)
+    {
+        if (start < minIndex) start = minIndex;
+
+        // find the checkpoint array, so we know how far to step back
+        int checkpoint = Arrays.binarySearch(lowerBounds, floor);
+        if (checkpoint < 0) checkpoint = -2 - checkpoint;
+        if (checkpoint < 0) return end;
+
+        int headerBaseIndex = (checkpoint / 4) * 5;
+        int headerSubIndex = checkpoint & 3;
+        int headerListIndex = headerBaseIndex + 1 + headerSubIndex;
+
+        int scanDistance = (headers[headerBaseIndex] >>> (8 * headerSubIndex)) & 0xff;
+        int checkpointStart = headers[headerListIndex];
+        int checkpointEnd = headers[headerListIndex + (headerSubIndex + 5)/4]; // skip the next header
+
+        if (scanDistance == MAX_SCAN_DISTANCE)
+        {
+            scanDistance = -checkpointLists[checkpointStart++];
+            Invariants.checkState(scanDistance >= MAX_SCAN_DISTANCE);
+        }
+
+        // NOTE: we visit in approximately ascending order, and this is a requirement for correctness of RangeDeps builders
+        //       Only the checkpoint is visited in uncertain order, but it is visited entirely, before the scan matches
+        //       and the range matches
+        int minScanIndex = Math.max(floor - scanDistance, minIndex);
+        var c = accessor.keyComparator();
+        for (int i = checkpointStart; i < checkpointEnd ; ++i)
+        {
+            int ri = checkpointLists[i];
+            if (ri < 0)
+            {
+                int subStart, subEnd;
+                if ((ri & BIT30) != 0)
+                {
+                    subStart = ri & 0xfffff;
+                    subEnd = subStart + ((ri >>> 20) & 0x1ff);
+                }
+                else if ((ri & BIT29) != 0)
+                {
+                    subStart = ri & 0x1fffffff;
+                    subEnd = Integer.MAX_VALUE;
+                }
+                else
+                {
+                    int length = ri & 0x1fffffff;
+                    subStart = checkpointLists[++i];
+                    subEnd = subStart + length;
+                }
+
+                for (int j = subStart ; j < subEnd ; ++j)
+                {
+                    ri = checkpointLists[j];
+                    if (ri < 0)
+                        continue;
+
+                    if (c.compare(accessor.end(ranges, ri), startBound) <= cmpStartBoundWithEnd)
+                        break;
+
+                    if (ri >= minIndex && ri < minScanIndex)
+                        forEachScanOrCheckpoint.accept(p1, p2, p3, p4, ri);
+                }
+            }
+            else
+            {
+                // if startBound is key, we cannot be equal to it;
+                // if startBound is a Range start, we also cannot be equal to it due to the requirement that
+                // endInclusive() != startInclusive(), so equality really means inequality
+                if (c.compare(accessor.end(ranges, ri), startBound) <= cmpStartBoundWithEnd)
+                    break;
+
+                if (ri >= minIndex && ri < minScanIndex)
+                    forEachScanOrCheckpoint.accept(p1, p2, p3, p4, ri);
+            }
+        }
+
+        for (int i = minScanIndex; i < floor ; ++i)
+        {
+            if (c.compare(accessor.end(ranges, i), startBound) > cmpStartBoundWithEnd)
+                forEachScanOrCheckpoint.accept(p1, p2, p3, p4, i);
+        }
+
+        if (start == end)
+            return end;
+
+        forEachRange.accept(p1, p2, p3, p4, start, end);
+        return end;
+    }
+}
diff --git a/accord-core/src/main/java/accord/utils/CheckpointIntervalArrayBuilder.java b/accord-core/src/main/java/accord/utils/CheckpointIntervalArrayBuilder.java
new file mode 100644
index 0000000..95e1d86
--- /dev/null
+++ b/accord-core/src/main/java/accord/utils/CheckpointIntervalArrayBuilder.java
@@ -0,0 +1,1133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.utils;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Objects;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+import javax.annotation.Nonnull;
+
+import static accord.utils.ArrayBuffers.cachedInts;
+import static accord.utils.CheckpointIntervalArray.MAX_SCAN_DISTANCE;
+import static accord.utils.SortedArrays.Search.CEIL;
+
+public class CheckpointIntervalArrayBuilder<Ranges, Range, RoutingKey>
+{
+    public enum Strategy
+    {
+        /**
+         * Do not tenure any ranges that are scannable from the currently in-effect max scan distance.
+         * This means we probably do less work on construction, but that our measure of the match count
+         * at any point is inaccurate, and so our heuristics for when to write checkpoints may be wrong,
+         * leading to more checkpoints than necessary.
+         */
+        FAST,
+
+        /**
+         * Tenure every range covering more than goalScanDistance. Any within max scan distance will also
+         * update the scan distance, so that they will be filtered when a checkpoint is written. But
+         * in the meantime they permit accurate tracking of the number of matches a query can return,
+         * permitting our complexity calculations (that determine when checkpoints should be written, and
+         * what our maximum scan distance should be), to be accurate. This can avoid bouncing between
+         * two extremes, where a low max scan distance tenures and correctly detects a desirable larger scan
+         * distance, which we rollover and this prevents us tenuring and tracking the number of matches, so
+         * that we then pick a low max scan distance (and thereby also write a new checkpoint)
+         */
+        ACCURATE
+    }
+
+    /**
+     * Should we maintain pointers to prior checkpoints that we may reference instead of reserializing
+     * the remaining contents. This is cheap to visit as we stop enumerating as soon as we encounter
+     * an entry that no longer covers us. We use some simple heuristics when deciding whether to do
+     * this, namely that there are at least two entries (so we save one checkpoint entry) and that
+     * there is at least one direct entry for each indirect/link entry in the range we will link.
+     */
+    public enum Links
+    {
+        LINKS,
+        NO_LINKS
+    }
+
+    public interface Accessor<Ranges, Range, RoutingKey>
+    {
+        int size(Ranges ranges);
+        Range get(Ranges ranges, int index);
+        RoutingKey start(Ranges ranges, int index);
+        RoutingKey start(Range range);
+        RoutingKey end(Ranges ranges, int index);
+        RoutingKey end(Range range);
+        Comparator<RoutingKey> keyComparator();
+        int binarySearch(Ranges ranges, int from, int to, RoutingKey find, AsymmetricComparator<RoutingKey, Range> comparator, SortedArrays.Search op);
+    }
+
+    private static final int BIT31 = 0x80000000;
+    private static final int BIT30 = 0x40000000;
+    private static final int BIT29 = 0x20000000;
+    static final int MIN_INDIRECT_LINK_LENGTH = 2;
+
+    final Accessor<Ranges, Range, RoutingKey> accessor;
+    final boolean isAccurate;
+    final boolean withLinks;
+    final Ranges ranges;
+
+    int[] bounds;
+    int[] headers;
+    int[] lists;
+    int checkpointCount, headerPointer, listCount;
+
+    final Scan<Ranges, Range, RoutingKey> scan;
+    final TenuredSet<Ranges, Range, RoutingKey> tenured;
+    final PendingCheckpoint<Ranges, Range, RoutingKey> pending = new PendingCheckpoint<>();
+
+    // track the maximum possible number of entries we can match with both a scan + checkpoint lookup
+    // this is an over-estimate and may be used by consumers to allocate out-of-order buffers for visitations
+    int maxScanAndCheckpointMatches;
+
+    public CheckpointIntervalArrayBuilder(Accessor<Ranges, Range, RoutingKey> accessor,
+                                          Ranges ranges,
+                                          Strategy strategy, Links links)
+    {
+        this(accessor, ranges, Math.min(MAX_SCAN_DISTANCE, 34 - Integer.numberOfLeadingZeros(accessor.size(ranges))), strategy, links);
+    }
+
+    public CheckpointIntervalArrayBuilder(Accessor<Ranges, Range, RoutingKey> accessor,
+                                          Ranges ranges,
+                                          int goalScanDistance,
+                                          Strategy strategy, Links links)
+    {
+        this.accessor = accessor;
+        this.isAccurate = strategy == Strategy.ACCURATE;
+        this.withLinks = links == Links.LINKS;
+        Invariants.checkArgument(goalScanDistance <= MAX_SCAN_DISTANCE);
+        Invariants.checkArgument(goalScanDistance > 0);
+        this.ranges = ranges;
+        this.scan = new Scan<>(accessor);
+        this.tenured = new TenuredSet<>(accessor);
+        init(ranges, goalScanDistance);
+    }
+
+    void init(Ranges ranges, int goalScanDistance)
+    {
+        // we write checkpoints at least goalScanDistance apart
+        scan.init(goalScanDistance);
+        ArrayBuffers.IntBuffers cachedInts = cachedInts();
+        // ask for int buffers in descending order of size
+        int size = accessor.size(ranges);
+        this.lists = cachedInts.getInts(size); // this one might need to grow
+        // +2 to round-up each division, and +2 to account for the final entry (which might require an empty scan distance header)
+        this.headers = cachedInts.getInts(((size / goalScanDistance) * 5) / 4 + 4);
+        this.bounds = cachedInts.getInts(size / goalScanDistance + 1);
+    }
+
+    public interface Factory<T, Ranges>
+    {
+        T build(Ranges ranges, int[] bounds, int[] headers, int[] lists, int maxScanAndCheckpointMatches);
+    }
+
+    /**
+     * Walk over each range, looking ahead by {@link #maxScanDistance} to decide if a range should
+     * be tenured (written to a checkpoint) or scanned; the maximum scan distance is determined by the
+     * number of open tenured entries, i.e. the minimum number of results we can expect to be returned
+     * (or, if greater, the logarithm of the number of ranges in the collection).
+     * <p>
+     * Once we encounter a range that should be tenured, either write a checkpoint immediately
+     * or make a note of the position we must scan to from the last entry in this checkpoint
+     * and wait until it is permitted to write a checkpoint. This range will be tenured either
+     * way for the following checkpoint.
+     * <p>
+     * The only reason not to write a checkpoint immediately is in the case we would breach
+     * our linear space complexity limit, which is imposed by ensuring we have a space between
+     * checkpoints at least as large as the number of entries written to the last checkpoint,
+     * discounted by the number of entries we have removed from the tenured collection since
+     * the last checkpoint.
+     */
+    public CheckpointIntervalArray<Ranges, Range, RoutingKey> build()
+    {
+        return build((ranges, bounds, headers, lists, maxScanAndCheckpointMatches) -> new CheckpointIntervalArray<>(accessor, ranges, bounds, headers, lists, maxScanAndCheckpointMatches));
+    }
+
+    public <T> T build(Factory<T, Ranges> factory)
+    {
+        int size = accessor.size(ranges);
+        for (int ri = 0 ; ri < size ; ++ri)
+        {
+            // write a checkpoint if we meet our linear space complexity requirements
+            // and we either have a tenured range that we must scan,
+            // or the scan distance is now much larger than the minimum number of search results
+            if (shouldWriteCheckpoint(ri))
+                writeCheckpoint(ri);
+
+            // either tenure or update scan distance, potentially writing a checkpoint
+            tenureOrScan(ri);
+            tenured.untenure(ri);
+        }
+
+        // write our final pending checkpoint
+        writeCheckpoint(size);
+        closeHeaders();
+
+        ArrayBuffers.IntBuffers cachedInts = cachedInts();
+        int[] lists = cachedInts.completeAndDiscard(this.lists, listCount);
+        int[] headers = cachedInts.completeAndDiscard(this.headers, headerPointer);
+        int[] bounds = cachedInts.completeAndDiscard(this.bounds, checkpointCount);
+        return factory.build(ranges, bounds, headers, lists, maxScanAndCheckpointMatches);
+    }
+
+    /**
+     * Categorise the candidateIdx as either scannable, and if so update the scan distance;
+     * or unscannable, in which case add it to the {@link #tenured} collection.
+     * Note, that in ACCURATE mode we tenure the item if it is outside of the goalScanDistance
+     * so we may track O(k) accurately above the O(lg2(N)) search and default scan distance,
+     * but we still update the scan distance so that the checkpoint will exclude this entry.
+     */
+    private void tenureOrScan(int index)
+    {
+        Invariants.checkArgument(index >= 0);
+
+        // then either migrate the index to pendingTenured, or ensure it will be scanned
+        RoutingKey end = accessor.end(ranges, index);
+        int scanLimit = scanLimit(index, isAccurate ? scan.goal : maxScanDistance());
+        if (shouldTenure(end, scanLimit))
+        {
+            int lastIndex = tenured.tenure(end, index, ranges, scanLimit + 1);
+            if (lastIndex - index > maxScanDistance()) scan.tenured(index);
+            else if (!isAccurate) throw new IllegalStateException();
+            else scan.updateScanDistance(index, lastIndex - index, this);
+        }
+        else
+        {
+            // TODO (low priority, efficiency): if the prior checkpoint has a scan distance >= this one,
+            //  and <= 50% more than this one and there's no scanMustReachIndex nor tenuredRanges, don't
+            //  write a new checkpoint (perhaps split shouldWriteCheckpoint logic in two)
+            scan.update(end, index, ranges, scanLimit, this);
+        }
+    }
+
+    /**
+     * We are forbidden from writing a checkpoint nearer than this to a prior checkpoint.
+     * This imposes our linear space complexity bounds, while not harming our O(log2(N) + K)
+     * complexity bounds, as we guarantee minimumSpan is never more than the number of query
+     * results.
+     */
+    private int minimumSpan()
+    {
+        return Math.max(scan.goal(), tenured.minimumSpan());
+    }
+
+    private int maxScanDistance()
+    {
+        // minimumSpan() reduces overtime, but there is no reason to reduce our scan distance
+        // for tenuring below the scan distance we will write
+        return Math.max(scan.watermark(), minimumSpan());
+    }
+
+    /**
+     * The index after the last index we can scan from {@code atIndex} with at most {@code maxScanDistance}.
+     */
+    private int scanLimit(int atIndex, int maxScanDistance)
+    {
+        return Math.min(1 + atIndex + maxScanDistance, accessor.size(ranges));
+    }
+
+    private boolean shouldTenure(RoutingKey end, int scanLimit)
+    {
+        return scanLimit < accessor.size(ranges) && accessor.keyComparator().compare(end, accessor.start(ranges, scanLimit)) > 0;
+    }
+
+    private boolean canWriteCheckpoint(int atIndex)
+    {
+        return atIndex - pending.atIndex >= minimumSpan();
+    }
+
+    private boolean shouldWriteCheckpoint(int atIndex)
+    {
+        if (!canWriteCheckpoint(atIndex))
+            return false;
+
+        // TODO (desired, efficiency): consider these triggers
+        if (scan.mustCheckpointToScanTenured(atIndex, maxScanDistance()))
+            return true;
+
+        return scan.hasMaybeDivergedFromMatchSize(tenured);
+    }
+
+    /**
+     * Write a checkpoint for ranges[prevCheckpointIndex...ri)
+     *
+     * 1) Finalise the scan distance
+     * 2) Write the header
+     * 3) Filter the pending tenured ranges to remove those we can scan
+     * 4) Write this list out
+     * 5) Setup a link to this list, if it is large enough
+     * 6) Rollover the scan, tenured and pending structures for the new pending checkpoint
+     */
+    private void writeCheckpoint(int nextCheckpointIndex)
+    {
+        int lastIndex = nextCheckpointIndex - 1;
+        int scanDistance = scan.finalise(lastIndex);
+        scanDistance = extendScanDistance(lastIndex, scanDistance);
+
+        if (pending.atIndex < 0)
+        {
+            // we don't have any checkpoints pending, so don't try to finalise it
+            // but if the new checkpoint doesn't cover index 0, insert a new empty
+            // checkpoint for the scan distance
+            if (nextCheckpointIndex > 0)
+            {
+                // setup an initial empty checkpoint to store the first scan distance
+                maxScanAndCheckpointMatches = scanDistance;
+                writeHeader(scanDistance, 0);
+            }
+        }
+        else
+        {
+            writeHeader(scanDistance, pending.atIndex);
+            int maxCheckpointMatchCount = pending.filter(scanDistance, lastIndex);
+            int listIndex = writeList(pending);
+            if (withLinks)
+                pending.setupLinkChain(tenured, listIndex, listCount);
+            maxScanAndCheckpointMatches = Math.max(maxScanAndCheckpointMatches, scanDistance + maxCheckpointMatchCount);
+        }
+
+        savePendingCheckpointAndResetScanDistance(nextCheckpointIndex);
+    }
+
+    private void savePendingCheckpointAndResetScanDistance(int checkpointIndex)
+    {
+        // use the tail of checkpointListBuf to buffer ranges we plan to tenure
+        ensureCapacity(tenured.count() + scan.watermark());
+
+        scan.reset();
+
+        if (isAccurate)
+        {
+            // TODO (low priority, efficiency): we can shift back the existing scanDistance if it's far enough from
+            //  the next checkpoint. this might permit us to skip some comparisons
+            scan.resetPeakMax(tenured);
+            for (Tenured<Ranges, Range, RoutingKey> tenured : this.tenured)
+            {
+                int distanceToEnd = (tenured.lastIndex - checkpointIndex);
+                if (distanceToEnd >= scan.peakMax)
+                    break;
+
+                int scanDistance = tenured.lastIndex - tenured.index;
+                if (scanDistance <= scan.peakMax)
+                    scan.updateScanDistance(tenured.index, scanDistance, null);
+            }
+
+            if (scan.watermark() < scan.goal)
+            {
+                int ri = Scan.minScanIndex(checkpointIndex, scan.goal);
+                while (ri < checkpointIndex)
+                {
+                    RoutingKey end = accessor.end(ranges, ri);
+                    int scanLimit = scanLimit(ri, scan.peakMax);
+                    if (!shouldTenure(end, scanLimit))
+                        scan.update(end, ri, ranges, scanLimit, null);
+                    ++ri;
+                }
+            }
+        }
+        else
+        {
+            // the maximum scan distance that could ever have been adopted for last chunk
+            int oldPeakMax = scan.peakMax();
+            // the minimum scan distance we will start with for processing the proceeding ranges
+            // note: this may increase if we decide to tenure additional ranges, at which point it will be the actual newPeakMax
+            int newMinPeakMax = scan.newPeakMax(tenured);
+            int minUntenuredIndex = scan.minUntenuredIndex(checkpointIndex, tenured);
+            int minScanIndex = Scan.minScanIndex(checkpointIndex, newMinPeakMax);
+
+            // we now make sure tenured and scan are correct for the new parameters.
+            // 1) if our peakMax is lower then we need to go back and find items to tenure that we previously marked for scanning
+            // 2) we must also reset our scan distances
+
+            // since our peakMax is determined by tenured.count(), but we are tenuring items here we keep things simple
+            // and do not account for those items we tenure but would later permit to scan as our peakMax grows
+
+            int ri = Math.min(minUntenuredIndex, minScanIndex);
+            while (ri < checkpointIndex)
+            {
+                RoutingKey end = accessor.end(ranges, ri);
+                int newPeakMax = scan.newPeakMax(tenured);
+                int scanLimit = scanLimit(ri, newPeakMax);
+                if (shouldTenure(end, scanLimit))
+                {
+                    // note: might have already been tenured
+                    // in this case our untenureLimit may be incorrect, but we won't use it
+                    if (ri >= minUntenuredIndex && newPeakMax < oldPeakMax)
+                        tenured.tenure(end, ri, ranges, scanLimit + 1, scanLimit(ri, oldPeakMax));
+                }
+                else
+                {
+                    // this might effectively remove a previously tenured item
+                    scan.update(end, ri, ranges, scanLimit, null);
+                }
+                ++ri;
+            }
+
+            scan.resetPeakMax(tenured);
+        }
+
+        pending.atIndex = checkpointIndex;
+        pending.clear();
+        tenured.rollover(pending);
+    }
+
+    private int extendScanDistance(int lastIndex, int scanDistance)
+    {
+        // now we've established our lower bound on scan distance, see how many checkpoints we can remove
+        // by increasing our scan distance so that it remains proportional to the number of results returned
+        // TODO (low priority, efficiency): can reduce cost here by using scanDistances array for upper bounds to scan distance
+        int maxScanDistance = scan.goal() + 2 * Math.min(tenured.count(), tenured.countAtPrevCheckpoint());
+        if (maxScanDistance >= 1 + scanDistance + scanDistance/4 && pending.count() >= (maxScanDistance - scanDistance)/2)
+        {
+            int removeCount = 0;
+            int extendedScanDistance = scanDistance;
+            int target = (maxScanDistance - scanDistance)/2;
+            for (int i = 0 ; i < pending.count() ; ++i)
+            {
+                Tenured<Ranges, Range, RoutingKey> t = pending.get(i);
+                if (t.index < 0)
+                    continue;
+
+                int distance = Math.min(lastIndex, t.lastIndex) - t.index;
+                if (distance <= scanDistance)
+                    continue; // already scanned or untenured
+
+                if (distance <= maxScanDistance)
+                {
+                    ++removeCount;
+                    extendedScanDistance = Math.max(extendedScanDistance, distance);
+                    if (extendedScanDistance == maxScanDistance && removeCount >= target)
+                        break;
+                }
+            }
+
+            // TODO (low priority, efficiency): should perhaps also gate this decision on the span we're covering
+            //  algorithmically, however, so long as we are under maxScanDistance we are fine
+            if (removeCount >= (extendedScanDistance - scanDistance)/2)
+                scanDistance = extendedScanDistance;
+        }
+        return scanDistance;
+    }
+
+    int writeList(PendingCheckpoint<Ranges, Range, RoutingKey> pending)
+    {
+        int startIndex = listCount;
+        for (int i = pending.count() - 1 ; i >= 0 ; --i)
+        {
+            Tenured<Ranges, Range, RoutingKey> t = pending.get(i);
+            if (t.index >= 0)
+            {
+                lists[listCount++] = t.index;
+            }
+            else
+            {
+                int index = t.index & ~BIT31;
+                int length = t.linkLength & ~BIT31;
+                if (length <= 0xff && index <= 0xfffff)
+                {
+                    lists[listCount++] = BIT31 | BIT30 | (length << 20) | index;
+                }
+                else if (t.linkLength >= 0 && length < BIT30)
+                {
+                    lists[listCount++] = BIT31 | BIT29 | index;
+                }
+                else
+                {
+                    lists[listCount++] = BIT31 | length;
+                    lists[listCount++] = BIT31 | pending.count();
+                }
+            }
+        }
+        return startIndex;
+    }
+
+    void writeHeader(int scanDistance, int lowerBound)
+    {
+        int headerScanDistance = Math.min(scanDistance, MAX_SCAN_DISTANCE);
+
+        if ((checkpointCount & 3) == 0)
+            headers[headerPointer++] = headerScanDistance;
+        else
+            headers[headerPointer - (1 + (checkpointCount & 3))] |= headerScanDistance << (8 * (checkpointCount & 3));
+
+        bounds[checkpointCount++] = lowerBound;
+        headers[headerPointer++] = listCount;
+
+        if (scanDistance >= MAX_SCAN_DISTANCE)
+            lists[listCount++] = -scanDistance; // serialize as a negative value so we ignore it in most cases automatically
+    }
+
+    void closeHeaders()
+    {
+        // write our final checkpoint header
+        if ((checkpointCount & 3) == 0) headers[headerPointer++] = 0;
+        headers[headerPointer++] = listCount;
+    }
+
+    void ensureCapacity(int maxPendingSize)
+    {
+        if (listCount + maxPendingSize >= lists.length)
+            lists = cachedInts().resize(lists, listCount, lists.length + lists.length/2 + maxPendingSize);
+    }
+
+    static class Scan<Ranges, Range, RoutingKey>
+    {
+        final Accessor<Ranges, Range, RoutingKey> accessor;
+        /** the scan distance we are aiming for; should be proportional to log2(N) */
+        int goal;
+
+        /** the indexes at which we increased the scan distance, and the new scan distance */
+        int[] distances = new int[16];
+        /** the number of unique scan distances we have adopted since the last checkpoint */
+        int count;
+        /** the highest scan distance we have adopted (==scanDistance(scanDistanceCount-1)) */
+        int watermark;
+        /**
+         * the first index we have tenured a range from, but for which we did not immediately write a new checkpoint
+         * we *must* scan at least from the last index in the checkpoint to here
+         */
+        int scanTenuredAtIndex = -1;
+
+        /** The maximum (i.e. initial) scan distance limit we have used since the last attempted checkpoint write */
+        int peakMax;
+
+        Scan(Accessor<Ranges, Range, RoutingKey> accessor)
+        {
+            this.accessor = accessor;
+        }
+
+        void init(int goalScanDistance)
+        {
+            goal = peakMax = goalScanDistance;
+        }
+
+        private void update(RoutingKey end, int atIndex, Ranges ranges, int scanLimit, CheckpointIntervalArrayBuilder<Ranges, Range, RoutingKey> checkpoint)
+        {
+            int newScanDistance = find(end, atIndex, ranges, scanLimit, watermark);
+            updateScanDistance(atIndex, newScanDistance, checkpoint);
+        }
+
+        private void updateScanDistance(int atIndex, int newScanDistance, CheckpointIntervalArrayBuilder<Ranges, Range, RoutingKey> checkpoint)
+        {
+            if (newScanDistance > watermark)
+            {
+                // TODO (desired, efficiency): we don't mind slight increases to the watermark;
+                //  should really look at scan distance history and ensure we haven't e.g. doubled since
+                //  some earlier point (and should track the match count + scan distance at each bump
+                //  to check overall work hasn't increased too much)
+                if (checkpoint != null && checkpoint.canWriteCheckpoint(atIndex))
+                    checkpoint.writeCheckpoint(atIndex);
+
+                watermark = newScanDistance;
+                if (count * 2 == distances.length)
+                    distances = Arrays.copyOf(distances, distances.length * 2);
+                distances[count * 2] = newScanDistance;
+                distances[count * 2 + 1] = atIndex;
+                ++count;
+            }
+        }
+
+        private int find(RoutingKey end, int atIndex, Ranges ranges, int scanLimit, int currentScanDistance)
+        {
+            var c = accessor.keyComparator();
+            int lowerIndex = accessor.binarySearch(ranges, atIndex + currentScanDistance, scanLimit, end, (e, s) -> c.compare(e, accessor.start(s)), CEIL);
+            if (lowerIndex < 0) lowerIndex = -2 - lowerIndex;
+            else lowerIndex -= 1;
+            return lowerIndex - atIndex;
+        }
+
+        boolean isAboveGoal()
+        {
+            return watermark > goal;
+        }
+
+        int watermark()
+        {
+            return watermark;
+        }
+
+        int goal()
+        {
+            return goal;
+        }
+
+        int distanceToTenured(int lastIndex)
+        {
+            return scanTenuredAtIndex >= 0 ? lastIndex - scanTenuredAtIndex : 0;
+        }
+
+        boolean mustCheckpointToScanTenured(int checkpointIndex, int maxScanDistance)
+        {
+            return scanTenuredAtIndex >= 0 && checkpointIndex - scanTenuredAtIndex >= maxScanDistance;
+        }
+
+        /**
+         * Are we scanning a much longer distance than the minimum number of matches we know a query will return?
+         * Note: with Strategy.FAST, {@code tenured.count()} gets less accurate as scan distance increases, so this
+         * will bounce around triggering checkpoints due to the larger scan distance, resetting the scan distance
+         * and starting again
+         */
+        boolean hasMaybeDivergedFromMatchSize(TenuredSet<Ranges, Range, RoutingKey> tenured)
+        {
+            return isAboveGoal() && tenured.count() < watermark()/2;
+        }
+
+        private int distance(int i)
+        {
+            return distances[i*2];
+        }
+
+        private int index(int i)
+        {
+            return distances[i*2+1];
+        }
+
+        int finalise(int lastIndex)
+        {
+            Invariants.checkState(distanceToTenured(lastIndex) <= Math.max(watermark(), peakMax()));
+
+            int scanDistance = watermark;
+            // then, compute the minimum scan distance implied by any tenured ranges we did not immediately
+            // write a checkpoint for - we *must* scan back as far as this record
+            int minScanDistance = scanTenuredAtIndex >= 0 ? lastIndex - scanTenuredAtIndex : 0;
+            if (minScanDistance > scanDistance)
+            {
+                // if this minimum is larger than the largest scan distance we picked up for non-tenured ranges
+                // then we are done, as there's nothing we can save
+                scanDistance = minScanDistance;
+            }
+            else if (scanDistance > 0)
+            {
+                // otherwise, we can look to see if any of the scan distances we computed overflow the checkpoint,
+                // i.e. where no records served by this checkpoint need to scan the full distance to reach it
+                int distanceToLastScanIndex = lastIndex - index(count -1);
+                // if the distance to the last scan index is larger than its scan distance, we have overflowed;
+                if (distanceToLastScanIndex < scanDistance)
+                {
+                    minScanDistance = Math.max(distanceToLastScanIndex, minScanDistance);
+                    // loop until we find one that doesn't overflow, as this is another minimum scan distance
+                    int i = count - 1;
+                    while (--i >= 0)
+                    {
+                        int distance = lastIndex - index(i);
+                        if (distance >= distance(i)) break;
+                        else if (distance > minScanDistance) minScanDistance = distance;
+                    }
+                    if (i >= 0) scanDistance = Math.max(minScanDistance, distance(i));
+                    else scanDistance = minScanDistance;
+                }
+            }
+
+            return scanDistance;
+        }
+
+        void reset()
+        {
+            // we could in theory reset our scan distance using the contents of scanDistance[]
+            // but it's a bit complicated, as we want to have the first item to increment the scan distance
+            // so that we can use it in writeScanDistance to shrink the scan distance;
+            // jumping straight to the highest scan distance breaks this
+            count = 0;
+            scanTenuredAtIndex = -1;
+            watermark = 0;
+        }
+
+        void resetPeakMax(TenuredSet<Ranges, Range, RoutingKey> tenured)
+        {
+            peakMax = newPeakMax(tenured);
+        }
+
+        int peakMax()
+        {
+            return peakMax;
+        }
+
+        int newPeakMax(TenuredSet<Ranges, Range, RoutingKey> tenured)
+        {
+            return Math.max(goal, tenured.count());
+        }
+
+        /**
+         * The minimum index containing a range that might need to be tenured, if we have a smaller max scan distance than before
+         */
+        int minUntenuredIndex(int checkpointIndex, TenuredSet<Ranges, Range, RoutingKey> tenured)
+        {
+            int minUntenuredIndex = Math.max(0, (checkpointIndex - 1) - watermark());
+            // the maximum scan distance that cxould ever have been adopted for the ranges processed since last checkpoint
+            int oldPeakMax = peakMax;
+            int newMinPeakMax = newPeakMax(tenured);
+            if (newMinPeakMax < oldPeakMax)
+            {
+                // minimise range we unnecessarily re-tenure over
+                // TODO (low priority, efficiency): see if can also use to reduce range we re-scan e.g. can recycle
+                //  scanDistances contents if we know we won't need to step back further at next checkpoint
+                for (int i = count - 1; i >= 0 ; --i)
+                {
+                    if (index(i) <= minUntenuredIndex)
+                        break;
+                    if (distance(i) <= newMinPeakMax)
+                        return i + 1 == count ? index(i) : index(i + 1) - 1;
+                }
+            }
+            return minUntenuredIndex;
+        }
+
+        /**
+         * Record that a range at this index has been tenured, so that we can track how far back
+         * we need to scan to determine how long we can defer writing a new checkpoint while still
+         * being able to scan it.
+         *
+         * TODO (low priority, efficiency): when a checkpoint is written, we should consider moving it
+         *  earlier if the scan distance is increased primarily because of this index, and the tenured
+         *  collection is otherwise unchanged (so can be written with minimal overhead)
+         */
+        void tenured(int atIndex)
+        {
+            if (scanTenuredAtIndex < 0)
+                scanTenuredAtIndex = atIndex;
+        }
+
+        static int minScanIndex(int checkpointIndex, int scanDistance)
+        {
+            return Math.max(0, (checkpointIndex - 1) - scanDistance);
+        }
+
+        @Override
+        public String toString()
+        {
+            return "Scan{watermark=" + watermark + ", tenured=" + scanTenuredAtIndex + '}';
+        }
+    }
+
+    /**
+     * Record-keeping for a range we have decided is not scannable
+     */
+    static class Tenured<Ranges, Range, RoutingKey> implements Comparable<Tenured<Ranges, Range, RoutingKey>>
+    {
+        final Accessor<Ranges, Range, RoutingKey> accessor;
+        /**
+         * The end of the tenured range covered by the contents referred to be {@link #index}
+         */
+        RoutingKey end;
+
+        /**
+         * <ul>
+         * <li>If positive, this points to {@code ranges[index]}</li>
+         * <li>If negative, this points to an entry in {@link #lists};
+         * see {@link SearchableRangeList#checkpointLists}</li>
+         * </ul>
+         */
+        int index;
+
+        /**
+         * The last index in {@link #ranges} covered by this tenured range
+         */
+        int lastIndex;
+
+        /**
+         * set when this record is serialized in a checkpoint list to either:
+         * <ul>
+         * <li>point to itself, in which case no action should be
+         *     taken on removal (it is only retained for size bookkeeping); or</li>
+         * <li>point to the next item in the checkpoint list; the first
+         *     such element removed triggers the clearing of the checkpoint
+         *     list so that its entries are re-inserted in the next checkpoint</li>
+         * </ul>
+         */
+        Tenured<Ranges, Range, RoutingKey> next;
+
+        /**
+         * Only set for link entries, i.e. where {@code index < 0}.
+         * <ul>
+         * <li>if positive, the length is optional as we will terminate safely using the end bound filtering</li>
+         * <li>if negative, the low 31 bits <b>must</b> be retrieved as the length for safe iteration</li>
+         * </ul>
+         */
+        int linkLength;
+
+        Tenured(Accessor<Ranges, Range, RoutingKey> accessor, RoutingKey end, int index)
+        {
+            this.accessor = accessor;
+            this.end = end;
+            this.index = index;
+        }
+
+        @Override
+        public int compareTo(@Nonnull Tenured<Ranges, Range, RoutingKey> that)
+        {
+            int c = accessor.keyComparator().compare(this.end, that.end);
+            // we sort indexes in reverse order so later tenured items find the earlier ones with same end when searching
+            // for higher entries for the range of indexes to search, and
+            if (c == 0) c = -Integer.compare(this.index, that.index);
+            return c;
+        }
+
+        @Override
+        public String toString()
+        {
+            return "Tenured{end=" + end + ", index=" + index + '}';
+        }
+    }
+
+    /**
+     * The set of ranges that we intend to write to checkpoints that remain open at the current point in the iteration
+     * This collection may be filtered before serialization, but every member will be visited either by scanning
+     * or visiting the checkpoint list
+     * TODO (low priority, efficiency): save garbage by using an insertion-sorted array for collections where
+     *  this is sufficient. later, introduce a mutable b-tree supporting object recycling. we would also like
+     *  to use a collection that permits us to insert and return a finger into the tree so we can find the
+     *  successor as part of insertion, and that permits constant-time first() calls
+     */
+    static class TenuredSet<Ranges, Range, RoutingKey> extends TreeSet<Tenured<Ranges, Range, RoutingKey>>
+    {
+        final Accessor<Ranges, Range, RoutingKey> accessor;
+        /**
+         * the number of direct tenured entries (i.e. ignoring link entries)
+         * this is used to provide a minimum bound on the number of results a range query can return
+         * note: with Strategy.FAST this gets less accurate as the span distance increases
+         */
+        int directCount;
+        int directCountAtPrevCheckpoint;
+        int minSpan;
+
+        // a stack of recently used EndAndIndex objects - used only for the duration of a single build
+        Tenured<Ranges, Range, RoutingKey> reuse, pendingReuse, pendingReuseTail;
+
+        TenuredSet(Accessor<Ranges, Range, RoutingKey> accessor)
+        {
+            this.accessor = accessor;
+        }
+
+        int count()
+        {
+            return directCount;
+        }
+
+        int countAtPrevCheckpoint()
+        {
+            return directCountAtPrevCheckpoint;
+        }
+
+        /**
+         * We require a checkpoint to cover a distance at least as large as the number of tenured ranges leftover
+         * since the prior checkpoint, to ensure these require at most linear additional space, while not requiring
+         * more than O(k) additional complexity on search (i.e., we will scan a number of elements at most equal
+         * to the number we have to visit in the checkpoint).
+         *
+         * We achieve this by recording the minimum number of match results as of the prior checkpoint (i.e. {@link #count()})
+         * and discounting it by one each time we untenure a range, so that for each tenured range from the prior checkpoint
+         * we have either untenured a range or processed at least one additional input.
+         */
+        int minimumSpan()
+        {
+            return minSpan;
+        }
+
+        private int tenure(RoutingKey end, int index, Ranges ranges, int minUntenureIndex)
+        {
+            return tenure(newTenured(end, index), ranges, minUntenureIndex, accessor.size(ranges));
+        }
+
+        private void tenure(RoutingKey end, int index, Ranges ranges, int minUntenureIndex, int untenureLimit)
+        {
+            tenure(newTenured(end, index), ranges, minUntenureIndex, untenureLimit);
+        }
+
+        private int tenure(Tenured<Ranges, Range, RoutingKey> tenure, Ranges ranges, int untenureMinIndex, int untenureLimit)
+        {
+            if (!add(tenure))
+                return tenure.lastIndex;
+
+            Tenured<Ranges, Range, RoutingKey> next = higher(tenure);
+            if (next != null)
+                untenureLimit = Math.min(untenureLimit, next.lastIndex + 1);
+            var c = accessor.keyComparator();
+            int untenureIndex = accessor.binarySearch(ranges, untenureMinIndex, untenureLimit, tenure.end, (e, s) -> c.compare(e, accessor.start(s)), CEIL);
+            if (untenureIndex < 0) untenureIndex = -1 - untenureIndex;
+            tenure.lastIndex = untenureIndex - 1;
+            Invariants.checkState(c.compare(tenure.end, accessor.start(ranges, tenure.lastIndex)) > 0);
+            Invariants.checkState(tenure.lastIndex + 1 == accessor.size(ranges) || c.compare(tenure.end, accessor.start(ranges, tenure.lastIndex + 1)) <= 0);
+            ++directCount;
+            return untenureIndex - 1;
+        }
+
+        private Tenured<Ranges, Range, RoutingKey> newTenured(RoutingKey end, int index)
+        {
+            Tenured<Ranges, Range, RoutingKey> result = reuse;
+            if (result == null)
+                return new Tenured<>(accessor, end, index);
+
+            reuse = result.next;
+            result.end = end;
+            result.index = index;
+            result.lastIndex = 0;
+            result.next = null;
+            return result;
+        }
+
+        private Tenured<Ranges, Range, RoutingKey> addLinkEntry(RoutingKey end, int index, int lastIndex, int length)
+        {
+            Invariants.checkArgument(index < 0);
+            Tenured<Ranges, Range, RoutingKey> result = newTenured(end, index);
+            result.linkLength = length;
+            result.lastIndex = lastIndex;
+            add(result);
+            return result;
+        }
+
+        /**
+         * Retire any active tenured ranges that no longer cover the pointer into ranges;
+         * if this crosses our checkpoint threshold, write a new checkpoint.
+         */
+        void untenure(int index)
+        {
+            while (!isEmpty() && first().lastIndex < index)
+            {
+                Tenured<Ranges, Range, RoutingKey> removed = pollFirst();
+
+                // if removed.next == null, this is not referenced by a link
+                // if removed.next == removed, it is referenced by a link but does not modify the link on removal
+                if (removed.next != null && removed.next != removed)
+                {
+                    // this is a member of a link's chain, which may serve one of two purposes:
+                    // 1) it may be the entry nominated to invalidate the link, due to the link
+                    //    membership shrinking below the required threshold; in which case we
+                    //    must clear the chain to reactivate its members for insertion into the
+                    //    next checkpoint, and remove the chain link itself
+                    // 2) it may be nominated as an entry to update the chain link info, to make
+                    //    it more succinct: if every entry of the chain remains active, and there
+                    //    are *many* entries then we need two integers to represent the chain, but
+                    //    as soon as any entry is invalid we can rely on this entry to terminate
+                    //    iteration, so we update the bookkeeping on the first entry we remove in
+                    //    this case
+
+                    // first clear the chain starting at the removed entry
+                    Tenured<Ranges, Range, RoutingKey> prev = removed, next = removed.next;
+                    while (next.next != null)
+                    {
+                        prev = next;
+                        next = next.next;
+                        prev.next = null;
+                    }
+                    Invariants.checkState(next.index < 0);
+                    if (prev.end == next.end)
+                    {
+                        // if this is the last entry in the link, the link is expired and should be removed/reused
+                        remove(next);
+                        if (pendingReuseTail == null)
+                            pendingReuseTail = next;
+                        next.next = pendingReuse;
+                        pendingReuse = next;
+                    }
+                    else if (next.linkLength < 0)
+                    {
+                        // otherwise, flag the link as safely consumed without knowing the length
+                        next.linkLength = next.linkLength & Integer.MAX_VALUE;
+                    }
+                }
+
+                // this was not a link reference; update our bookkeeping and save it for reuse
+                Invariants.checkState(removed.index >= 0);
+                --directCount;
+                --minSpan;
+                if (pendingReuseTail == null)
+                    pendingReuseTail = removed;
+                removed.next = pendingReuse;
+                pendingReuse = removed;
+            }
+        }
+
+        /**
+         * Write out any direct entries that are not pointed to by a chain entry, and any chain entries;
+         * rollover any per-checkpoint data and free up for reuse discarded Tenured objects
+         */
+        void rollover(PendingCheckpoint<Ranges, Range, RoutingKey> pending)
+        {
+            for (Tenured<Ranges, Range, RoutingKey> tenured : this)
+            {
+                if (tenured.next == null)
+                    pending.add(tenured);
+            }
+            // make freed Tenured objects available for reuse
+            if (pendingReuse != null)
+            {
+                pendingReuseTail.next = reuse;
+                reuse = pendingReuse;
+                pendingReuseTail = pendingReuse = null;
+            }
+            directCountAtPrevCheckpoint = minSpan = directCount;
+        }
+    }
+
+    /**
+     * we write checkpoints out before knowing the scan distance needed for the range, as a checkpoint precedes
+     * the ranges it covers; so we record the position and contents of the checkpoint, and once the scan distance is
+     * known (i.e. when the next checkpoint is written) we re-process the list to remove items we can now scan before
+     * serializing to checkpointListsBuf.
+     */
+    static class PendingCheckpoint<Ranges, Range, RoutingKey>
+    {
+        int atIndex = -1;
+        int count;
+
+        Tenured<Ranges, Range, RoutingKey>[] contents = new Tenured[10];
+
+        int openDirectCount, firstOpenDirect, openIndirectCount;
+        boolean hasClosedDirect;
+
+        int count()
+        {
+            return count;
+        }
+
+        Tenured<Ranges, Range, RoutingKey> get(int i)
+        {
+            return contents[i];
+        }
+
+        void add(Tenured<Ranges, Range, RoutingKey> tenured)
+        {
+            if (contents.length == count)
+                contents = Arrays.copyOf(contents, 2 * contents.length);
+            contents[count++] = tenured;
+        }
+
+        void clear()
+        {
+            count = 0;
+        }
+
+        /**
+         * Remove pending entries that will be scanned by the scanDistance, and update
+         * our bookkeeping for creating links
+         */
+        int filter(int scanDistance, int lastIndex)
+        {
+            int matchCountModifier = 0;
+            int maxi = count;
+            count = 0;
+            openDirectCount = 0;
+            openIndirectCount = 0;
+            firstOpenDirect = -1;
+//            lastClosedDirect = -1;
+
+            for (int i = 0; i < maxi ; ++i)
+            {
+                Tenured<Ranges, Range, RoutingKey> t = get(i);
+                if (t.index >= 0)
+                {
+                    if (t.index + scanDistance >= lastIndex)
+                        continue; // last index will find it with a scan
+
+                    if (t.lastIndex <= t.index + scanDistance)
+                        continue; // all indexes will find it with a scan
+
+                    if (t.lastIndex > lastIndex)
+                    {
+                        // this range remains open for the next checkpoint;
+                        // we may want to reference this list from there
+                        // so track count and position of first one to make a determination
+                        ++openDirectCount;
+                        if (firstOpenDirect < 0) firstOpenDirect = count;
+                    }
+                    else hasClosedDirect = true;
+                }
+                else
+                {
+                    // note: we over count here, as we count pointers within the chain
+                    matchCountModifier += (t.linkLength & Integer.MAX_VALUE) - 1; // (subtract 1 to discount the pointer)
+                    if (t.lastIndex > lastIndex)
+                        ++openIndirectCount;
+                }
+
+                if (i == count) ++count;
+                else contents[count++] = t;
+            }
+
+            return count + matchCountModifier;
+        }
+
+        /**
+         * Setup a link for referencing this chain later, if permitted.
+         * Must have at least two items, and at least as many direct records as indirect
+         */
+        void setupLinkChain(TenuredSet<Ranges, Range, RoutingKey> tenured, int startIndex, int endIndex)
+        {
+            int minSizeToReference = openIndirectCount + MIN_INDIRECT_LINK_LENGTH;
+            if (openDirectCount >= minSizeToReference)
+            {
+                int i = firstOpenDirect;
+                Tenured<Ranges, Range, RoutingKey> prev = get(i++);
+
+                while (openDirectCount > minSizeToReference)
+                {
+                    Tenured<Ranges, Range, RoutingKey> e = get(i++);
+                    if (e.index < 0)
+                    {
+                        --minSizeToReference;
+                        continue;
+                    }
+
+                    Invariants.checkState(prev.next == null);
+                    prev.next = prev;
+                    prev = e;
+                    --openDirectCount;
+                }
+
+                while (i < count)
+                {
+                    Tenured<Ranges, Range, RoutingKey> next = get(i++);
+                    if (next.index < 0)
+                        continue;
+
+                    Invariants.checkState(prev.next == null);
+                    prev.next = next;
+                    prev = next;
+                }
+
+                // may be more than one entry per item (though usually not)
+                int length = endIndex - startIndex;
+                Tenured<Ranges, Range, RoutingKey> chainEntry = tenured.addLinkEntry(prev.end, BIT31 | startIndex, prev.lastIndex, length);
+                prev.next = chainEntry;
+                if (hasClosedDirect && (startIndex > 0xfffff || (length > 0xff)))
+                {
+                    // TODO (expected, testing): make sure this is tested, as not a common code path (may never be executed in normal operation)
+                    // we have no closed ranges so iteration needs to know the end bound, but we cannot encode our bounds cheaply
+                    // so link the first bound to the chain entry, so that on removal it triggers an update of endIndex to note
+                    // that it can be iterated safely without an end bound
+                    get(firstOpenDirect).next = chainEntry;
+                }
+            }
+        }
+
+        @Override
+        public String toString()
+        {
+            return Arrays.stream(contents, 0, count)
+                         .map(Objects::toString)
+                         .collect(Collectors.joining(",", "[", "]"));
+        }
+    }
+}
diff --git a/accord-core/src/main/java/accord/utils/RandomSource.java b/accord-core/src/main/java/accord/utils/RandomSource.java
index 067b872..ed971aa 100644
--- a/accord-core/src/main/java/accord/utils/RandomSource.java
+++ b/accord-core/src/main/java/accord/utils/RandomSource.java
@@ -29,6 +29,8 @@
 import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 
+import com.google.common.collect.Iterables;
+
 import accord.utils.random.Picker;
 
 // TODO (expected): merge with C* RandomSource
@@ -286,7 +288,13 @@
         return array[nextInt(offset, offset + length)];
     }
 
-    default <T extends Comparable<T>> T pick(Set<T> set)
+    default <T> T pick(NavigableSet<T> set)
+    {
+        int offset = nextInt(0, set.size());
+        return Iterables.get(set, offset);
+    }
+
+    default <T extends Comparable<? super T>> T pick(Set<T> set)
     {
         List<T> values = new ArrayList<>(set);
         // Non-ordered sets may have different iteration order on different environments, which would make a seed produce different histories!
diff --git a/accord-core/src/main/java/accord/utils/SearchableRangeList.java b/accord-core/src/main/java/accord/utils/SearchableRangeList.java
index 22aeb3a..65e4465 100644
--- a/accord-core/src/main/java/accord/utils/SearchableRangeList.java
+++ b/accord-core/src/main/java/accord/utils/SearchableRangeList.java
@@ -18,17 +18,14 @@
 
 package accord.utils;
 
-import accord.api.RoutingKey;
 import accord.primitives.Range;
 import accord.primitives.RoutableKey;
-import accord.utils.SearchableRangeListBuilder.Links;
-import accord.utils.SearchableRangeListBuilder.Strategy;
+import accord.utils.CheckpointIntervalArrayBuilder.Links;
+import accord.utils.CheckpointIntervalArrayBuilder.Strategy;
 import net.nicoulaj.compilecommand.annotations.Inline;
 
-import java.util.*;
-
-import static accord.utils.SearchableRangeListBuilder.Links.LINKS;
-import static accord.utils.SearchableRangeListBuilder.Strategy.ACCURATE;
+import static accord.utils.CheckpointIntervalArrayBuilder.Links.LINKS;
+import static accord.utils.CheckpointIntervalArrayBuilder.Strategy.ACCURATE;
 import static accord.utils.SortedArrays.Search.*;
 
 /**
@@ -79,104 +76,13 @@
  *        earlier checkpoints.
  * </ul>
  */
-public class SearchableRangeList
+public class SearchableRangeList extends CheckpointIntervalArray<Range[], Range, RoutableKey>
 {
-    // scan distance can be kept very small as we guarantee to use at most linear extra space even with a scan distance of zero
-    static final int MAX_SCAN_DISTANCE = 255;
-    private static final int BIT30 = 0x40000000;
-    private static final int BIT29 = 0x20000000;
-
     private static final SearchableRangeList EMPTY_CHECKPOINTS = new SearchableRangeList(new Range[0], new int[0], new int[] { 0, 0 }, new int[0], 0);
 
-    final Range[] ranges;
-
-    /**
-     * The lower bound for each checkpoint.
-     * The checkpoint {@code i} applies to all ranges (incl) starting from {@code lowerBounds[i]},
-     * but before (excl) {@code lowerBounds[i+1]}.
-     */
-    final int[] lowerBounds;
-
-    /**
-     * Logically one entry per checkpoint, mapping {@link #lowerBounds} to {@link #checkpointLists},
-     * however we also encode an additional byte per entry representing the scan distance for the
-     * ranges handled by this checkpoint. These are grouped into an integer per four mappings, i.e.
-     * we encode batches of five ints, with the first int containing the four scan distances for the
-     * next four checkpoints, and the following four ints containing the respective offsets into
-     * {@link #checkpointLists}.
-     * <p>
-     * [0.........32b.........64b.........96b........128b........160b........192b]
-     * [ d1 d2 d3 d4  mapping1    mapping2   mapping3    mapping4    d5 d6 d7 d8 ]
-     */
-    final int[] headers;
-
-    /**
-     * A list of indexes in {@link #ranges} contained by each checkpoint; checkpoints are
-     * mapped from {@link #lowerBounds} by {@link #headers}.
-     * <p>
-     * Entries are sorted in descending order by the end of the range they cover, so that
-     * a search of this collection my terminate as soon as it encounters a range that does
-     * not cover the item we are searching for.
-     * <p>
-     * This collection may contain negative values, in which case these point to other
-     * checkpoints, whose <i>direct</i> contents (i.e. the positive values of) we may
-     * search.
-     * <ul> if negative, points to an earlier checkpoint, and:
-     *   <li>if the 30th bit is set, the low 20 bits point to checkpointsList,
-     *       and the 9 bits in-between provide the length of the range</li>
-     *   <li>otherwise, if the 29th bit is set, the lower 29 bits points to checkpointsList,
-     *       and can be iterated safely without an endIndex</li>
-     *   <li>otherwise, the low 29 bits provide the length of the run, and the low 31 bits
-     *       of the following entry (which will also be negative) provide a pointer to
-     *       checkpointsList</li>
-     * </ul>
-     */
-    final int[] checkpointLists;
-
-    public final int maxScanAndCheckpointMatches;
-
-    SearchableRangeList(Range[] ranges, int[] lowerBounds, int[] headers, int[] checkpointLists, int maxScanAndCheckpointMatches)
+    public SearchableRangeList(Range[] ranges, int[] lowerBounds, int[] headers, int[] checkpointLists, int maxScanAndCheckpointMatches)
     {
-        this.ranges = ranges;
-        this.lowerBounds = lowerBounds;
-        this.headers = headers;
-        this.checkpointLists = checkpointLists;
-        this.maxScanAndCheckpointMatches = maxScanAndCheckpointMatches;
-    }
-
-    @Inline
-    public <P1, P2, P3, P4> int forEach(Range range, IndexedQuadConsumer<P1, P2, P3, P4> forEachScanOrCheckpoint, IndexedRangeQuadConsumer<P1, P2, P3, P4> forEachRange, P1 p1, P2 p2, P3 p3, P4 p4, int minIndex)
-    {
-        return forEach(range.start(), range.end(), forEachScanOrCheckpoint, forEachRange, p1, p2, p3, p4, minIndex);
-    }
-
-    public <P1, P2, P3, P4> int forEach(RoutingKey startKey, RoutingKey endKey, IndexedQuadConsumer<P1, P2, P3, P4> forEachScanOrCheckpoint, IndexedRangeQuadConsumer<P1, P2, P3, P4> forEachRange, P1 p1, P2 p2, P3 p3, P4 p4, int minIndex)
-    {
-        if (ranges.length == 0 || minIndex == ranges.length)
-            return minIndex;
-
-        int end = SortedArrays.binarySearch(ranges, minIndex, ranges.length, endKey, (a, b) -> a.compareTo(b.start()), CEIL);
-        if (end < 0) end = -1 - end;
-        if (end <= minIndex) return minIndex;
-
-        int floor = SortedArrays.binarySearch(ranges, minIndex, ranges.length, startKey, (a, b) -> a.compareTo(b.start()), CEIL);
-        int start = floor;
-        if (floor < 0)
-        {
-            // if there's no precise match on start, step backwards;
-            // if this range does not overlap us, step forwards again for start
-            // but retain the floor index for performing scan and checkpoint searches from
-            // as this contains all ranges that might overlap us (whereas those that end
-            // after us but before the next range's start would be missed by the next range index)
-            start = floor = -2 - floor;
-            if (start < 0)
-                start = floor = 0;
-            else if (ranges[start].end().compareTo(startKey) <= 0)
-                ++start;
-        }
-
-        // Since endInclusive() != startInclusive(), so no need to adjust start/end comparisons
-        return forEach(start, end, floor, startKey, 0, forEachScanOrCheckpoint, forEachRange, p1, p2, p3, p4, minIndex);
+        super(SearchableRangeListBuilder.RANGE_ACCESSOR, ranges, lowerBounds, headers, checkpointLists, maxScanAndCheckpointMatches);
     }
 
     @Inline
@@ -209,95 +115,6 @@
         return forEach(start, end, floor, key, bound, forEachScanOrCheckpoint, forEachRange, p1, p2, p3, p4, minIndex);
     }
 
-    @Inline
-    private <P1, P2, P3, P4> int forEach(int start, int end, int floor, RoutableKey startBound, int cmpStartBoundWithEnd,
-                                     IndexedQuadConsumer<P1, P2, P3, P4> forEachScanOrCheckpoint, IndexedRangeQuadConsumer<P1, P2, P3, P4> forEachRange,
-                                     P1 p1, P2 p2, P3 p3, P4 p4, int minIndex)
-    {
-        if (start < minIndex) start = minIndex;
-
-        // find the checkpoint array, so we know how far to step back
-        int checkpoint = Arrays.binarySearch(lowerBounds, floor);
-        if (checkpoint < 0) checkpoint = -2 - checkpoint;
-        if (checkpoint < 0) return end;
-
-        int headerBaseIndex = (checkpoint / 4) * 5;
-        int headerSubIndex = checkpoint & 3;
-        int headerListIndex = headerBaseIndex + 1 + headerSubIndex;
-
-        int scanDistance = (headers[headerBaseIndex] >>> (8 * headerSubIndex)) & 0xff;
-        int checkpointStart = headers[headerListIndex];
-        int checkpointEnd = headers[headerListIndex + (headerSubIndex + 5)/4]; // skip the next header
-
-        if (scanDistance == MAX_SCAN_DISTANCE)
-        {
-            scanDistance = -checkpointLists[checkpointStart++];
-            Invariants.checkState(scanDistance >= MAX_SCAN_DISTANCE);
-        }
-
-        // NOTE: we visit in approximately ascending order, and this is a requirement for correctness of RangeDeps builders
-        //       Only the checkpoint is visited in uncertain order, but it is visited entirely, before the scan matches
-        //       and the range matches
-        int minScanIndex = Math.max(floor - scanDistance, minIndex);
-        for (int i = checkpointStart; i < checkpointEnd ; ++i)
-        {
-            int ri = checkpointLists[i];
-            if (ri < 0)
-            {
-                int subStart, subEnd;
-                if ((ri & BIT30) != 0)
-                {
-                    subStart = ri & 0xfffff;
-                    subEnd = subStart + ((ri >>> 20) & 0x1ff);
-                }
-                else if ((ri & BIT29) != 0)
-                {
-                    subStart = ri & 0x1fffffff;
-                    subEnd = Integer.MAX_VALUE;
-                }
-                else
-                {
-                    int length = ri & 0x1fffffff;
-                    subStart = checkpointLists[++i];
-                    subEnd = subStart + length;
-                }
-
-                for (int j = subStart ; j < subEnd ; ++j)
-                {
-                    ri = checkpointLists[j];
-                    if (ri < 0)
-                        continue;
-
-                    if (ranges[ri].end().compareTo(startBound) <= cmpStartBoundWithEnd)
-                        break;
-
-                    if (ri >= minIndex && ri < minScanIndex)
-                        forEachScanOrCheckpoint.accept(p1, p2, p3, p4, ri);
-                }
-            }
-            else
-            {
-                // if startBound is key, we cannot be equal to it;
-                // if startBound is a Range start, we also cannot be equal to it due to the requirement that
-                // endInclusive() != startInclusive(), so equality really means inequality
-                if (ranges[ri].end().compareTo(startBound) <= cmpStartBoundWithEnd)
-                    break;
-
-                if (ri >= minIndex && ri < minScanIndex)
-                    forEachScanOrCheckpoint.accept(p1, p2, p3, p4, ri);
-            }
-        }
-
-        for (int i = minScanIndex; i < floor ; ++i)
-        {
-            if (ranges[i].end().compareTo(startBound) > cmpStartBoundWithEnd)
-                forEachScanOrCheckpoint.accept(p1, p2, p3, p4, i);
-        }
-
-        forEachRange.accept(p1, p2, p3, p4, start, end);
-        return end;
-    }
-
     public static SearchableRangeList build(Range[] ranges)
     {
         if (ranges.length == 0)
diff --git a/accord-core/src/main/java/accord/utils/SearchableRangeListBuilder.java b/accord-core/src/main/java/accord/utils/SearchableRangeListBuilder.java
index 77d351a..546ee16 100644
--- a/accord-core/src/main/java/accord/utils/SearchableRangeListBuilder.java
+++ b/accord-core/src/main/java/accord/utils/SearchableRangeListBuilder.java
@@ -18,1075 +18,84 @@
 
 package accord.utils;
 
-import accord.api.RoutingKey;
+import java.util.Comparator;
+
 import accord.primitives.Range;
-import accord.utils.ArrayBuffers.IntBuffers;
+import accord.primitives.RoutableKey;
 
-import javax.annotation.Nonnull;
-import java.util.Arrays;
-import java.util.Objects;
-import java.util.TreeSet;
-import java.util.stream.Collectors;
-
-import static accord.utils.ArrayBuffers.cachedInts;
 import static accord.utils.SearchableRangeList.MAX_SCAN_DISTANCE;
-import static accord.utils.SearchableRangeListBuilder.Links.LINKS;
-import static accord.utils.SearchableRangeListBuilder.Strategy.ACCURATE;
-import static accord.utils.SortedArrays.Search.CEIL;
 
 /**
  * Builder for {@link SearchableRangeList}
  */
-public class SearchableRangeListBuilder
+public class SearchableRangeListBuilder extends CheckpointIntervalArrayBuilder<Range[], Range, RoutableKey>
 {
-    public enum Strategy
+    public static final Accessor<Range[], Range, RoutableKey> RANGE_ACCESSOR = new Accessor<>()
     {
-        /**
-         * Do not tenure any ranges that are scannable from the currently in-effect max scan distance.
-         * This means we probably do less work on construction, but that our measure of the match count
-         * at any point is inaccurate, and so our heuristics for when to write checkpoints may be wrong,
-         * leading to more checkpoints than necessary.
-         */
-        FAST,
+        @Override
+        public int size(Range[] ranges)
+        {
+            return ranges.length;
+        }
 
-        /**
-         * Tenure every range covering more than goalScanDistance. Any within max scan distance will also
-         * update the scan distance, so that they will be filtered when a checkpoint is written. But
-         * in the meantime they permit accurate tracking of the number of matches a query can return,
-         * permitting our complexity calculations (that determine when checkpoints should be written, and
-         * what our maximum scan distance should be), to be accurate. This can avoid bouncing between
-         * two extremes, where a low max scan distance tenures and correctly detects a desirable larger scan
-         * distance, which we rollover and this prevents us tenuring and tracking the number of matches, so
-         * that we then pick a low max scan distance (and thereby also write a new checkpoint)
-         */
-        ACCURATE
-    }
+        @Override
+        public Range get(Range[] ranges, int index)
+        {
+            return ranges[index];
+        }
 
-    /**
-     * Should we maintain pointers to prior checkpoints that we may reference instead of reserializing
-     * the remaining contents. This is cheap to visit as we stop enumerating as soon as we encounter
-     * an entry that no longer covers us. We use some simple heuristics when deciding whether to do
-     * this, namely that there are at least two entries (so we save one checkpoint entry) and that
-     * there is at least one direct entry for each indirect/link entry in the range we will link.
-     */
-    public enum Links
-    {
-        LINKS,
-        NO_LINKS
-    }
+        @Override
+        public RoutableKey start(Range[] ranges, int index)
+        {
+            return ranges[index].start();
+        }
 
-    private static final int BIT31 = 0x80000000;
-    private static final int BIT30 = 0x40000000;
-    private static final int BIT29 = 0x20000000;
-    static final int MIN_INDIRECT_LINK_LENGTH = 2;
+        @Override
+        public RoutableKey start(Range range)
+        {
+            return range.start();
+        }
 
-    final boolean isAccurate;
-    final boolean withLinks;
-    final Range[] ranges;
+        @Override
+        public RoutableKey end(Range[] ranges, int index)
+        {
+            return ranges[index].end();
+        }
 
-    int[] bounds;
-    int[] headers;
-    int[] lists;
-    int checkpointCount, headerPointer, listCount;
+        @Override
+        public RoutableKey end(Range range)
+        {
+            return range.end();
+        }
 
-    final Scan scan = new Scan();
-    final TenuredSet tenured = new TenuredSet();
-    final PendingCheckpoint pending = new PendingCheckpoint();
+        @Override
+        public Comparator<RoutableKey> keyComparator()
+        {
+            return Comparator.naturalOrder();
+        }
 
-    // track the maximum possible number of entries we can match with both a scan + checkpoint lookup
-    // this is an over-estimate and may be used by consumers to allocate out-of-order buffers for visitations
-    int maxScanAndCheckpointMatches;
+        @Override
+        public int binarySearch(Range[] ranges, int from, int to, RoutableKey find, AsymmetricComparator<RoutableKey, Range> comparator, SortedArrays.Search op)
+        {
+            return SortedArrays.binarySearch(ranges, from, to, find, comparator, op);
+        }
+    };
 
     public SearchableRangeListBuilder(Range[] ranges, Strategy strategy, Links links)
     {
-        this(ranges, Math.min(MAX_SCAN_DISTANCE, 34 - Integer.numberOfLeadingZeros(ranges.length)), strategy, links);
+        super(RANGE_ACCESSOR, ranges, strategy, links);
     }
 
     public SearchableRangeListBuilder(Range[] ranges, int goalScanDistance, Strategy strategy, Links links)
     {
-        this.isAccurate = strategy == ACCURATE;
-        this.withLinks = links == LINKS;
+        super(RANGE_ACCESSOR, ranges, goalScanDistance, strategy, links);
         Invariants.checkArgument(goalScanDistance <= MAX_SCAN_DISTANCE);
-        Invariants.checkArgument(goalScanDistance > 0);
-        this.ranges = ranges;
-        init(ranges, goalScanDistance);
     }
 
-    void init(Range[] ranges, int goalScanDistance)
-    {
-        // we write checkpoints at least goalScanDistance apart
-        scan.init(goalScanDistance);
-        IntBuffers cachedInts = cachedInts();
-        // ask for int buffers in descending order of size
-        this.lists = cachedInts.getInts(ranges.length); // this one might need to grow
-        // +2 to round-up each division, and +2 to account for the final entry (which might require an empty scan distance header)
-        this.headers = cachedInts.getInts(((ranges.length / goalScanDistance) * 5) / 4 + 4);
-        this.bounds = cachedInts.getInts(ranges.length / goalScanDistance + 1);
-    }
-
-    /**
-     * Walk over each range, looking ahead by {@link #maxScanDistance} to decide if a range should
-     * be tenured (written to a checkpoint) or scanned; the maximum scan distance is determined by the
-     * number of open tenured entries, i.e. the minimum number of results we can expect to be returned
-     * (or, if greater, the logarithm of the number of ranges in the collection).
-     * <p>
-     * Once we encounter a range that should be tenured, either write a checkpoint immediately
-     * or make a note of the position we must scan to from the last entry in this checkpoint
-     * and wait until it is permitted to write a checkpoint. This range will be tenured either
-     * way for the following checkpoint.
-     * <p>
-     * The only reason not to write a checkpoint immediately is in the case we would breach
-     * our linear space complexity limit, which is imposed by ensuring we have a space between
-     * checkpoints at least as large as the number of entries written to the last checkpoint,
-     * discounted by the number of entries we have removed from the tenured collection since
-     * the last checkpoint.
-     */
+    @Override
     public SearchableRangeList build()
     {
-        for (int ri = 0 ; ri < ranges.length ; ++ri)
-        {
-            // write a checkpoint if we meet our linear space complexity requirements
-            // and we either have a tenured range that we must scan,
-            // or the scan distance is now much larger than the minimum number of search results
-            if (shouldWriteCheckpoint(ri))
-                writeCheckpoint(ri);
-
-            // either tenure or update scan distance, potentially writing a checkpoint
-            tenureOrScan(ri);
-            tenured.untenure(ri);
-        }
-
-        // write our final pending checkpoint
-        writeCheckpoint(ranges.length);
-        closeHeaders();
-
-        IntBuffers cachedInts = cachedInts();
-        int[] lists = cachedInts.completeAndDiscard(this.lists, listCount);
-        int[] headers = cachedInts.completeAndDiscard(this.headers, headerPointer);
-        int[] bounds = cachedInts.completeAndDiscard(this.bounds, checkpointCount);
-        return new SearchableRangeList(ranges, bounds, headers, lists, maxScanAndCheckpointMatches);
-    }
-
-    /**
-     * Categorise the candidateIdx as either scannable, and if so update the scan distance;
-     * or unscannable, in which case add it to the {@link #tenured} collection.
-     * Note, that in ACCURATE mode we tenure the item if it is outside of the goalScanDistance
-     * so we may track O(k) accurately above the O(lg2(N)) search and default scan distance,
-     * but we still update the scan distance so that the checkpoint will exclude this entry.
-     */
-    private void tenureOrScan(int index)
-    {
-        Invariants.checkArgument(index >= 0);
-
-        // then either migrate the index to pendingTenured, or ensure it will be scanned
-        RoutingKey end = ranges[index].end();
-        int scanLimit = scanLimit(index, isAccurate ? scan.goal : maxScanDistance());
-        if (shouldTenure(end, scanLimit))
-        {
-            int lastIndex = tenured.tenure(end, index, ranges, scanLimit + 1);
-            if (lastIndex - index > maxScanDistance()) scan.tenured(index);
-            else if (!isAccurate) throw new IllegalStateException();
-            else scan.updateScanDistance(index, lastIndex - index, this);
-        }
-        else
-        {
-            // TODO (low priority, efficiency): if the prior checkpoint has a scan distance >= this one,
-            //  and <= 50% more than this one and there's no scanMustReachIndex nor tenuredRanges, don't
-            //  write a new checkpoint (perhaps split shouldWriteCheckpoint logic in two)
-            scan.update(end, index, ranges, scanLimit, this);
-        }
-    }
-
-    /**
-     * We are forbidden from writing a checkpoint nearer than this to a prior checkpoint.
-     * This imposes our linear space complexity bounds, while not harming our O(log2(N) + K)
-     * complexity bounds, as we guarantee minimumSpan is never more than the number of query
-     * results.
-     */
-    private int minimumSpan()
-    {
-        return Math.max(scan.goal(), tenured.minimumSpan());
-    }
-
-    private int maxScanDistance()
-    {
-        // minimumSpan() reduces overtime, but there is no reason to reduce our scan distance
-        // for tenuring below the scan distance we will write
-        return Math.max(scan.watermark(), minimumSpan());
-    }
-
-    /**
-     * The index after the last index we can scan from {@code atIndex} with at most {@code maxScanDistance}.
-     */
-    private int scanLimit(int atIndex, int maxScanDistance)
-    {
-        return Math.min(1 + atIndex + maxScanDistance, ranges.length);
-    }
-
-    private boolean shouldTenure(RoutingKey end, int scanLimit)
-    {
-        return scanLimit < ranges.length && end.compareTo(ranges[scanLimit].start()) > 0;
-    }
-
-    private boolean canWriteCheckpoint(int atIndex)
-    {
-        return atIndex - pending.atIndex >= minimumSpan();
-    }
-
-    private boolean shouldWriteCheckpoint(int atIndex)
-    {
-        if (!canWriteCheckpoint(atIndex))
-            return false;
-
-        // TODO (desired, efficiency): consider these triggers
-        if (scan.mustCheckpointToScanTenured(atIndex, maxScanDistance()))
-            return true;
-
-        return scan.hasMaybeDivergedFromMatchSize(tenured);
-    }
-
-    /**
-     * Write a checkpoint for ranges[prevCheckpointIndex...ri)
-     *
-     * 1) Finalise the scan distance
-     * 2) Write the header
-     * 3) Filter the pending tenured ranges to remove those we can scan
-     * 4) Write this list out
-     * 5) Setup a link to this list, if it is large enough
-     * 6) Rollover the scan, tenured and pending structures for the new pending checkpoint
-     */
-    private void writeCheckpoint(int nextCheckpointIndex)
-    {
-        int lastIndex = nextCheckpointIndex - 1;
-        int scanDistance = scan.finalise(lastIndex);
-        scanDistance = extendScanDistance(lastIndex, scanDistance);
-
-        if (pending.atIndex < 0)
-        {
-            // we don't have any checkpoints pending, so don't try to finalise it
-            // but if the new checkpoint doesn't cover index 0, insert a new empty
-            // checkpoint for the scan distance
-            if (nextCheckpointIndex > 0)
-            {
-                // setup an initial empty checkpoint to store the first scan distance
-                maxScanAndCheckpointMatches = scanDistance;
-                writeHeader(scanDistance, 0);
-            }
-        }
-        else
-        {
-            writeHeader(scanDistance, pending.atIndex);
-            int maxCheckpointMatchCount = pending.filter(scanDistance, lastIndex);
-            int listIndex = writeList(pending);
-            if (withLinks)
-                pending.setupLinkChain(tenured, listIndex, listCount);
-            maxScanAndCheckpointMatches = Math.max(maxScanAndCheckpointMatches, scanDistance + maxCheckpointMatchCount);
-        }
-
-        savePendingCheckpointAndResetScanDistance(nextCheckpointIndex);
-    }
-
-    private void savePendingCheckpointAndResetScanDistance(int checkpointIndex)
-    {
-        // use the tail of checkpointListBuf to buffer ranges we plan to tenure
-        ensureCapacity(tenured.count() + scan.watermark());
-
-        scan.reset();
-
-        if (isAccurate)
-        {
-            // TODO (low priority, efficiency): we can shift back the existing scanDistance if it's far enough from
-            //  the next checkpoint. this might permit us to skip some comparisons
-            scan.resetPeakMax(tenured);
-            for (Tenured tenured : this.tenured)
-            {
-                int distanceToEnd = (tenured.lastIndex - checkpointIndex);
-                if (distanceToEnd >= scan.peakMax)
-                    break;
-
-                int scanDistance = tenured.lastIndex - tenured.index;
-                if (scanDistance <= scan.peakMax)
-                    scan.updateScanDistance(tenured.index, scanDistance, null);
-            }
-
-            if (scan.watermark() < scan.goal)
-            {
-                int ri = Scan.minScanIndex(checkpointIndex, scan.goal);
-                while (ri < checkpointIndex)
-                {
-                    RoutingKey end = ranges[ri].end();
-                    int scanLimit = scanLimit(ri, scan.peakMax);
-                    if (!shouldTenure(end, scanLimit))
-                        scan.update(end, ri, ranges, scanLimit, null);
-                    ++ri;
-                }
-            }
-        }
-        else
-        {
-            // the maximum scan distance that could ever have been adopted for last chunk
-            int oldPeakMax = scan.peakMax();
-            // the minimum scan distance we will start with for processing the proceeding ranges
-            // note: this may increase if we decide to tenure additional ranges, at which point it will be the actual newPeakMax
-            int newMinPeakMax = scan.newPeakMax(tenured);
-            int minUntenuredIndex = scan.minUntenuredIndex(checkpointIndex, tenured);
-            int minScanIndex = Scan.minScanIndex(checkpointIndex, newMinPeakMax);
-
-            // we now make sure tenured and scan are correct for the new parameters.
-            // 1) if our peakMax is lower then we need to go back and find items to tenure that we previously marked for scanning
-            // 2) we must also reset our scan distances
-
-            // since our peakMax is determined by tenured.count(), but we are tenuring items here we keep things simple
-            // and do not account for those items we tenure but would later permit to scan as our peakMax grows
-
-            int ri = Math.min(minUntenuredIndex, minScanIndex);
-            while (ri < checkpointIndex)
-            {
-                RoutingKey end = ranges[ri].end();
-                int newPeakMax = scan.newPeakMax(tenured);
-                int scanLimit = scanLimit(ri, newPeakMax);
-                if (shouldTenure(end, scanLimit))
-                {
-                    // note: might have already been tenured
-                    // in this case our untenureLimit may be incorrect, but we won't use it
-                    if (ri >= minUntenuredIndex && newPeakMax < oldPeakMax)
-                        tenured.tenure(end, ri, ranges, scanLimit + 1, scanLimit(ri, oldPeakMax));
-                }
-                else
-                {
-                    // this might effectively remove a previously tenured item
-                    scan.update(end, ri, ranges, scanLimit, null);
-                }
-                ++ri;
-            }
-
-            scan.resetPeakMax(tenured);
-        }
-
-        pending.atIndex = checkpointIndex;
-        pending.clear();
-        tenured.rollover(pending);
-    }
-
-    private int extendScanDistance(int lastIndex, int scanDistance)
-    {
-        // now we've established our lower bound on scan distance, see how many checkpoints we can remove
-        // by increasing our scan distance so that it remains proportional to the number of results returned
-        // TODO (low priority, efficiency): can reduce cost here by using scanDistances array for upper bounds to scan distance
-        int maxScanDistance = scan.goal() + 2 * Math.min(tenured.count(), tenured.countAtPrevCheckpoint());
-        if (maxScanDistance >= 1 + scanDistance + scanDistance/4 && pending.count() >= (maxScanDistance - scanDistance)/2)
-        {
-            int removeCount = 0;
-            int extendedScanDistance = scanDistance;
-            int target = (maxScanDistance - scanDistance)/2;
-            for (int i = 0 ; i < pending.count() ; ++i)
-            {
-                Tenured t = pending.get(i);
-                if (t.index < 0)
-                    continue;
-
-                int distance = Math.min(lastIndex, t.lastIndex) - t.index;
-                if (distance <= scanDistance)
-                    continue; // already scanned or untenured
-
-                if (distance <= maxScanDistance)
-                {
-                    ++removeCount;
-                    extendedScanDistance = Math.max(extendedScanDistance, distance);
-                    if (extendedScanDistance == maxScanDistance && removeCount >= target)
-                        break;
-                }
-            }
-
-            // TODO (low priority, efficiency): should perhaps also gate this decision on the span we're covering
-            //  algorithmically, however, so long as we are under maxScanDistance we are fine
-            if (removeCount >= (extendedScanDistance - scanDistance)/2)
-                scanDistance = extendedScanDistance;
-        }
-        return scanDistance;
-    }
-
-    int writeList(PendingCheckpoint pending)
-    {
-        int startIndex = listCount;
-        for (int i = pending.count() - 1 ; i >= 0 ; --i)
-        {
-            Tenured t = pending.get(i);
-            if (t.index >= 0)
-            {
-                lists[listCount++] = t.index;
-            }
-            else
-            {
-                int index = t.index & ~BIT31;
-                int length = t.linkLength & ~BIT31;
-                if (length <= 0xff && index <= 0xfffff)
-                {
-                    lists[listCount++] = BIT31 | BIT30 | (length << 20) | index;
-                }
-                else if (t.linkLength >= 0 && length < BIT30)
-                {
-                    lists[listCount++] = BIT31 | BIT29 | index;
-                }
-                else
-                {
-                    lists[listCount++] = BIT31 | length;
-                    lists[listCount++] = BIT31 | pending.count();
-                }
-            }
-        }
-        return startIndex;
-    }
-
-    void writeHeader(int scanDistance, int lowerBound)
-    {
-        int headerScanDistance = Math.min(scanDistance, MAX_SCAN_DISTANCE);
-
-        if ((checkpointCount & 3) == 0)
-            headers[headerPointer++] = headerScanDistance;
-        else
-            headers[headerPointer - (1 + (checkpointCount & 3))] |= headerScanDistance << (8 * (checkpointCount & 3));
-
-        bounds[checkpointCount++] = lowerBound;
-        headers[headerPointer++] = listCount;
-
-        if (scanDistance >= MAX_SCAN_DISTANCE)
-            lists[listCount++] = -scanDistance; // serialize as a negative value so we ignore it in most cases automatically
-    }
-
-    void closeHeaders()
-    {
-        // write our final checkpoint header
-        if ((checkpointCount & 3) == 0) headers[headerPointer++] = 0;
-        headers[headerPointer++] = listCount;
-    }
-
-    void ensureCapacity(int maxPendingSize)
-    {
-        if (listCount + maxPendingSize >= lists.length)
-            lists = cachedInts().resize(lists, listCount, lists.length + lists.length/2 + maxPendingSize);
-    }
-
-    static class Scan
-    {
-        /** the scan distance we are aiming for; should be proportional to log2(N) */
-        int goal;
-
-        /** the indexes at which we increased the scan distance, and the new scan distance */
-        int[] distances = new int[16];
-        /** the number of unique scan distances we have adopted since the last checkpoint */
-        int count;
-        /** the highest scan distance we have adopted (==scanDistance(scanDistanceCount-1)) */
-        int watermark;
-        /**
-         * the first index we have tenured a range from, but for which we did not immediately write a new checkpoint
-         * we *must* scan at least from the last index in the checkpoint to here
-         */
-        int scanTenuredAtIndex = -1;
-
-        /** The maximum (i.e. initial) scan distance limit we have used since the last attempted checkpoint write */
-        int peakMax;
-
-        void init(int goalScanDistance)
-        {
-            goal = peakMax = goalScanDistance;
-        }
-
-        private void update(RoutingKey end, int atIndex, Range[] ranges, int scanLimit, SearchableRangeListBuilder checkpoint)
-        {
-            int newScanDistance = find(end, atIndex, ranges, scanLimit, watermark);
-            updateScanDistance(atIndex, newScanDistance, checkpoint);
-        }
-
-        private void updateScanDistance(int atIndex, int newScanDistance, SearchableRangeListBuilder checkpoint)
-        {
-            if (newScanDistance > watermark)
-            {
-                // TODO (desired, efficiency): we don't mind slight increases to the watermark;
-                //  should really look at scan distance history and ensure we haven't e.g. doubled since
-                //  some earlier point (and should track the match count + scan distance at each bump
-                //  to check overall work hasn't increased too much)
-                if (checkpoint != null && checkpoint.canWriteCheckpoint(atIndex))
-                    checkpoint.writeCheckpoint(atIndex);
-
-                watermark = newScanDistance;
-                if (count * 2 == distances.length)
-                    distances = Arrays.copyOf(distances, distances.length * 2);
-                distances[count * 2] = newScanDistance;
-                distances[count * 2 + 1] = atIndex;
-                ++count;
-            }
-        }
-
-        private int find(RoutingKey end, int atIndex, Range[] ranges, int scanLimit, int currentScanDistance)
-        {
-            int lowerIndex = SortedArrays.exponentialSearch(ranges, atIndex + currentScanDistance, scanLimit, end, (e, s) -> e.compareTo(s.start()), CEIL);
-            if (lowerIndex < 0) lowerIndex = -2 - lowerIndex;
-            else lowerIndex -= 1;
-            return lowerIndex - atIndex;
-        }
-
-        boolean isAboveGoal()
-        {
-            return watermark > goal;
-        }
-
-        int watermark()
-        {
-            return watermark;
-        }
-
-        int goal()
-        {
-            return goal;
-        }
-
-        int distanceToTenured(int lastIndex)
-        {
-            return scanTenuredAtIndex >= 0 ? lastIndex - scanTenuredAtIndex : 0;
-        }
-
-        boolean mustCheckpointToScanTenured(int checkpointIndex, int maxScanDistance)
-        {
-            return scanTenuredAtIndex >= 0 && checkpointIndex - scanTenuredAtIndex >= maxScanDistance;
-        }
-
-        /**
-         * Are we scanning a much longer distance than the minimum number of matches we know a query will return?
-         * Note: with Strategy.FAST, {@code tenured.count()} gets less accurate as scan distance increases, so this
-         * will bounce around triggering checkpoints due to the larger scan distance, resetting the scan distance
-         * and starting again
-         */
-        boolean hasMaybeDivergedFromMatchSize(TenuredSet tenured)
-        {
-            return isAboveGoal() && tenured.count() < watermark()/2;
-        }
-
-        private int distance(int i)
-        {
-            return distances[i*2];
-        }
-
-        private int index(int i)
-        {
-            return distances[i*2+1];
-        }
-
-        int finalise(int lastIndex)
-        {
-            Invariants.checkState(distanceToTenured(lastIndex) <= Math.max(watermark(), peakMax()));
-
-            int scanDistance = watermark;
-            // then, compute the minimum scan distance implied by any tenured ranges we did not immediately
-            // write a checkpoint for - we *must* scan back as far as this record
-            int minScanDistance = scanTenuredAtIndex >= 0 ? lastIndex - scanTenuredAtIndex : 0;
-            if (minScanDistance > scanDistance)
-            {
-                // if this minimum is larger than the largest scan distance we picked up for non-tenured ranges
-                // then we are done, as there's nothing we can save
-                scanDistance = minScanDistance;
-            }
-            else if (scanDistance > 0)
-            {
-                // otherwise, we can look to see if any of the scan distances we computed overflow the checkpoint,
-                // i.e. where no records served by this checkpoint need to scan the full distance to reach it
-                int distanceToLastScanIndex = lastIndex - index(count -1);
-                // if the distance to the last scan index is larger than its scan distance, we have overflowed;
-                if (distanceToLastScanIndex < scanDistance)
-                {
-                    minScanDistance = Math.max(distanceToLastScanIndex, minScanDistance);
-                    // loop until we find one that doesn't overflow, as this is another minimum scan distance
-                    int i = count - 1;
-                    while (--i >= 0)
-                    {
-                        int distance = lastIndex - index(i);
-                        if (distance >= distance(i)) break;
-                        else if (distance > minScanDistance) minScanDistance = distance;
-                    }
-                    if (i >= 0) scanDistance = Math.max(minScanDistance, distance(i));
-                    else scanDistance = minScanDistance;
-                }
-            }
-
-            return scanDistance;
-        }
-
-        void reset()
-        {
-            // we could in theory reset our scan distance using the contents of scanDistance[]
-            // but it's a bit complicated, as we want to have the first item to increment the scan distance
-            // so that we can use it in writeScanDistance to shrink the scan distance;
-            // jumping straight to the highest scan distance breaks this
-            count = 0;
-            scanTenuredAtIndex = -1;
-            watermark = 0;
-        }
-
-        void resetPeakMax(TenuredSet tenured)
-        {
-            peakMax = newPeakMax(tenured);
-        }
-
-        int peakMax()
-        {
-            return peakMax;
-        }
-
-        int newPeakMax(TenuredSet tenured)
-        {
-            return Math.max(goal, tenured.count());
-        }
-
-        /**
-         * The minimum index containing a range that might need to be tenured, if we have a smaller max scan distance than before
-         */
-        int minUntenuredIndex(int checkpointIndex, TenuredSet tenured)
-        {
-            int minUntenuredIndex = Math.max(0, (checkpointIndex - 1) - watermark());
-            // the maximum scan distance that cxould ever have been adopted for the ranges processed since last checkpoint
-            int oldPeakMax = peakMax;
-            int newMinPeakMax = newPeakMax(tenured);
-            if (newMinPeakMax < oldPeakMax)
-            {
-                // minimise range we unnecessarily re-tenure over
-                // TODO (low priority, efficiency): see if can also use to reduce range we re-scan e.g. can recycle
-                //  scanDistances contents if we know we won't need to step back further at next checkpoint
-                for (int i = count - 1; i >= 0 ; --i)
-                {
-                    if (index(i) <= minUntenuredIndex)
-                        break;
-                    if (distance(i) <= newMinPeakMax)
-                        return i + 1 == count ? index(i) : index(i + 1) - 1;
-                }
-            }
-            return minUntenuredIndex;
-        }
-
-        /**
-         * Record that a range at this index has been tenured, so that we can track how far back
-         * we need to scan to determine how long we can defer writing a new checkpoint while still
-         * being able to scan it.
-         *
-         * TODO (low priority, efficiency): when a checkpoint is written, we should consider moving it
-         *  earlier if the scan distance is increased primarily because of this index, and the tenured
-         *  collection is otherwise unchanged (so can be written with minimal overhead)
-         */
-        void tenured(int atIndex)
-        {
-            if (scanTenuredAtIndex < 0)
-                scanTenuredAtIndex = atIndex;
-        }
-
-        static int minScanIndex(int checkpointIndex, int scanDistance)
-        {
-            return Math.max(0, (checkpointIndex - 1) - scanDistance);
-        }
-
-        @Override
-        public String toString()
-        {
-            return "Scan{watermark=" + watermark + ", tenured=" + scanTenuredAtIndex + '}';
-        }
-    }
-
-    /**
-     * Record-keeping for a range we have decided is not scannable
-     */
-    static class Tenured implements Comparable<Tenured>
-    {
-        /**
-         * The end of the tenured range covered by the contents referred to be {@link #index}
-         */
-        RoutingKey end;
-
-        /**
-         * <ul>
-         * <li>If positive, this points to {@code ranges[index]}</li>
-         * <li>If negative, this points to an entry in {@link #lists};
-         * see {@link SearchableRangeList#checkpointLists}</li>
-         * </ul>
-         */
-        int index;
-
-        /**
-         * The last index in {@link #ranges} covered by this tenured range
-         */
-        int lastIndex;
-
-        /**
-         * set when this record is serialized in a checkpoint list to either:
-         * <ul>
-         * <li>point to itself, in which case no action should be
-         *     taken on removal (it is only retained for size bookkeeping); or</li>
-         * <li>point to the next item in the checkpoint list; the first
-         *     such element removed triggers the clearing of the checkpoint
-         *     list so that its entries are re-inserted in the next checkpoint</li>
-         * </ul>
-         */
-        Tenured next;
-
-        /**
-         * Only set for link entries, i.e. where {@code index < 0}.
-         * <ul>
-         * <li>if positive, the length is optional as we will terminate safely using the end bound filtering</li>
-         * <li>if negative, the low 31 bits <b>must</b> be retrieved as the length for safe iteration</li>
-         * </ul>
-         */
-        int linkLength;
-
-        Tenured(RoutingKey end, int index)
-        {
-            this.end = end;
-            this.index = index;
-        }
-
-        @Override
-        public int compareTo(@Nonnull Tenured that)
-        {
-            int c = this.end.compareTo(that.end);
-            // we sort indexes in reverse order so later tenured items find the earlier ones with same end when searching
-            // for higher entries for the range of indexes to search, and
-            if (c == 0) c = -Integer.compare(this.index, that.index);
-            return c;
-        }
-
-        @Override
-        public String toString()
-        {
-            return "Tenured{end=" + end + ", index=" + index + '}';
-        }
-    }
-
-    /**
-     * The set of ranges that we intend to write to checkpoints that remain open at the current point in the iteration
-     * This collection may be filtered before serialization, but every member will be visited either by scanning
-     * or visiting the checkpoint list
-     * TODO (low priority, efficiency): save garbage by using an insertion-sorted array for collections where
-     *  this is sufficient. later, introduce a mutable b-tree supporting object recycling. we would also like
-     *  to use a collection that permits us to insert and return a finger into the tree so we can find the
-     *  successor as part of insertion, and that permits constant-time first() calls
-     */
-    static class TenuredSet extends TreeSet<Tenured>
-    {
-        /**
-         * the number of direct tenured entries (i.e. ignoring link entries)
-         * this is used to provide a minimum bound on the number of results a range query can return
-         * note: with Strategy.FAST this gets less accurate as the span distance increases
-         */
-        int directCount;
-        int directCountAtPrevCheckpoint;
-        int minSpan;
-
-        // a stack of recently used EndAndIndex objects - used only for the duration of a single build
-        Tenured reuse, pendingReuse, pendingReuseTail;
-
-        int count()
-        {
-            return directCount;
-        }
-
-        int countAtPrevCheckpoint()
-        {
-            return directCountAtPrevCheckpoint;
-        }
-
-        /**
-         * We require a checkpoint to cover a distance at least as large as the number of tenured ranges leftover
-         * since the prior checkpoint, to ensure these require at most linear additional space, while not requiring
-         * more than O(k) additional complexity on search (i.e., we will scan a number of elements at most equal
-         * to the number we have to visit in the checkpoint).
-         *
-         * We achieve this by recording the minimum number of match results as of the prior checkpoint (i.e. {@link #count()})
-         * and discounting it by one each time we untenure a range, so that for each tenured range from the prior checkpoint
-         * we have either untenured a range or processed at least one additional input.
-         */
-        int minimumSpan()
-        {
-            return minSpan;
-        }
-
-        private int tenure(RoutingKey end, int index, Range[] ranges, int minUntenureIndex)
-        {
-            return tenure(newTenured(end, index), ranges, minUntenureIndex, ranges.length);
-        }
-
-        private void tenure(RoutingKey end, int index, Range[] ranges, int minUntenureIndex, int untenureLimit)
-        {
-            tenure(newTenured(end, index), ranges, minUntenureIndex, untenureLimit);
-        }
-
-        private int tenure(Tenured tenure, Range[] ranges, int untenureMinIndex, int untenureLimit)
-        {
-            if (!add(tenure))
-                return tenure.lastIndex;
-
-            Tenured next = higher(tenure);
-            if (next != null)
-                untenureLimit = Math.min(untenureLimit, next.lastIndex + 1);
-            int untenureIndex = SortedArrays.binarySearch(ranges, untenureMinIndex, untenureLimit, tenure.end, (e, s) -> e.compareTo(s.start()), CEIL);
-            if (untenureIndex < 0) untenureIndex = -1 - untenureIndex;
-            tenure.lastIndex = untenureIndex - 1;
-            Invariants.checkState(tenure.end.compareTo(ranges[tenure.lastIndex].start()) > 0);
-            Invariants.checkState(tenure.lastIndex + 1 == ranges.length || tenure.end.compareTo(ranges[tenure.lastIndex + 1].start()) <= 0);
-            ++directCount;
-            return untenureIndex - 1;
-        }
-
-        private Tenured newTenured(RoutingKey end, int index)
-        {
-            Tenured result = reuse;
-            if (result == null)
-                return new Tenured(end, index);
-
-            reuse = result.next;
-            result.end = end;
-            result.index = index;
-            result.lastIndex = 0;
-            result.next = null;
-            return result;
-        }
-
-        private Tenured addLinkEntry(RoutingKey end, int index, int lastIndex, int length)
-        {
-            Invariants.checkArgument(index < 0);
-            Tenured result = newTenured(end, index);
-            result.linkLength = length;
-            result.lastIndex = lastIndex;
-            add(result);
-            return result;
-        }
-
-        /**
-         * Retire any active tenured ranges that no longer cover the pointer into ranges;
-         * if this crosses our checkpoint threshold, write a new checkpoint.
-         */
-        void untenure(int index)
-        {
-            while (!isEmpty() && first().lastIndex < index)
-            {
-                Tenured removed = pollFirst();
-
-                // if removed.next == null, this is not referenced by a link
-                // if removed.next == removed, it is referenced by a link but does not modify the link on removal
-                if (removed.next != null && removed.next != removed)
-                {
-                    // this is a member of a link's chain, which may serve one of two purposes:
-                    // 1) it may be the entry nominated to invalidate the link, due to the link
-                    //    membership shrinking below the required threshold; in which case we
-                    //    must clear the chain to reactivate its members for insertion into the
-                    //    next checkpoint, and remove the chain link itself
-                    // 2) it may be nominated as an entry to update the chain link info, to make
-                    //    it more succinct: if every entry of the chain remains active, and there
-                    //    are *many* entries then we need two integers to represent the chain, but
-                    //    as soon as any entry is invalid we can rely on this entry to terminate
-                    //    iteration, so we update the bookkeeping on the first entry we remove in
-                    //    this case
-
-                    // first clear the chain starting at the removed entry
-                    Tenured prev = removed, next = removed.next;
-                    while (next.next != null)
-                    {
-                        prev = next;
-                        next = next.next;
-                        prev.next = null;
-                    }
-                    Invariants.checkState(next.index < 0);
-                    if (prev.end == next.end)
-                    {
-                        // if this is the last entry in the link, the link is expired and should be removed/reused
-                        remove(next);
-                        if (pendingReuseTail == null)
-                            pendingReuseTail = next;
-                        next.next = pendingReuse;
-                        pendingReuse = next;
-                    }
-                    else if (next.linkLength < 0)
-                    {
-                        // otherwise, flag the link as safely consumed without knowing the length
-                        next.linkLength = next.linkLength & Integer.MAX_VALUE;
-                    }
-                }
-
-                // this was not a link reference; update our bookkeeping and save it for reuse
-                Invariants.checkState(removed.index >= 0);
-                --directCount;
-                --minSpan;
-                if (pendingReuseTail == null)
-                    pendingReuseTail = removed;
-                removed.next = pendingReuse;
-                pendingReuse = removed;
-            }
-        }
-
-        /**
-         * Write out any direct entries that are not pointed to by a chain entry, and any chain entries;
-         * rollover any per-checkpoint data and free up for reuse discarded Tenured objects
-         */
-        void rollover(PendingCheckpoint pending)
-        {
-            for (Tenured tenured : this)
-            {
-                if (tenured.next == null)
-                    pending.add(tenured);
-            }
-            // make freed Tenured objects available for reuse
-            if (pendingReuse != null)
-            {
-                pendingReuseTail.next = reuse;
-                reuse = pendingReuse;
-                pendingReuseTail = pendingReuse = null;
-            }
-            directCountAtPrevCheckpoint = minSpan = directCount;
-        }
-    }
-
-    /**
-     * we write checkpoints out before knowing the scan distance needed for the range, as a checkpoint precedes
-     * the ranges it covers; so we record the position and contents of the checkpoint, and once the scan distance is
-     * known (i.e. when the next checkpoint is written) we re-process the list to remove items we can now scan before
-     * serializing to checkpointListsBuf.
-     */
-    static class PendingCheckpoint
-    {
-        int atIndex = -1;
-        int count;
-
-        Tenured[] contents = new Tenured[10];
-
-        int openDirectCount, firstOpenDirect, openIndirectCount;
-        boolean hasClosedDirect;
-
-        int count()
-        {
-            return count;
-        }
-
-        Tenured get(int i)
-        {
-            return contents[i];
-        }
-
-        void add(Tenured tenured)
-        {
-            if (contents.length == count)
-                contents = Arrays.copyOf(contents, 2 * contents.length);
-            contents[count++] = tenured;
-        }
-
-        void clear()
-        {
-            count = 0;
-        }
-
-        /**
-         * Remove pending entries that will be scanned by the scanDistance, and update
-         * our bookkeeping for creating links
-         */
-        int filter(int scanDistance, int lastIndex)
-        {
-            int matchCountModifier = 0;
-            int maxi = count;
-            count = 0;
-            openDirectCount = 0;
-            openIndirectCount = 0;
-            firstOpenDirect = -1;
-//            lastClosedDirect = -1;
-
-            for (int i = 0; i < maxi ; ++i)
-            {
-                Tenured t = get(i);
-                if (t.index >= 0)
-                {
-                    if (t.index + scanDistance >= lastIndex)
-                        continue; // last index will find it with a scan
-
-                    if (t.lastIndex <= t.index + scanDistance)
-                        continue; // all indexes will find it with a scan
-
-                    if (t.lastIndex > lastIndex)
-                    {
-                        // this range remains open for the next checkpoint;
-                        // we may want to reference this list from there
-                        // so track count and position of first one to make a determination
-                        ++openDirectCount;
-                        if (firstOpenDirect < 0) firstOpenDirect = count;
-                    }
-                    else hasClosedDirect = true;
-                }
-                else
-                {
-                    // note: we over count here, as we count pointers within the chain
-                    matchCountModifier += (t.linkLength & Integer.MAX_VALUE) - 1; // (subtract 1 to discount the pointer)
-                    if (t.lastIndex > lastIndex)
-                        ++openIndirectCount;
-                }
-
-                if (i == count) ++count;
-                else contents[count++] = t;
-            }
-
-            return count + matchCountModifier;
-        }
-
-        /**
-         * Setup a link for referencing this chain later, if permitted.
-         * Must have at least two items, and at least as many direct records as indirect
-         */
-        void setupLinkChain(TenuredSet tenured, int startIndex, int endIndex)
-        {
-            int minSizeToReference = openIndirectCount + MIN_INDIRECT_LINK_LENGTH;
-            if (openDirectCount >= minSizeToReference)
-            {
-                int i = firstOpenDirect;
-                Tenured prev = get(i++);
-
-                while (openDirectCount > minSizeToReference)
-                {
-                    Tenured e = get(i++);
-                    if (e.index < 0)
-                    {
-                        --minSizeToReference;
-                        continue;
-                    }
-
-                    Invariants.checkState(prev.next == null);
-                    prev.next = prev;
-                    prev = e;
-                    --openDirectCount;
-                }
-
-                while (i < count)
-                {
-                    Tenured next = get(i++);
-                    if (next.index < 0)
-                        continue;
-
-                    Invariants.checkState(prev.next == null);
-                    prev.next = next;
-                    prev = next;
-                }
-
-                // may be more than one entry per item (though usually not)
-                int length = endIndex - startIndex;
-                Tenured chainEntry = tenured.addLinkEntry(prev.end, BIT31 | startIndex, prev.lastIndex, length);
-                prev.next = chainEntry;
-                if (hasClosedDirect && (startIndex > 0xfffff || (length > 0xff)))
-                {
-                    // TODO (expected, testing): make sure this is tested, as not a common code path (may never be executed in normal operation)
-                    // we have no closed ranges so iteration needs to know the end bound, but we cannot encode our bounds cheaply
-                    // so link the first bound to the chain entry, so that on removal it triggers an update of endIndex to note
-                    // that it can be iterated safely without an end bound
-                    get(firstOpenDirect).next = chainEntry;
-                }
-            }
-        }
-
-        @Override
-        public String toString()
-        {
-            return Arrays.stream(contents, 0, count)
-                    .map(Objects::toString)
-                    .collect(Collectors.joining(",", "[", "]"));
-        }
+        return build((ranges, bounds, headers, lists, maxScanAndCheckpointMatches) ->
+                     new SearchableRangeList(ranges, bounds, headers, lists, maxScanAndCheckpointMatches));
     }
 }
diff --git a/accord-core/src/main/java/accord/utils/async/AsyncChains.java b/accord-core/src/main/java/accord/utils/async/AsyncChains.java
index 579bbf9..a18f782 100644
--- a/accord-core/src/main/java/accord/utils/async/AsyncChains.java
+++ b/accord-core/src/main/java/accord/utils/async/AsyncChains.java
@@ -831,6 +831,18 @@
         }
     }
 
+    public static <V> V getUnchecked(AsyncChain<V> chain)
+    {
+        try
+        {
+            return getUninterruptibly(chain);
+        }
+        catch (ExecutionException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
     public static void awaitUninterruptibly(AsyncChain<?> chain)
     {
         try
diff --git a/accord-core/src/main/java/accord/utils/random/Picker.java b/accord-core/src/main/java/accord/utils/random/Picker.java
index f6f07d7..272f17e 100644
--- a/accord-core/src/main/java/accord/utils/random/Picker.java
+++ b/accord-core/src/main/java/accord/utils/random/Picker.java
@@ -26,6 +26,18 @@
 
 public class Picker
 {
+    public static float[] randomWeights(RandomSource random, int length)
+    {
+        float[] weights = new float[length - 1];
+        float sum = 0;
+        for (int i = 0 ; i < weights.length ; ++i)
+            weights[i] = sum += random.nextFloat();
+        sum += random.nextFloat();
+        for (int i = 0 ; i < weights.length ; ++i)
+            weights[i] /= sum;
+        return weights;
+    }
+
     static abstract class Weighted
     {
         final RandomSource random;
@@ -33,7 +45,7 @@
 
         public Weighted(RandomSource random, int length)
         {
-            this(random, randomWeights(random, length));
+            this(random, Picker.randomWeights(random, length));
         }
 
         public Weighted(RandomSource random, float[] weights)
@@ -42,17 +54,6 @@
             this.weights = weights;
         }
 
-        static float[] randomWeights(RandomSource random, int length)
-        {
-            float[] weights = new float[length - 1];
-            float sum = 0;
-            for (int i = 0 ; i < weights.length ; ++i)
-                weights[i] = sum += random.nextFloat();
-            sum += random.nextFloat();
-            for (int i = 0 ; i < weights.length ; ++i)
-                weights[i] /= sum;
-            return weights;
-        }
 
         static float[] randomWeights(RandomSource random, float[] bias)
         {
@@ -104,7 +105,7 @@
 
         public static <T> WeightedObjectPicker<T> randomWeighted(RandomSource random, T[] values)
         {
-            return new WeightedObjectPicker<>(random, values, randomWeights(random, values.length));
+            return new WeightedObjectPicker<>(random, values, Picker.randomWeights(random, values.length));
         }
 
         public static <T> WeightedObjectPicker<T> randomWeighted(RandomSource random, T[] values, float[] bias)
diff --git a/accord-core/src/test/java/accord/utils/Gen.java b/accord-core/src/test/java/accord/utils/Gen.java
index af86340..04b6e63 100644
--- a/accord-core/src/test/java/accord/utils/Gen.java
+++ b/accord-core/src/test/java/accord/utils/Gen.java
@@ -99,6 +99,21 @@
         return Stream.generate(() -> next(rs));
     }
 
+    interface Int2IntMapFunction
+    {
+        int applyAsInt(RandomSource rs, int value);
+    }
+
+    interface Int2LongMapFunction
+    {
+        long applyAsLong(RandomSource rs, int value);
+    }
+
+    interface Long2LongMapFunction
+    {
+        long applyAsLong(RandomSource rs, long value);
+    }
+
     interface IntGen extends Gen<Integer>
     {
         int nextInt(RandomSource random);
@@ -114,6 +129,16 @@
             return r -> fn.applyAsInt(nextInt(r));
         }
 
+        default IntGen mapAsInt(Int2IntMapFunction fn)
+        {
+            return r -> fn.applyAsInt(r, nextInt(r));
+        }
+
+        default LongGen mapAsLong(Int2LongMapFunction fn)
+        {
+            return r -> fn.applyAsLong(r, nextInt(r));
+        }
+
         default Gen.IntGen filterAsInt(IntPredicate fn)
         {
             return rs -> {
@@ -159,6 +184,11 @@
             return r -> fn.applyAsLong(nextLong(r));
         }
 
+        default LongGen mapAsLong(Long2LongMapFunction fn)
+        {
+            return r -> fn.applyAsLong(r, nextLong(r));
+        }
+
         default Gen.LongGen filterAsLong(LongPredicate fn)
         {
             return rs -> {
diff --git a/accord-core/src/test/java/accord/utils/Gens.java b/accord-core/src/test/java/accord/utils/Gens.java
index 35e4567..3723fc6 100644
--- a/accord-core/src/test/java/accord/utils/Gens.java
+++ b/accord-core/src/test/java/accord/utils/Gens.java
@@ -35,8 +35,12 @@
 import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
 import java.util.stream.Stream;
 
+import accord.utils.random.Picker;
+
 public class Gens {
     private Gens() {
     }
@@ -51,6 +55,27 @@
         return ignore -> constant.get();
     }
 
+    public static <T> Gen<T> oneOf(Gen<T>... gens)
+    {
+        return oneOf(Arrays.asList(gens));
+    }
+
+    public static <T> Gen<T> oneOf(List<Gen<T>> gens)
+    {
+        return rs -> rs.pick(gens).next(rs);
+    }
+
+    public static <T> Gen<T> oneOf(Map<Gen<T>, Integer> values)
+    {
+        Gen<Gen<T>> gen = pick(values);
+        return rs -> gen.next(rs).next(rs);
+    }
+
+    public static Gen.IntGen pickInt(int... ts)
+    {
+        return rs -> ts[rs.nextInt(0, ts.length)];
+    }
+
     public static <T> Gen<T> pick(T... ts)
     {
         return pick(Arrays.asList(ts));
@@ -116,6 +141,256 @@
         };
     }
 
+    public static Gen.LongGen pickZipf(long[] array)
+    {
+        if (array == null || array.length == 0)
+            throw new IllegalArgumentException("Empty array given");
+        if (array.length == 1)
+            return ignore -> array[0];
+        BigDecimal[] weights = new BigDecimal[array.length];
+        BigDecimal base = BigDecimal.valueOf(Math.pow(2, array.length));
+        weights[0] = base;
+        for (int i = 1; i < array.length; i++)
+            weights[i] = base.divide(BigDecimal.valueOf(i + 1), RoundingMode.UP);
+        BigDecimal totalWeights = Stream.of(weights).reduce(BigDecimal.ZERO, BigDecimal::add);
+
+        return rs -> {
+            BigDecimal value = BigDecimal.valueOf(rs.nextDouble()).multiply(totalWeights);
+            for (int i = 0; i < weights.length; i++)
+            {
+                value = value.subtract(weights[i]);
+                if (value.compareTo(BigDecimal.ZERO) <= 0)
+                    return array[i];
+            }
+            return array[array.length - 1];
+        };
+    }
+
+    public static <T> Gen<T> pickZipf(T... array)
+    {
+        return pickZipf(Arrays.asList(array));
+    }
+
+    public static <T> Gen<T> pickZipf(List<T> array)
+    {
+        if (array == null || array.isEmpty())
+            throw new IllegalArgumentException("Empty array given");
+        if (array.size() == 1)
+            return ignore -> array.get(0);
+        BigDecimal[] weights = new BigDecimal[array.size()];
+        BigDecimal base = BigDecimal.valueOf(Math.pow(2, array.size()));
+        weights[0] = base;
+        for (int i = 1; i < array.size(); i++)
+            weights[i] = base.divide(BigDecimal.valueOf(i + 1), RoundingMode.UP);
+        BigDecimal totalWeights = Stream.of(weights).reduce(BigDecimal.ZERO, BigDecimal::add);
+
+        return rs -> {
+            BigDecimal value = BigDecimal.valueOf(rs.nextDouble()).multiply(totalWeights);
+            for (int i = 0; i < weights.length; i++)
+            {
+                value = value.subtract(weights[i]);
+                if (value.compareTo(BigDecimal.ZERO) <= 0)
+                    return array.get(i);
+            }
+            return array.get(array.size() - 1);
+        };
+    }
+
+    public static Gen<Gen.IntGen> randomWeights(int[] array)
+    {
+        return rs -> {
+            float[] weights = Picker.randomWeights(rs, array.length);
+            return r -> array[index(r, weights)];
+        };
+    }
+
+    public static Gen<Gen.LongGen> randomWeights(long[] array)
+    {
+        return rs -> {
+            float[] weights = Picker.randomWeights(rs, array.length);
+            return r -> array[index(r, weights)];
+        };
+    }
+
+    public static <T> Gen<Gen<T>> randomWeights(T[] array)
+    {
+        return rs -> {
+            float[] weights = Picker.randomWeights(rs, array.length);
+            return r -> array[index(r, weights)];
+        };
+    }
+
+    public static <T> Gen<Gen<T>> randomWeights(List<T> array)
+    {
+        return rs -> {
+            float[] weights = Picker.randomWeights(rs, array.size());
+            return r -> array.get(index(r, weights));
+        };
+    }
+
+    private static int index(RandomSource rs, float[] weights)
+    {
+        int i = Arrays.binarySearch(weights, rs.nextFloat());
+        if (i < 0) i = -1 - i;
+        return i;
+    }
+
+    public static Gen<Gen.IntGen> mixedDistribution(int minInclusive, int maxExclusive)
+    {
+        int domainSize = (maxExclusive - minInclusive + 1);
+        if (domainSize < 0)
+            throw new IllegalArgumentException("Range is too large; min=" + minInclusive + ", max=" + maxExclusive);
+        int[] array, indexes;
+        if (domainSize > 200) // randomly selected
+        {
+            int numBuckets = 10;
+            int delta = domainSize / numBuckets;
+            array = new int[numBuckets];
+            for (int i = 0; i < numBuckets; i++)
+                array[i] = minInclusive + i * delta;
+            indexes = IntStream.range(0, array.length).toArray();
+        }
+        else
+        {
+            array = IntStream.range(minInclusive, maxExclusive).toArray();
+            indexes = null;
+        }
+        return rs -> {
+            switch (rs.nextInt(0, 4))
+            {
+                case 0: // uniform
+                    return r -> r.nextInt(minInclusive, maxExclusive);
+                case 1: // median biased
+                    int median = rs.nextInt(minInclusive, maxExclusive);
+                    return r -> r.nextBiasedInt(minInclusive, median, maxExclusive);
+                case 2: // zipf
+                    if (indexes == null)
+                        return Gens.pickZipf(rs.nextBoolean() ? reverseAndCopy(array) : array);
+                    return Gens.pickZipf(rs.nextBoolean() ? reverseAndCopy(indexes) : indexes).mapAsInt((r, index) -> {
+                        int start = array[index];
+                        int end = index == array.length - 1 ? maxExclusive : array[index + 1];
+                        return r.nextInt(start, end);
+                    });
+                case 3: // random weight
+                    if (indexes == null)
+                        return randomWeights(array).next(rs);
+                    return randomWeights(indexes).next(rs).mapAsInt((r, index) -> {
+                        int start = array[index];
+                        int end = index == array.length - 1 ? maxExclusive : array[index + 1];
+                        return r.nextInt(start, end);
+                    });
+                default:
+                    throw new AssertionError();
+            }
+        };
+    }
+
+    private static int[] reverseAndCopy(int[] array)
+    {
+        array = Arrays.copyOf(array, array.length);
+        for (int i = 0, mid = array.length / 2, j = array.length - 1; i < mid; i++, j--)
+        {
+            int tmp = array[i];
+            array[i] = array[j];
+            array[j] = tmp;
+        }
+        return array;
+    }
+
+    public static Gen<Gen.LongGen> mixedDistribution(long minInclusive, long maxExclusive)
+    {
+        long domainSize = (maxExclusive - minInclusive + 1);
+        if (domainSize < 0)
+            throw new IllegalArgumentException("Range is too large; min=" + minInclusive + ", max=" + maxExclusive);
+        long[] array;
+        int[] indexes;
+        if (domainSize > 200) // randomly selected
+        {
+            int numBuckets = 10;
+            long delta = domainSize / numBuckets;
+            array = new long[numBuckets];
+            for (int i = 0; i < numBuckets; i++)
+                array[i] = minInclusive + i * delta;
+            indexes = IntStream.range(0, array.length).toArray();
+        }
+        else
+        {
+            array = LongStream.range(minInclusive, maxExclusive).toArray();
+            indexes = null;
+        }
+        return rs -> {
+            switch (rs.nextInt(0, 4))
+            {
+                case 0: // uniform
+                    return r -> r.nextLong(minInclusive, maxExclusive);
+                case 1: // median biased
+                    long median = rs.nextLong(minInclusive, maxExclusive);
+                    return r -> r.nextBiasedLong(minInclusive, median, maxExclusive);
+                case 2: // zipf
+                    if (indexes == null)
+                        return Gens.pickZipf(rs.nextBoolean() ? reverseAndCopy(array) : array);
+                    return Gens.pickZipf(rs.nextBoolean() ? reverseAndCopy(indexes) : indexes).mapAsLong((r, index) -> {
+                        long start = array[index];
+                        long end = index == array.length - 1 ? maxExclusive : array[index + 1];
+                        return r.nextLong(start, end);
+                    });
+                case 3: // random weight
+                    if (indexes == null)
+                        return randomWeights(array).next(rs);
+                    return randomWeights(indexes).next(rs).mapAsLong((r, index) -> {
+                        long start = array[index];
+                        long end = index == array.length - 1 ? maxExclusive : array[index + 1];
+                        return r.nextLong(start, end);
+                    });
+                default:
+                    throw new AssertionError();
+            }
+        };
+    }
+
+    private static long[] reverseAndCopy(long[] array)
+    {
+        array = Arrays.copyOf(array, array.length);
+        for (int i = 0, mid = array.length / 2, j = array.length - 1; i < mid; i++, j--)
+        {
+            long tmp = array[i];
+            array[i] = array[j];
+            array[j] = tmp;
+        }
+        return array;
+    }
+
+    public static <T> Gen<Gen<T>> mixedDistribution(T... list)
+    {
+        return mixedDistribution(Arrays.asList(list));
+    }
+
+    public static <T> Gen<Gen<T>> mixedDistribution(List<T> list)
+    {
+        return rs -> {
+            switch (rs.nextInt(0, 4))
+            {
+                case 0: // uniform
+                    return r -> list.get(rs.nextInt(0, list.size()));
+                case 1: // median biased
+                    int median = rs.nextInt(0, list.size());
+                    return r -> list.get(r.nextBiasedInt(0, median, list.size()));
+                case 2: // zipf
+                    List<T> array = list;
+                    if (rs.nextBoolean())
+                    {
+                        array = new ArrayList<>(list);
+                        Collections.reverse(array);
+                    }
+                    return pickZipf(array);
+                case 3: // random weight
+                    return randomWeights(list).next(rs);
+                default:
+                    throw new AssertionError();
+            }
+        };
+    }
+
     public static Gen<char[]> charArray(Gen.IntGen sizes, char[] domain)
     {
         return charArray(sizes, domain, (a, b) -> true);
@@ -240,6 +515,28 @@
                 }
             };
         }
+
+        public Gen<Gen<Boolean>> mixedDistribution()
+        {
+            return rs -> {
+                int selection = rs.nextInt(0, 4);
+                switch (selection)
+                {
+                    case 0: // uniform 50/50
+                        return r -> r.nextBoolean();
+                    case 1: // variable frequency
+                        var freq = rs.nextFloat();
+                        return r -> r.decide(freq);
+                    case 2: // fixed result
+                        boolean result = rs.nextBoolean();
+                        return ignore -> result;
+                    case 3: // biased repeating runs
+                        return biasedRepeatingRuns(rs.nextDouble(), rs.nextInt(1, 100));
+                    default:
+                        throw new IllegalStateException("Unexpected int for bool selection: " + selection);
+                }
+            };
+        }
     }
 
     public static class IntDSL
@@ -265,6 +562,11 @@
                 return r -> r.nextInt(min, max);
             return r -> r.nextInt(min, max + 1);
         }
+
+        public Gen<Gen.IntGen> mixedDistribution(int minInclusive, int maxExclusive)
+        {
+            return Gens.mixedDistribution(minInclusive, maxExclusive);
+        }
     }
 
     public static class LongDSL {
@@ -296,6 +598,11 @@
             return pick(klass.getEnumConstants());
         }
 
+        public <T extends Enum<T>> Gen<Gen<T>> allMixedDistribution(Class<T> klass)
+        {
+            return mixedDistribution(klass.getEnumConstants());
+        }
+
         public <T extends Enum<T>> Gen<T> allWithWeights(Class<T> klass, int... weights)
         {
             T[] constants = klass.getEnumConstants();
diff --git a/accord-core/src/test/java/accord/utils/Property.java b/accord-core/src/test/java/accord/utils/Property.java
index 9c81375..50eba0c 100644
--- a/accord-core/src/test/java/accord/utils/Property.java
+++ b/accord-core/src/test/java/accord/utils/Property.java
@@ -23,12 +23,16 @@
 import accord.utils.async.AsyncResults;
 
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 
 public class Property
@@ -75,6 +79,7 @@
         public T withTimeout(Duration timeout)
         {
             this.timeout = timeout;
+            this.pure = false;
             return (T) this;
         }
 
@@ -168,7 +173,10 @@
         }
         try
         {
-            return value.toString();
+            String result = value.toString();
+            if (result != null && result.length() > 100 && value instanceof Collection)
+                result = ((Collection<?>) value).stream().map(o -> "\n\t     " + o).collect(Collectors.joining(",", "[", "]"));
+            return result;
         }
         catch (Throwable t)
         {
@@ -176,7 +184,7 @@
         }
     }
 
-    private static String propertyError(Common<?> input, Throwable cause, Object... values)
+    private static StringBuilder propertyErrorCommon(Common<?> input, Throwable cause)
     {
         StringBuilder sb = new StringBuilder();
         // return "Seed=" + seed + "\nExamples=" + examples;
@@ -194,15 +202,33 @@
                 msg = cause.getClass().getCanonicalName();
             sb.append(msg).append('\n');
         }
+        return sb;
+    }
+
+    private static String propertyError(Common<?> input, Throwable cause, Object... values)
+    {
+        StringBuilder sb = propertyErrorCommon(input, cause);
         if (values != null)
         {
             sb.append("Values:\n");
             for (int i = 0; i < values.length; i++)
-                sb.append('\t').append(i).append(" = ").append(normalizeValue(values[i])).append('\n');
+                sb.append('\t').append(i).append(" = ").append(normalizeValue(values[i])).append(": ").append(values[i] == null ? "unknown type" : values[i].getClass().getCanonicalName()).append('\n');
         }
         return sb.toString();
     }
 
+    private static String statefulPropertyError(StatefulBuilder input, Throwable cause, Object state, List<String> history)
+    {
+        StringBuilder sb = propertyErrorCommon(input, cause);
+        sb.append("Steps: ").append(input.steps).append('\n');
+        sb.append("Values:\n");
+        sb.append("\tState: ").append(state).append(": ").append(state == null ? "unknown type" : state.getClass().getCanonicalName()).append('\n');
+        sb.append("\tHistory:").append('\n');
+        for (var event : history)
+            sb.append("\t\t").append(event).append('\n');
+        return sb.toString();
+    }
+
     public interface FailingConsumer<A>
     {
         void accept(A value) throws Exception;
@@ -380,4 +406,116 @@
     {
         return new ForBuilder();
     }
+
+    public static StatefulBuilder stateful()
+    {
+        return new StatefulBuilder();
+    }
+
+    public static class StatefulBuilder extends Common<StatefulBuilder>
+    {
+        protected int steps = 1000;
+
+        public StatefulBuilder()
+        {
+            examples = 500;
+        }
+
+        public StatefulBuilder withSteps(int steps)
+        {
+            this.steps = steps;
+            return this;
+        }
+
+        @SuppressWarnings("rawtypes")
+        public <State, SystemUnderTest> void check(Commands<State, SystemUnderTest> commands)
+        {
+            RandomSource rs = new DefaultRandom(seed);
+            for (int i = 0; i < examples; i++)
+            {
+                State state = null;
+                List<String> history = new ArrayList<>(steps);
+                try
+                {
+                    checkInterrupted();
+
+                    state = commands.genInitialState().next(rs);
+                    SystemUnderTest sut = commands.createSut(state);
+
+                    try
+                    {
+                        for (int j = 0; j < steps; j++)
+                        {
+                            Gen<Command<State, SystemUnderTest, ?>> cmdGen = commands.commands(state);
+                            Command cmd = cmdGen.next(rs);
+                            for (int a = 0; cmd.checkPreconditions(state) != PreCheckResult.Ok && a < 42; a++)
+                            {
+                                if (a == 41)
+                                    throw new IllegalArgumentException("Unable to find next command");
+                                cmd = cmdGen.next(rs);
+                            }
+                            history.add(cmd.detailed(state));
+                            Object stateResult = cmd.apply(state);
+                            cmd.checkPostconditions(state, stateResult,
+                                                    sut, cmd.run(sut));
+                        }
+                    }
+                    finally
+                    {
+                        commands.destroySut(sut);
+                        commands.destroyState(state);
+                    }
+                }
+                catch (Throwable t)
+                {
+                    throw new PropertyError(statefulPropertyError(this, t, state, history), t);
+                }
+                if (pure)
+                {
+                    seed = rs.nextLong();
+                    rs.setSeed(seed);
+                }
+            }
+        }
+    }
+
+    public enum PreCheckResult { Ok, Ignore }
+    public interface Command<State, SystemUnderTest, Result>
+    {
+        default PreCheckResult checkPreconditions(State state) {return PreCheckResult.Ok;}
+        Result apply(State state) throws Throwable;
+        Result run(SystemUnderTest sut) throws Throwable;
+        default void checkPostconditions(State state, Result expected,
+                                         SystemUnderTest sut, Result actual) throws Throwable {}
+        default String detailed(State state) {return this.toString();}
+    }
+
+    public interface UnitCommand<State, SystemUnderTest> extends Command<State, SystemUnderTest, Void>
+    {
+        void applyUnit(State state) throws Throwable;
+        void runUnit(SystemUnderTest sut) throws Throwable;
+
+        @Override
+        default Void apply(State state) throws Throwable
+        {
+            applyUnit(state);
+            return null;
+        }
+
+        @Override
+        default Void run(SystemUnderTest sut) throws Throwable
+        {
+            runUnit(sut);
+            return null;
+        }
+    }
+
+    public interface Commands<State, SystemUnderTest>
+    {
+        Gen<State> genInitialState() throws Throwable;
+        SystemUnderTest createSut(State state) throws Throwable;
+        default void destroyState(State state) throws Throwable {}
+        default void destroySut(SystemUnderTest sut) throws Throwable {}
+        Gen<Command<State, SystemUnderTest, ?>> commands(State state) throws Throwable;
+    }
 }
diff --git a/accord-core/src/test/java/accord/utils/SearchableRangeListTest.java b/accord-core/src/test/java/accord/utils/SearchableRangeListTest.java
new file mode 100644
index 0000000..d31c95c
--- /dev/null
+++ b/accord-core/src/test/java/accord/utils/SearchableRangeListTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.utils;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.function.BiConsumer;
+
+import org.junit.jupiter.api.Test;
+
+import accord.impl.IntKey;
+import accord.primitives.Range;
+import org.assertj.core.api.Assertions;
+
+import static accord.utils.Property.qt;
+
+class SearchableRangeListTest
+{
+    @Test
+    public void fullWorld()
+    {
+        int numRanges = 1000;
+        List<Range> ranges = new ArrayList<>(numRanges);
+        for (int i = 0; i < numRanges; i++)
+            ranges.add(IntKey.range(i, i + 1));
+
+        SearchableRangeList list = SearchableRangeList.build(ranges.toArray(new Range[0]));
+        class Counter { int value;}
+        BiConsumer<Integer, Integer> test = (rangeStart, rangeEnd) -> {
+            Counter counter = new Counter();
+            list.forEach(IntKey.range(rangeStart, rangeEnd), (a, b, c, d, e) -> {
+                counter.value++;
+            }, (a, b, c, d, start, end) -> {
+                counter.value += (end - start + 1);
+            }, 0, 0, 0, 0, 0);
+            Assertions.assertThat(counter.value).isEqualTo(rangeEnd - rangeStart + 1);
+        };
+        for (int i = 0; i < numRanges; i++)
+            test.accept(i, numRanges);
+        for (int i = 0; i < numRanges; i++)
+            test.accept(0, numRanges - i);
+    }
+
+    @Test
+    public void random()
+    {
+        qt().check(rs -> {
+            int numRanges = rs.nextInt(1000, 10000);
+            List<Range> ranges = new ArrayList<>(numRanges);
+            for (int i = 0; i < numRanges; i++)
+            {
+                int start = rs.nextInt(Integer.MIN_VALUE, Integer.MAX_VALUE - 1000);
+                int offset = rs.nextInt(1, 1000);
+                ranges.add(IntKey.range(start, start + offset));
+            }
+            ranges.sort(Comparator.comparing(Range::start));
+
+            SearchableRangeList list = SearchableRangeList.build(ranges.toArray(new Range[0]));
+            for (int i = 0; i < 1000; i++)
+            {
+                Range range;
+                int selection = rs.nextInt(0, 3);
+                switch (selection)
+                {
+                    case 0:
+                        range = rs.pick(ranges);
+                        break;
+                    case 1:
+                        int rangeStart = rs.nextInt(Integer.MIN_VALUE, Integer.MAX_VALUE - 1000);
+                        int offset = rs.nextInt(1, 1000);
+                        range = IntKey.range(rangeStart, rangeStart + offset);
+                        break;
+                    case 2:
+                        int start = rs.nextInt(0, ranges.size());
+                        int end = start + rs.nextInt(0, (ranges.size() - start));
+                        range = IntKey.range(((IntKey) ranges.get(start).start()).key, ((IntKey) ranges.get(end).end()).key);
+                        break;
+                    default:
+                        throw new IllegalStateException("Unhandled value");
+                }
+                List<Range> expected = new ArrayList<>();
+                for (Range r : ranges)
+                {
+                    if (range.compareIntersecting(r) == 0)
+                        expected.add(r);
+                }
+                List<Range> actual = new ArrayList<>(expected.size());
+                list.forEach(range, (a, b, c, d, idx) -> {
+                    actual.add(list.ranges[idx]);
+                }, (a, b, c, d, start, end) -> {
+                    for (int j = start; j < end; j++)
+                        actual.add(list.ranges[j]);
+                }, 0, 0, 0, 0, 0);
+
+                Assertions.assertThat(actual).isEqualTo(expected);
+            }
+        });
+    }
+}
\ No newline at end of file
diff --git a/buildSrc/src/main/groovy/accord.java-conventions.gradle b/buildSrc/src/main/groovy/accord.java-conventions.gradle
index f77a0c0..32cd21b 100644
--- a/buildSrc/src/main/groovy/accord.java-conventions.gradle
+++ b/buildSrc/src/main/groovy/accord.java-conventions.gradle
@@ -29,7 +29,7 @@
 }
 
 compileJava {
-    sourceCompatibility = JavaVersion.VERSION_1_8
+    sourceCompatibility = JavaVersion.VERSION_11
 
     dependsOn(':rat')
 }