Accord: PreLoadContext must properly and consistently support ranges
patch by David Capwell; reviewed by Benedict Elliott Smith for CASSANDRA-19355
diff --git a/accord-core/src/main/java/accord/local/CommandsForKey.java b/accord-core/src/main/java/accord/local/CommandsForKey.java
index 0e24d67..9e628b9 100644
--- a/accord-core/src/main/java/accord/local/CommandsForKey.java
+++ b/accord-core/src/main/java/accord/local/CommandsForKey.java
@@ -1698,7 +1698,6 @@
if (o == null || getClass() != o.getClass()) return false;
CommandsForKey that = (CommandsForKey) o;
return Objects.equals(key, that.key)
- && Objects.equals(redundantBefore, that.redundantBefore)
&& Arrays.equals(txns, that.txns);
}
diff --git a/accord-core/src/main/java/accord/utils/CheckpointIntervalArray.java b/accord-core/src/main/java/accord/utils/CheckpointIntervalArray.java
new file mode 100644
index 0000000..84063da
--- /dev/null
+++ b/accord-core/src/main/java/accord/utils/CheckpointIntervalArray.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.utils;
+
+import java.util.Arrays;
+
+import accord.utils.CheckpointIntervalArrayBuilder.Accessor;
+import net.nicoulaj.compilecommand.annotations.Inline;
+
+import static accord.utils.SortedArrays.Search.CEIL;
+
+public class CheckpointIntervalArray<Ranges, Range, Key>
+{
+ // scan distance can be kept very small as we guarantee to use at most linear extra space even with a scan distance of zero
+ static final int MAX_SCAN_DISTANCE = 255;
+ protected static final int BIT30 = 0x40000000;
+ protected static final int BIT29 = 0x20000000;
+
+ final Ranges ranges;
+
+ /**
+ * The lower bound for each checkpoint.
+ * The checkpoint {@code i} applies to all ranges (incl) starting from {@code lowerBounds[i]},
+ * but before (excl) {@code lowerBounds[i+1]}.
+ */
+ final int[] lowerBounds;
+
+ /**
+ * Logically one entry per checkpoint, mapping {@link #lowerBounds} to {@link #checkpointLists},
+ * however we also encode an additional byte per entry representing the scan distance for the
+ * ranges handled by this checkpoint. These are grouped into an integer per four mappings, i.e.
+ * we encode batches of five ints, with the first int containing the four scan distances for the
+ * next four checkpoints, and the following four ints containing the respective offsets into
+ * {@link #checkpointLists}.
+ * <p>
+ * [0.........32b.........64b.........96b........128b........160b........192b]
+ * [ d1 d2 d3 d4 mapping1 mapping2 mapping3 mapping4 d5 d6 d7 d8 ]
+ */
+ final int[] headers;
+
+ /**
+ * A list of indexes in {@link #ranges} contained by each checkpoint; checkpoints are
+ * mapped from {@link #lowerBounds} by {@link #headers}.
+ * <p>
+ * Entries are sorted in descending order by the end of the range they cover, so that
+ * a search of this collection my terminate as soon as it encounters a range that does
+ * not cover the item we are searching for.
+ * <p>
+ * This collection may contain negative values, in which case these point to other
+ * checkpoints, whose <i>direct</i> contents (i.e. the positive values of) we may
+ * search.
+ * <ul> if negative, points to an earlier checkpoint, and:
+ * <li>if the 30th bit is set, the low 20 bits point to checkpointsList,
+ * and the 9 bits in-between provide the length of the range</li>
+ * <li>otherwise, if the 29th bit is set, the lower 29 bits points to checkpointsList,
+ * and can be iterated safely without an endIndex</li>
+ * <li>otherwise, the low 29 bits provide the length of the run, and the low 31 bits
+ * of the following entry (which will also be negative) provide a pointer to
+ * checkpointsList</li>
+ * </ul>
+ */
+ final int[] checkpointLists;
+
+ public final int maxScanAndCheckpointMatches;
+ private final Accessor<Ranges, Range, Key> accessor;
+
+ public CheckpointIntervalArray(Accessor<Ranges, Range, Key> accessor, Ranges ranges,
+ int[] lowerBounds, int[] headers, int[] checkpointLists, int maxScanAndCheckpointMatches)
+ {
+ this.accessor = accessor;
+ this.ranges = ranges;
+ this.lowerBounds = lowerBounds;
+ this.headers = headers;
+ this.checkpointLists = checkpointLists;
+ this.maxScanAndCheckpointMatches = maxScanAndCheckpointMatches;
+ }
+
+ @Inline
+ public <P1, P2, P3, P4> int forEach(Range range, IndexedQuadConsumer<P1, P2, P3, P4> forEachScanOrCheckpoint, IndexedRangeQuadConsumer<P1, P2, P3, P4> forEachRange, P1 p1, P2 p2, P3 p3, P4 p4, int minIndex)
+ {
+ return forEach(accessor.start(range), accessor.end(range), forEachScanOrCheckpoint, forEachRange, p1, p2, p3, p4, minIndex);
+ }
+
+ public <P1, P2, P3, P4> int forEach(Key startKey, Key endKey, IndexedQuadConsumer<P1, P2, P3, P4> forEachScanOrCheckpoint, IndexedRangeQuadConsumer<P1, P2, P3, P4> forEachRange, P1 p1, P2 p2, P3 p3, P4 p4, int minIndex)
+ {
+ if (accessor.size(ranges) == 0 || minIndex == accessor.size(ranges))
+ return minIndex;
+
+ var c = accessor.keyComparator();
+ int end = accessor.binarySearch(ranges, minIndex, accessor.size(ranges), endKey, (a, b) -> c.compare(a, accessor.start(b)), CEIL);
+ if (end < 0) end = -1 - end;
+ if (end <= minIndex) return minIndex;
+
+ int floor = accessor.binarySearch(ranges, minIndex, accessor.size(ranges), startKey, (a, b) -> c.compare(a, accessor.start(b)), CEIL);
+ int start = floor;
+ if (floor < 0)
+ {
+ // if there's no precise match on start, step backwards;
+ // if this range does not overlap us, step forwards again for start
+ // but retain the floor index for performing scan and checkpoint searches from
+ // as this contains all ranges that might overlap us (whereas those that end
+ // after us but before the next range's start would be missed by the next range index)
+ start = floor = -2 - floor;
+ if (start < 0)
+ start = floor = 0;
+ else if (c.compare(accessor.end(ranges, start), startKey) <= 0)
+ ++start;
+ }
+
+ // Since endInclusive() != startInclusive(), so no need to adjust start/end comparisons
+ return forEach(start, end, floor, startKey, 0, forEachScanOrCheckpoint, forEachRange, p1, p2, p3, p4, minIndex);
+ }
+
+ @Inline
+ protected <P1, P2, P3, P4> int forEach(int start, int end, int floor, Key startBound, int cmpStartBoundWithEnd,
+ IndexedQuadConsumer<P1, P2, P3, P4> forEachScanOrCheckpoint, IndexedRangeQuadConsumer<P1, P2, P3, P4> forEachRange,
+ P1 p1, P2 p2, P3 p3, P4 p4, int minIndex)
+ {
+ if (start < minIndex) start = minIndex;
+
+ // find the checkpoint array, so we know how far to step back
+ int checkpoint = Arrays.binarySearch(lowerBounds, floor);
+ if (checkpoint < 0) checkpoint = -2 - checkpoint;
+ if (checkpoint < 0) return end;
+
+ int headerBaseIndex = (checkpoint / 4) * 5;
+ int headerSubIndex = checkpoint & 3;
+ int headerListIndex = headerBaseIndex + 1 + headerSubIndex;
+
+ int scanDistance = (headers[headerBaseIndex] >>> (8 * headerSubIndex)) & 0xff;
+ int checkpointStart = headers[headerListIndex];
+ int checkpointEnd = headers[headerListIndex + (headerSubIndex + 5)/4]; // skip the next header
+
+ if (scanDistance == MAX_SCAN_DISTANCE)
+ {
+ scanDistance = -checkpointLists[checkpointStart++];
+ Invariants.checkState(scanDistance >= MAX_SCAN_DISTANCE);
+ }
+
+ // NOTE: we visit in approximately ascending order, and this is a requirement for correctness of RangeDeps builders
+ // Only the checkpoint is visited in uncertain order, but it is visited entirely, before the scan matches
+ // and the range matches
+ int minScanIndex = Math.max(floor - scanDistance, minIndex);
+ var c = accessor.keyComparator();
+ for (int i = checkpointStart; i < checkpointEnd ; ++i)
+ {
+ int ri = checkpointLists[i];
+ if (ri < 0)
+ {
+ int subStart, subEnd;
+ if ((ri & BIT30) != 0)
+ {
+ subStart = ri & 0xfffff;
+ subEnd = subStart + ((ri >>> 20) & 0x1ff);
+ }
+ else if ((ri & BIT29) != 0)
+ {
+ subStart = ri & 0x1fffffff;
+ subEnd = Integer.MAX_VALUE;
+ }
+ else
+ {
+ int length = ri & 0x1fffffff;
+ subStart = checkpointLists[++i];
+ subEnd = subStart + length;
+ }
+
+ for (int j = subStart ; j < subEnd ; ++j)
+ {
+ ri = checkpointLists[j];
+ if (ri < 0)
+ continue;
+
+ if (c.compare(accessor.end(ranges, ri), startBound) <= cmpStartBoundWithEnd)
+ break;
+
+ if (ri >= minIndex && ri < minScanIndex)
+ forEachScanOrCheckpoint.accept(p1, p2, p3, p4, ri);
+ }
+ }
+ else
+ {
+ // if startBound is key, we cannot be equal to it;
+ // if startBound is a Range start, we also cannot be equal to it due to the requirement that
+ // endInclusive() != startInclusive(), so equality really means inequality
+ if (c.compare(accessor.end(ranges, ri), startBound) <= cmpStartBoundWithEnd)
+ break;
+
+ if (ri >= minIndex && ri < minScanIndex)
+ forEachScanOrCheckpoint.accept(p1, p2, p3, p4, ri);
+ }
+ }
+
+ for (int i = minScanIndex; i < floor ; ++i)
+ {
+ if (c.compare(accessor.end(ranges, i), startBound) > cmpStartBoundWithEnd)
+ forEachScanOrCheckpoint.accept(p1, p2, p3, p4, i);
+ }
+
+ if (start == end)
+ return end;
+
+ forEachRange.accept(p1, p2, p3, p4, start, end);
+ return end;
+ }
+}
diff --git a/accord-core/src/main/java/accord/utils/CheckpointIntervalArrayBuilder.java b/accord-core/src/main/java/accord/utils/CheckpointIntervalArrayBuilder.java
new file mode 100644
index 0000000..95e1d86
--- /dev/null
+++ b/accord-core/src/main/java/accord/utils/CheckpointIntervalArrayBuilder.java
@@ -0,0 +1,1133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.utils;
+
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.Objects;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+import javax.annotation.Nonnull;
+
+import static accord.utils.ArrayBuffers.cachedInts;
+import static accord.utils.CheckpointIntervalArray.MAX_SCAN_DISTANCE;
+import static accord.utils.SortedArrays.Search.CEIL;
+
+public class CheckpointIntervalArrayBuilder<Ranges, Range, RoutingKey>
+{
+ public enum Strategy
+ {
+ /**
+ * Do not tenure any ranges that are scannable from the currently in-effect max scan distance.
+ * This means we probably do less work on construction, but that our measure of the match count
+ * at any point is inaccurate, and so our heuristics for when to write checkpoints may be wrong,
+ * leading to more checkpoints than necessary.
+ */
+ FAST,
+
+ /**
+ * Tenure every range covering more than goalScanDistance. Any within max scan distance will also
+ * update the scan distance, so that they will be filtered when a checkpoint is written. But
+ * in the meantime they permit accurate tracking of the number of matches a query can return,
+ * permitting our complexity calculations (that determine when checkpoints should be written, and
+ * what our maximum scan distance should be), to be accurate. This can avoid bouncing between
+ * two extremes, where a low max scan distance tenures and correctly detects a desirable larger scan
+ * distance, which we rollover and this prevents us tenuring and tracking the number of matches, so
+ * that we then pick a low max scan distance (and thereby also write a new checkpoint)
+ */
+ ACCURATE
+ }
+
+ /**
+ * Should we maintain pointers to prior checkpoints that we may reference instead of reserializing
+ * the remaining contents. This is cheap to visit as we stop enumerating as soon as we encounter
+ * an entry that no longer covers us. We use some simple heuristics when deciding whether to do
+ * this, namely that there are at least two entries (so we save one checkpoint entry) and that
+ * there is at least one direct entry for each indirect/link entry in the range we will link.
+ */
+ public enum Links
+ {
+ LINKS,
+ NO_LINKS
+ }
+
+ public interface Accessor<Ranges, Range, RoutingKey>
+ {
+ int size(Ranges ranges);
+ Range get(Ranges ranges, int index);
+ RoutingKey start(Ranges ranges, int index);
+ RoutingKey start(Range range);
+ RoutingKey end(Ranges ranges, int index);
+ RoutingKey end(Range range);
+ Comparator<RoutingKey> keyComparator();
+ int binarySearch(Ranges ranges, int from, int to, RoutingKey find, AsymmetricComparator<RoutingKey, Range> comparator, SortedArrays.Search op);
+ }
+
+ private static final int BIT31 = 0x80000000;
+ private static final int BIT30 = 0x40000000;
+ private static final int BIT29 = 0x20000000;
+ static final int MIN_INDIRECT_LINK_LENGTH = 2;
+
+ final Accessor<Ranges, Range, RoutingKey> accessor;
+ final boolean isAccurate;
+ final boolean withLinks;
+ final Ranges ranges;
+
+ int[] bounds;
+ int[] headers;
+ int[] lists;
+ int checkpointCount, headerPointer, listCount;
+
+ final Scan<Ranges, Range, RoutingKey> scan;
+ final TenuredSet<Ranges, Range, RoutingKey> tenured;
+ final PendingCheckpoint<Ranges, Range, RoutingKey> pending = new PendingCheckpoint<>();
+
+ // track the maximum possible number of entries we can match with both a scan + checkpoint lookup
+ // this is an over-estimate and may be used by consumers to allocate out-of-order buffers for visitations
+ int maxScanAndCheckpointMatches;
+
+ public CheckpointIntervalArrayBuilder(Accessor<Ranges, Range, RoutingKey> accessor,
+ Ranges ranges,
+ Strategy strategy, Links links)
+ {
+ this(accessor, ranges, Math.min(MAX_SCAN_DISTANCE, 34 - Integer.numberOfLeadingZeros(accessor.size(ranges))), strategy, links);
+ }
+
+ public CheckpointIntervalArrayBuilder(Accessor<Ranges, Range, RoutingKey> accessor,
+ Ranges ranges,
+ int goalScanDistance,
+ Strategy strategy, Links links)
+ {
+ this.accessor = accessor;
+ this.isAccurate = strategy == Strategy.ACCURATE;
+ this.withLinks = links == Links.LINKS;
+ Invariants.checkArgument(goalScanDistance <= MAX_SCAN_DISTANCE);
+ Invariants.checkArgument(goalScanDistance > 0);
+ this.ranges = ranges;
+ this.scan = new Scan<>(accessor);
+ this.tenured = new TenuredSet<>(accessor);
+ init(ranges, goalScanDistance);
+ }
+
+ void init(Ranges ranges, int goalScanDistance)
+ {
+ // we write checkpoints at least goalScanDistance apart
+ scan.init(goalScanDistance);
+ ArrayBuffers.IntBuffers cachedInts = cachedInts();
+ // ask for int buffers in descending order of size
+ int size = accessor.size(ranges);
+ this.lists = cachedInts.getInts(size); // this one might need to grow
+ // +2 to round-up each division, and +2 to account for the final entry (which might require an empty scan distance header)
+ this.headers = cachedInts.getInts(((size / goalScanDistance) * 5) / 4 + 4);
+ this.bounds = cachedInts.getInts(size / goalScanDistance + 1);
+ }
+
+ public interface Factory<T, Ranges>
+ {
+ T build(Ranges ranges, int[] bounds, int[] headers, int[] lists, int maxScanAndCheckpointMatches);
+ }
+
+ /**
+ * Walk over each range, looking ahead by {@link #maxScanDistance} to decide if a range should
+ * be tenured (written to a checkpoint) or scanned; the maximum scan distance is determined by the
+ * number of open tenured entries, i.e. the minimum number of results we can expect to be returned
+ * (or, if greater, the logarithm of the number of ranges in the collection).
+ * <p>
+ * Once we encounter a range that should be tenured, either write a checkpoint immediately
+ * or make a note of the position we must scan to from the last entry in this checkpoint
+ * and wait until it is permitted to write a checkpoint. This range will be tenured either
+ * way for the following checkpoint.
+ * <p>
+ * The only reason not to write a checkpoint immediately is in the case we would breach
+ * our linear space complexity limit, which is imposed by ensuring we have a space between
+ * checkpoints at least as large as the number of entries written to the last checkpoint,
+ * discounted by the number of entries we have removed from the tenured collection since
+ * the last checkpoint.
+ */
+ public CheckpointIntervalArray<Ranges, Range, RoutingKey> build()
+ {
+ return build((ranges, bounds, headers, lists, maxScanAndCheckpointMatches) -> new CheckpointIntervalArray<>(accessor, ranges, bounds, headers, lists, maxScanAndCheckpointMatches));
+ }
+
+ public <T> T build(Factory<T, Ranges> factory)
+ {
+ int size = accessor.size(ranges);
+ for (int ri = 0 ; ri < size ; ++ri)
+ {
+ // write a checkpoint if we meet our linear space complexity requirements
+ // and we either have a tenured range that we must scan,
+ // or the scan distance is now much larger than the minimum number of search results
+ if (shouldWriteCheckpoint(ri))
+ writeCheckpoint(ri);
+
+ // either tenure or update scan distance, potentially writing a checkpoint
+ tenureOrScan(ri);
+ tenured.untenure(ri);
+ }
+
+ // write our final pending checkpoint
+ writeCheckpoint(size);
+ closeHeaders();
+
+ ArrayBuffers.IntBuffers cachedInts = cachedInts();
+ int[] lists = cachedInts.completeAndDiscard(this.lists, listCount);
+ int[] headers = cachedInts.completeAndDiscard(this.headers, headerPointer);
+ int[] bounds = cachedInts.completeAndDiscard(this.bounds, checkpointCount);
+ return factory.build(ranges, bounds, headers, lists, maxScanAndCheckpointMatches);
+ }
+
+ /**
+ * Categorise the candidateIdx as either scannable, and if so update the scan distance;
+ * or unscannable, in which case add it to the {@link #tenured} collection.
+ * Note, that in ACCURATE mode we tenure the item if it is outside of the goalScanDistance
+ * so we may track O(k) accurately above the O(lg2(N)) search and default scan distance,
+ * but we still update the scan distance so that the checkpoint will exclude this entry.
+ */
+ private void tenureOrScan(int index)
+ {
+ Invariants.checkArgument(index >= 0);
+
+ // then either migrate the index to pendingTenured, or ensure it will be scanned
+ RoutingKey end = accessor.end(ranges, index);
+ int scanLimit = scanLimit(index, isAccurate ? scan.goal : maxScanDistance());
+ if (shouldTenure(end, scanLimit))
+ {
+ int lastIndex = tenured.tenure(end, index, ranges, scanLimit + 1);
+ if (lastIndex - index > maxScanDistance()) scan.tenured(index);
+ else if (!isAccurate) throw new IllegalStateException();
+ else scan.updateScanDistance(index, lastIndex - index, this);
+ }
+ else
+ {
+ // TODO (low priority, efficiency): if the prior checkpoint has a scan distance >= this one,
+ // and <= 50% more than this one and there's no scanMustReachIndex nor tenuredRanges, don't
+ // write a new checkpoint (perhaps split shouldWriteCheckpoint logic in two)
+ scan.update(end, index, ranges, scanLimit, this);
+ }
+ }
+
+ /**
+ * We are forbidden from writing a checkpoint nearer than this to a prior checkpoint.
+ * This imposes our linear space complexity bounds, while not harming our O(log2(N) + K)
+ * complexity bounds, as we guarantee minimumSpan is never more than the number of query
+ * results.
+ */
+ private int minimumSpan()
+ {
+ return Math.max(scan.goal(), tenured.minimumSpan());
+ }
+
+ private int maxScanDistance()
+ {
+ // minimumSpan() reduces overtime, but there is no reason to reduce our scan distance
+ // for tenuring below the scan distance we will write
+ return Math.max(scan.watermark(), minimumSpan());
+ }
+
+ /**
+ * The index after the last index we can scan from {@code atIndex} with at most {@code maxScanDistance}.
+ */
+ private int scanLimit(int atIndex, int maxScanDistance)
+ {
+ return Math.min(1 + atIndex + maxScanDistance, accessor.size(ranges));
+ }
+
+ private boolean shouldTenure(RoutingKey end, int scanLimit)
+ {
+ return scanLimit < accessor.size(ranges) && accessor.keyComparator().compare(end, accessor.start(ranges, scanLimit)) > 0;
+ }
+
+ private boolean canWriteCheckpoint(int atIndex)
+ {
+ return atIndex - pending.atIndex >= minimumSpan();
+ }
+
+ private boolean shouldWriteCheckpoint(int atIndex)
+ {
+ if (!canWriteCheckpoint(atIndex))
+ return false;
+
+ // TODO (desired, efficiency): consider these triggers
+ if (scan.mustCheckpointToScanTenured(atIndex, maxScanDistance()))
+ return true;
+
+ return scan.hasMaybeDivergedFromMatchSize(tenured);
+ }
+
+ /**
+ * Write a checkpoint for ranges[prevCheckpointIndex...ri)
+ *
+ * 1) Finalise the scan distance
+ * 2) Write the header
+ * 3) Filter the pending tenured ranges to remove those we can scan
+ * 4) Write this list out
+ * 5) Setup a link to this list, if it is large enough
+ * 6) Rollover the scan, tenured and pending structures for the new pending checkpoint
+ */
+ private void writeCheckpoint(int nextCheckpointIndex)
+ {
+ int lastIndex = nextCheckpointIndex - 1;
+ int scanDistance = scan.finalise(lastIndex);
+ scanDistance = extendScanDistance(lastIndex, scanDistance);
+
+ if (pending.atIndex < 0)
+ {
+ // we don't have any checkpoints pending, so don't try to finalise it
+ // but if the new checkpoint doesn't cover index 0, insert a new empty
+ // checkpoint for the scan distance
+ if (nextCheckpointIndex > 0)
+ {
+ // setup an initial empty checkpoint to store the first scan distance
+ maxScanAndCheckpointMatches = scanDistance;
+ writeHeader(scanDistance, 0);
+ }
+ }
+ else
+ {
+ writeHeader(scanDistance, pending.atIndex);
+ int maxCheckpointMatchCount = pending.filter(scanDistance, lastIndex);
+ int listIndex = writeList(pending);
+ if (withLinks)
+ pending.setupLinkChain(tenured, listIndex, listCount);
+ maxScanAndCheckpointMatches = Math.max(maxScanAndCheckpointMatches, scanDistance + maxCheckpointMatchCount);
+ }
+
+ savePendingCheckpointAndResetScanDistance(nextCheckpointIndex);
+ }
+
+ private void savePendingCheckpointAndResetScanDistance(int checkpointIndex)
+ {
+ // use the tail of checkpointListBuf to buffer ranges we plan to tenure
+ ensureCapacity(tenured.count() + scan.watermark());
+
+ scan.reset();
+
+ if (isAccurate)
+ {
+ // TODO (low priority, efficiency): we can shift back the existing scanDistance if it's far enough from
+ // the next checkpoint. this might permit us to skip some comparisons
+ scan.resetPeakMax(tenured);
+ for (Tenured<Ranges, Range, RoutingKey> tenured : this.tenured)
+ {
+ int distanceToEnd = (tenured.lastIndex - checkpointIndex);
+ if (distanceToEnd >= scan.peakMax)
+ break;
+
+ int scanDistance = tenured.lastIndex - tenured.index;
+ if (scanDistance <= scan.peakMax)
+ scan.updateScanDistance(tenured.index, scanDistance, null);
+ }
+
+ if (scan.watermark() < scan.goal)
+ {
+ int ri = Scan.minScanIndex(checkpointIndex, scan.goal);
+ while (ri < checkpointIndex)
+ {
+ RoutingKey end = accessor.end(ranges, ri);
+ int scanLimit = scanLimit(ri, scan.peakMax);
+ if (!shouldTenure(end, scanLimit))
+ scan.update(end, ri, ranges, scanLimit, null);
+ ++ri;
+ }
+ }
+ }
+ else
+ {
+ // the maximum scan distance that could ever have been adopted for last chunk
+ int oldPeakMax = scan.peakMax();
+ // the minimum scan distance we will start with for processing the proceeding ranges
+ // note: this may increase if we decide to tenure additional ranges, at which point it will be the actual newPeakMax
+ int newMinPeakMax = scan.newPeakMax(tenured);
+ int minUntenuredIndex = scan.minUntenuredIndex(checkpointIndex, tenured);
+ int minScanIndex = Scan.minScanIndex(checkpointIndex, newMinPeakMax);
+
+ // we now make sure tenured and scan are correct for the new parameters.
+ // 1) if our peakMax is lower then we need to go back and find items to tenure that we previously marked for scanning
+ // 2) we must also reset our scan distances
+
+ // since our peakMax is determined by tenured.count(), but we are tenuring items here we keep things simple
+ // and do not account for those items we tenure but would later permit to scan as our peakMax grows
+
+ int ri = Math.min(minUntenuredIndex, minScanIndex);
+ while (ri < checkpointIndex)
+ {
+ RoutingKey end = accessor.end(ranges, ri);
+ int newPeakMax = scan.newPeakMax(tenured);
+ int scanLimit = scanLimit(ri, newPeakMax);
+ if (shouldTenure(end, scanLimit))
+ {
+ // note: might have already been tenured
+ // in this case our untenureLimit may be incorrect, but we won't use it
+ if (ri >= minUntenuredIndex && newPeakMax < oldPeakMax)
+ tenured.tenure(end, ri, ranges, scanLimit + 1, scanLimit(ri, oldPeakMax));
+ }
+ else
+ {
+ // this might effectively remove a previously tenured item
+ scan.update(end, ri, ranges, scanLimit, null);
+ }
+ ++ri;
+ }
+
+ scan.resetPeakMax(tenured);
+ }
+
+ pending.atIndex = checkpointIndex;
+ pending.clear();
+ tenured.rollover(pending);
+ }
+
+ private int extendScanDistance(int lastIndex, int scanDistance)
+ {
+ // now we've established our lower bound on scan distance, see how many checkpoints we can remove
+ // by increasing our scan distance so that it remains proportional to the number of results returned
+ // TODO (low priority, efficiency): can reduce cost here by using scanDistances array for upper bounds to scan distance
+ int maxScanDistance = scan.goal() + 2 * Math.min(tenured.count(), tenured.countAtPrevCheckpoint());
+ if (maxScanDistance >= 1 + scanDistance + scanDistance/4 && pending.count() >= (maxScanDistance - scanDistance)/2)
+ {
+ int removeCount = 0;
+ int extendedScanDistance = scanDistance;
+ int target = (maxScanDistance - scanDistance)/2;
+ for (int i = 0 ; i < pending.count() ; ++i)
+ {
+ Tenured<Ranges, Range, RoutingKey> t = pending.get(i);
+ if (t.index < 0)
+ continue;
+
+ int distance = Math.min(lastIndex, t.lastIndex) - t.index;
+ if (distance <= scanDistance)
+ continue; // already scanned or untenured
+
+ if (distance <= maxScanDistance)
+ {
+ ++removeCount;
+ extendedScanDistance = Math.max(extendedScanDistance, distance);
+ if (extendedScanDistance == maxScanDistance && removeCount >= target)
+ break;
+ }
+ }
+
+ // TODO (low priority, efficiency): should perhaps also gate this decision on the span we're covering
+ // algorithmically, however, so long as we are under maxScanDistance we are fine
+ if (removeCount >= (extendedScanDistance - scanDistance)/2)
+ scanDistance = extendedScanDistance;
+ }
+ return scanDistance;
+ }
+
+ int writeList(PendingCheckpoint<Ranges, Range, RoutingKey> pending)
+ {
+ int startIndex = listCount;
+ for (int i = pending.count() - 1 ; i >= 0 ; --i)
+ {
+ Tenured<Ranges, Range, RoutingKey> t = pending.get(i);
+ if (t.index >= 0)
+ {
+ lists[listCount++] = t.index;
+ }
+ else
+ {
+ int index = t.index & ~BIT31;
+ int length = t.linkLength & ~BIT31;
+ if (length <= 0xff && index <= 0xfffff)
+ {
+ lists[listCount++] = BIT31 | BIT30 | (length << 20) | index;
+ }
+ else if (t.linkLength >= 0 && length < BIT30)
+ {
+ lists[listCount++] = BIT31 | BIT29 | index;
+ }
+ else
+ {
+ lists[listCount++] = BIT31 | length;
+ lists[listCount++] = BIT31 | pending.count();
+ }
+ }
+ }
+ return startIndex;
+ }
+
+ void writeHeader(int scanDistance, int lowerBound)
+ {
+ int headerScanDistance = Math.min(scanDistance, MAX_SCAN_DISTANCE);
+
+ if ((checkpointCount & 3) == 0)
+ headers[headerPointer++] = headerScanDistance;
+ else
+ headers[headerPointer - (1 + (checkpointCount & 3))] |= headerScanDistance << (8 * (checkpointCount & 3));
+
+ bounds[checkpointCount++] = lowerBound;
+ headers[headerPointer++] = listCount;
+
+ if (scanDistance >= MAX_SCAN_DISTANCE)
+ lists[listCount++] = -scanDistance; // serialize as a negative value so we ignore it in most cases automatically
+ }
+
+ void closeHeaders()
+ {
+ // write our final checkpoint header
+ if ((checkpointCount & 3) == 0) headers[headerPointer++] = 0;
+ headers[headerPointer++] = listCount;
+ }
+
+ void ensureCapacity(int maxPendingSize)
+ {
+ if (listCount + maxPendingSize >= lists.length)
+ lists = cachedInts().resize(lists, listCount, lists.length + lists.length/2 + maxPendingSize);
+ }
+
+ static class Scan<Ranges, Range, RoutingKey>
+ {
+ final Accessor<Ranges, Range, RoutingKey> accessor;
+ /** the scan distance we are aiming for; should be proportional to log2(N) */
+ int goal;
+
+ /** the indexes at which we increased the scan distance, and the new scan distance */
+ int[] distances = new int[16];
+ /** the number of unique scan distances we have adopted since the last checkpoint */
+ int count;
+ /** the highest scan distance we have adopted (==scanDistance(scanDistanceCount-1)) */
+ int watermark;
+ /**
+ * the first index we have tenured a range from, but for which we did not immediately write a new checkpoint
+ * we *must* scan at least from the last index in the checkpoint to here
+ */
+ int scanTenuredAtIndex = -1;
+
+ /** The maximum (i.e. initial) scan distance limit we have used since the last attempted checkpoint write */
+ int peakMax;
+
+ Scan(Accessor<Ranges, Range, RoutingKey> accessor)
+ {
+ this.accessor = accessor;
+ }
+
+ void init(int goalScanDistance)
+ {
+ goal = peakMax = goalScanDistance;
+ }
+
+ private void update(RoutingKey end, int atIndex, Ranges ranges, int scanLimit, CheckpointIntervalArrayBuilder<Ranges, Range, RoutingKey> checkpoint)
+ {
+ int newScanDistance = find(end, atIndex, ranges, scanLimit, watermark);
+ updateScanDistance(atIndex, newScanDistance, checkpoint);
+ }
+
+ private void updateScanDistance(int atIndex, int newScanDistance, CheckpointIntervalArrayBuilder<Ranges, Range, RoutingKey> checkpoint)
+ {
+ if (newScanDistance > watermark)
+ {
+ // TODO (desired, efficiency): we don't mind slight increases to the watermark;
+ // should really look at scan distance history and ensure we haven't e.g. doubled since
+ // some earlier point (and should track the match count + scan distance at each bump
+ // to check overall work hasn't increased too much)
+ if (checkpoint != null && checkpoint.canWriteCheckpoint(atIndex))
+ checkpoint.writeCheckpoint(atIndex);
+
+ watermark = newScanDistance;
+ if (count * 2 == distances.length)
+ distances = Arrays.copyOf(distances, distances.length * 2);
+ distances[count * 2] = newScanDistance;
+ distances[count * 2 + 1] = atIndex;
+ ++count;
+ }
+ }
+
+ private int find(RoutingKey end, int atIndex, Ranges ranges, int scanLimit, int currentScanDistance)
+ {
+ var c = accessor.keyComparator();
+ int lowerIndex = accessor.binarySearch(ranges, atIndex + currentScanDistance, scanLimit, end, (e, s) -> c.compare(e, accessor.start(s)), CEIL);
+ if (lowerIndex < 0) lowerIndex = -2 - lowerIndex;
+ else lowerIndex -= 1;
+ return lowerIndex - atIndex;
+ }
+
+ boolean isAboveGoal()
+ {
+ return watermark > goal;
+ }
+
+ int watermark()
+ {
+ return watermark;
+ }
+
+ int goal()
+ {
+ return goal;
+ }
+
+ int distanceToTenured(int lastIndex)
+ {
+ return scanTenuredAtIndex >= 0 ? lastIndex - scanTenuredAtIndex : 0;
+ }
+
+ boolean mustCheckpointToScanTenured(int checkpointIndex, int maxScanDistance)
+ {
+ return scanTenuredAtIndex >= 0 && checkpointIndex - scanTenuredAtIndex >= maxScanDistance;
+ }
+
+ /**
+ * Are we scanning a much longer distance than the minimum number of matches we know a query will return?
+ * Note: with Strategy.FAST, {@code tenured.count()} gets less accurate as scan distance increases, so this
+ * will bounce around triggering checkpoints due to the larger scan distance, resetting the scan distance
+ * and starting again
+ */
+ boolean hasMaybeDivergedFromMatchSize(TenuredSet<Ranges, Range, RoutingKey> tenured)
+ {
+ return isAboveGoal() && tenured.count() < watermark()/2;
+ }
+
+ private int distance(int i)
+ {
+ return distances[i*2];
+ }
+
+ private int index(int i)
+ {
+ return distances[i*2+1];
+ }
+
+ int finalise(int lastIndex)
+ {
+ Invariants.checkState(distanceToTenured(lastIndex) <= Math.max(watermark(), peakMax()));
+
+ int scanDistance = watermark;
+ // then, compute the minimum scan distance implied by any tenured ranges we did not immediately
+ // write a checkpoint for - we *must* scan back as far as this record
+ int minScanDistance = scanTenuredAtIndex >= 0 ? lastIndex - scanTenuredAtIndex : 0;
+ if (minScanDistance > scanDistance)
+ {
+ // if this minimum is larger than the largest scan distance we picked up for non-tenured ranges
+ // then we are done, as there's nothing we can save
+ scanDistance = minScanDistance;
+ }
+ else if (scanDistance > 0)
+ {
+ // otherwise, we can look to see if any of the scan distances we computed overflow the checkpoint,
+ // i.e. where no records served by this checkpoint need to scan the full distance to reach it
+ int distanceToLastScanIndex = lastIndex - index(count -1);
+ // if the distance to the last scan index is larger than its scan distance, we have overflowed;
+ if (distanceToLastScanIndex < scanDistance)
+ {
+ minScanDistance = Math.max(distanceToLastScanIndex, minScanDistance);
+ // loop until we find one that doesn't overflow, as this is another minimum scan distance
+ int i = count - 1;
+ while (--i >= 0)
+ {
+ int distance = lastIndex - index(i);
+ if (distance >= distance(i)) break;
+ else if (distance > minScanDistance) minScanDistance = distance;
+ }
+ if (i >= 0) scanDistance = Math.max(minScanDistance, distance(i));
+ else scanDistance = minScanDistance;
+ }
+ }
+
+ return scanDistance;
+ }
+
+ void reset()
+ {
+ // we could in theory reset our scan distance using the contents of scanDistance[]
+ // but it's a bit complicated, as we want to have the first item to increment the scan distance
+ // so that we can use it in writeScanDistance to shrink the scan distance;
+ // jumping straight to the highest scan distance breaks this
+ count = 0;
+ scanTenuredAtIndex = -1;
+ watermark = 0;
+ }
+
+ void resetPeakMax(TenuredSet<Ranges, Range, RoutingKey> tenured)
+ {
+ peakMax = newPeakMax(tenured);
+ }
+
+ int peakMax()
+ {
+ return peakMax;
+ }
+
+ int newPeakMax(TenuredSet<Ranges, Range, RoutingKey> tenured)
+ {
+ return Math.max(goal, tenured.count());
+ }
+
+ /**
+ * The minimum index containing a range that might need to be tenured, if we have a smaller max scan distance than before
+ */
+ int minUntenuredIndex(int checkpointIndex, TenuredSet<Ranges, Range, RoutingKey> tenured)
+ {
+ int minUntenuredIndex = Math.max(0, (checkpointIndex - 1) - watermark());
+ // the maximum scan distance that cxould ever have been adopted for the ranges processed since last checkpoint
+ int oldPeakMax = peakMax;
+ int newMinPeakMax = newPeakMax(tenured);
+ if (newMinPeakMax < oldPeakMax)
+ {
+ // minimise range we unnecessarily re-tenure over
+ // TODO (low priority, efficiency): see if can also use to reduce range we re-scan e.g. can recycle
+ // scanDistances contents if we know we won't need to step back further at next checkpoint
+ for (int i = count - 1; i >= 0 ; --i)
+ {
+ if (index(i) <= minUntenuredIndex)
+ break;
+ if (distance(i) <= newMinPeakMax)
+ return i + 1 == count ? index(i) : index(i + 1) - 1;
+ }
+ }
+ return minUntenuredIndex;
+ }
+
+ /**
+ * Record that a range at this index has been tenured, so that we can track how far back
+ * we need to scan to determine how long we can defer writing a new checkpoint while still
+ * being able to scan it.
+ *
+ * TODO (low priority, efficiency): when a checkpoint is written, we should consider moving it
+ * earlier if the scan distance is increased primarily because of this index, and the tenured
+ * collection is otherwise unchanged (so can be written with minimal overhead)
+ */
+ void tenured(int atIndex)
+ {
+ if (scanTenuredAtIndex < 0)
+ scanTenuredAtIndex = atIndex;
+ }
+
+ static int minScanIndex(int checkpointIndex, int scanDistance)
+ {
+ return Math.max(0, (checkpointIndex - 1) - scanDistance);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "Scan{watermark=" + watermark + ", tenured=" + scanTenuredAtIndex + '}';
+ }
+ }
+
+ /**
+ * Record-keeping for a range we have decided is not scannable
+ */
+ static class Tenured<Ranges, Range, RoutingKey> implements Comparable<Tenured<Ranges, Range, RoutingKey>>
+ {
+ final Accessor<Ranges, Range, RoutingKey> accessor;
+ /**
+ * The end of the tenured range covered by the contents referred to be {@link #index}
+ */
+ RoutingKey end;
+
+ /**
+ * <ul>
+ * <li>If positive, this points to {@code ranges[index]}</li>
+ * <li>If negative, this points to an entry in {@link #lists};
+ * see {@link SearchableRangeList#checkpointLists}</li>
+ * </ul>
+ */
+ int index;
+
+ /**
+ * The last index in {@link #ranges} covered by this tenured range
+ */
+ int lastIndex;
+
+ /**
+ * set when this record is serialized in a checkpoint list to either:
+ * <ul>
+ * <li>point to itself, in which case no action should be
+ * taken on removal (it is only retained for size bookkeeping); or</li>
+ * <li>point to the next item in the checkpoint list; the first
+ * such element removed triggers the clearing of the checkpoint
+ * list so that its entries are re-inserted in the next checkpoint</li>
+ * </ul>
+ */
+ Tenured<Ranges, Range, RoutingKey> next;
+
+ /**
+ * Only set for link entries, i.e. where {@code index < 0}.
+ * <ul>
+ * <li>if positive, the length is optional as we will terminate safely using the end bound filtering</li>
+ * <li>if negative, the low 31 bits <b>must</b> be retrieved as the length for safe iteration</li>
+ * </ul>
+ */
+ int linkLength;
+
+ Tenured(Accessor<Ranges, Range, RoutingKey> accessor, RoutingKey end, int index)
+ {
+ this.accessor = accessor;
+ this.end = end;
+ this.index = index;
+ }
+
+ @Override
+ public int compareTo(@Nonnull Tenured<Ranges, Range, RoutingKey> that)
+ {
+ int c = accessor.keyComparator().compare(this.end, that.end);
+ // we sort indexes in reverse order so later tenured items find the earlier ones with same end when searching
+ // for higher entries for the range of indexes to search, and
+ if (c == 0) c = -Integer.compare(this.index, that.index);
+ return c;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "Tenured{end=" + end + ", index=" + index + '}';
+ }
+ }
+
+ /**
+ * The set of ranges that we intend to write to checkpoints that remain open at the current point in the iteration
+ * This collection may be filtered before serialization, but every member will be visited either by scanning
+ * or visiting the checkpoint list
+ * TODO (low priority, efficiency): save garbage by using an insertion-sorted array for collections where
+ * this is sufficient. later, introduce a mutable b-tree supporting object recycling. we would also like
+ * to use a collection that permits us to insert and return a finger into the tree so we can find the
+ * successor as part of insertion, and that permits constant-time first() calls
+ */
+ static class TenuredSet<Ranges, Range, RoutingKey> extends TreeSet<Tenured<Ranges, Range, RoutingKey>>
+ {
+ final Accessor<Ranges, Range, RoutingKey> accessor;
+ /**
+ * the number of direct tenured entries (i.e. ignoring link entries)
+ * this is used to provide a minimum bound on the number of results a range query can return
+ * note: with Strategy.FAST this gets less accurate as the span distance increases
+ */
+ int directCount;
+ int directCountAtPrevCheckpoint;
+ int minSpan;
+
+ // a stack of recently used EndAndIndex objects - used only for the duration of a single build
+ Tenured<Ranges, Range, RoutingKey> reuse, pendingReuse, pendingReuseTail;
+
+ TenuredSet(Accessor<Ranges, Range, RoutingKey> accessor)
+ {
+ this.accessor = accessor;
+ }
+
+ int count()
+ {
+ return directCount;
+ }
+
+ int countAtPrevCheckpoint()
+ {
+ return directCountAtPrevCheckpoint;
+ }
+
+ /**
+ * We require a checkpoint to cover a distance at least as large as the number of tenured ranges leftover
+ * since the prior checkpoint, to ensure these require at most linear additional space, while not requiring
+ * more than O(k) additional complexity on search (i.e., we will scan a number of elements at most equal
+ * to the number we have to visit in the checkpoint).
+ *
+ * We achieve this by recording the minimum number of match results as of the prior checkpoint (i.e. {@link #count()})
+ * and discounting it by one each time we untenure a range, so that for each tenured range from the prior checkpoint
+ * we have either untenured a range or processed at least one additional input.
+ */
+ int minimumSpan()
+ {
+ return minSpan;
+ }
+
+ private int tenure(RoutingKey end, int index, Ranges ranges, int minUntenureIndex)
+ {
+ return tenure(newTenured(end, index), ranges, minUntenureIndex, accessor.size(ranges));
+ }
+
+ private void tenure(RoutingKey end, int index, Ranges ranges, int minUntenureIndex, int untenureLimit)
+ {
+ tenure(newTenured(end, index), ranges, minUntenureIndex, untenureLimit);
+ }
+
+ private int tenure(Tenured<Ranges, Range, RoutingKey> tenure, Ranges ranges, int untenureMinIndex, int untenureLimit)
+ {
+ if (!add(tenure))
+ return tenure.lastIndex;
+
+ Tenured<Ranges, Range, RoutingKey> next = higher(tenure);
+ if (next != null)
+ untenureLimit = Math.min(untenureLimit, next.lastIndex + 1);
+ var c = accessor.keyComparator();
+ int untenureIndex = accessor.binarySearch(ranges, untenureMinIndex, untenureLimit, tenure.end, (e, s) -> c.compare(e, accessor.start(s)), CEIL);
+ if (untenureIndex < 0) untenureIndex = -1 - untenureIndex;
+ tenure.lastIndex = untenureIndex - 1;
+ Invariants.checkState(c.compare(tenure.end, accessor.start(ranges, tenure.lastIndex)) > 0);
+ Invariants.checkState(tenure.lastIndex + 1 == accessor.size(ranges) || c.compare(tenure.end, accessor.start(ranges, tenure.lastIndex + 1)) <= 0);
+ ++directCount;
+ return untenureIndex - 1;
+ }
+
+ private Tenured<Ranges, Range, RoutingKey> newTenured(RoutingKey end, int index)
+ {
+ Tenured<Ranges, Range, RoutingKey> result = reuse;
+ if (result == null)
+ return new Tenured<>(accessor, end, index);
+
+ reuse = result.next;
+ result.end = end;
+ result.index = index;
+ result.lastIndex = 0;
+ result.next = null;
+ return result;
+ }
+
+ private Tenured<Ranges, Range, RoutingKey> addLinkEntry(RoutingKey end, int index, int lastIndex, int length)
+ {
+ Invariants.checkArgument(index < 0);
+ Tenured<Ranges, Range, RoutingKey> result = newTenured(end, index);
+ result.linkLength = length;
+ result.lastIndex = lastIndex;
+ add(result);
+ return result;
+ }
+
+ /**
+ * Retire any active tenured ranges that no longer cover the pointer into ranges;
+ * if this crosses our checkpoint threshold, write a new checkpoint.
+ */
+ void untenure(int index)
+ {
+ while (!isEmpty() && first().lastIndex < index)
+ {
+ Tenured<Ranges, Range, RoutingKey> removed = pollFirst();
+
+ // if removed.next == null, this is not referenced by a link
+ // if removed.next == removed, it is referenced by a link but does not modify the link on removal
+ if (removed.next != null && removed.next != removed)
+ {
+ // this is a member of a link's chain, which may serve one of two purposes:
+ // 1) it may be the entry nominated to invalidate the link, due to the link
+ // membership shrinking below the required threshold; in which case we
+ // must clear the chain to reactivate its members for insertion into the
+ // next checkpoint, and remove the chain link itself
+ // 2) it may be nominated as an entry to update the chain link info, to make
+ // it more succinct: if every entry of the chain remains active, and there
+ // are *many* entries then we need two integers to represent the chain, but
+ // as soon as any entry is invalid we can rely on this entry to terminate
+ // iteration, so we update the bookkeeping on the first entry we remove in
+ // this case
+
+ // first clear the chain starting at the removed entry
+ Tenured<Ranges, Range, RoutingKey> prev = removed, next = removed.next;
+ while (next.next != null)
+ {
+ prev = next;
+ next = next.next;
+ prev.next = null;
+ }
+ Invariants.checkState(next.index < 0);
+ if (prev.end == next.end)
+ {
+ // if this is the last entry in the link, the link is expired and should be removed/reused
+ remove(next);
+ if (pendingReuseTail == null)
+ pendingReuseTail = next;
+ next.next = pendingReuse;
+ pendingReuse = next;
+ }
+ else if (next.linkLength < 0)
+ {
+ // otherwise, flag the link as safely consumed without knowing the length
+ next.linkLength = next.linkLength & Integer.MAX_VALUE;
+ }
+ }
+
+ // this was not a link reference; update our bookkeeping and save it for reuse
+ Invariants.checkState(removed.index >= 0);
+ --directCount;
+ --minSpan;
+ if (pendingReuseTail == null)
+ pendingReuseTail = removed;
+ removed.next = pendingReuse;
+ pendingReuse = removed;
+ }
+ }
+
+ /**
+ * Write out any direct entries that are not pointed to by a chain entry, and any chain entries;
+ * rollover any per-checkpoint data and free up for reuse discarded Tenured objects
+ */
+ void rollover(PendingCheckpoint<Ranges, Range, RoutingKey> pending)
+ {
+ for (Tenured<Ranges, Range, RoutingKey> tenured : this)
+ {
+ if (tenured.next == null)
+ pending.add(tenured);
+ }
+ // make freed Tenured objects available for reuse
+ if (pendingReuse != null)
+ {
+ pendingReuseTail.next = reuse;
+ reuse = pendingReuse;
+ pendingReuseTail = pendingReuse = null;
+ }
+ directCountAtPrevCheckpoint = minSpan = directCount;
+ }
+ }
+
+ /**
+ * we write checkpoints out before knowing the scan distance needed for the range, as a checkpoint precedes
+ * the ranges it covers; so we record the position and contents of the checkpoint, and once the scan distance is
+ * known (i.e. when the next checkpoint is written) we re-process the list to remove items we can now scan before
+ * serializing to checkpointListsBuf.
+ */
+ static class PendingCheckpoint<Ranges, Range, RoutingKey>
+ {
+ int atIndex = -1;
+ int count;
+
+ Tenured<Ranges, Range, RoutingKey>[] contents = new Tenured[10];
+
+ int openDirectCount, firstOpenDirect, openIndirectCount;
+ boolean hasClosedDirect;
+
+ int count()
+ {
+ return count;
+ }
+
+ Tenured<Ranges, Range, RoutingKey> get(int i)
+ {
+ return contents[i];
+ }
+
+ void add(Tenured<Ranges, Range, RoutingKey> tenured)
+ {
+ if (contents.length == count)
+ contents = Arrays.copyOf(contents, 2 * contents.length);
+ contents[count++] = tenured;
+ }
+
+ void clear()
+ {
+ count = 0;
+ }
+
+ /**
+ * Remove pending entries that will be scanned by the scanDistance, and update
+ * our bookkeeping for creating links
+ */
+ int filter(int scanDistance, int lastIndex)
+ {
+ int matchCountModifier = 0;
+ int maxi = count;
+ count = 0;
+ openDirectCount = 0;
+ openIndirectCount = 0;
+ firstOpenDirect = -1;
+// lastClosedDirect = -1;
+
+ for (int i = 0; i < maxi ; ++i)
+ {
+ Tenured<Ranges, Range, RoutingKey> t = get(i);
+ if (t.index >= 0)
+ {
+ if (t.index + scanDistance >= lastIndex)
+ continue; // last index will find it with a scan
+
+ if (t.lastIndex <= t.index + scanDistance)
+ continue; // all indexes will find it with a scan
+
+ if (t.lastIndex > lastIndex)
+ {
+ // this range remains open for the next checkpoint;
+ // we may want to reference this list from there
+ // so track count and position of first one to make a determination
+ ++openDirectCount;
+ if (firstOpenDirect < 0) firstOpenDirect = count;
+ }
+ else hasClosedDirect = true;
+ }
+ else
+ {
+ // note: we over count here, as we count pointers within the chain
+ matchCountModifier += (t.linkLength & Integer.MAX_VALUE) - 1; // (subtract 1 to discount the pointer)
+ if (t.lastIndex > lastIndex)
+ ++openIndirectCount;
+ }
+
+ if (i == count) ++count;
+ else contents[count++] = t;
+ }
+
+ return count + matchCountModifier;
+ }
+
+ /**
+ * Setup a link for referencing this chain later, if permitted.
+ * Must have at least two items, and at least as many direct records as indirect
+ */
+ void setupLinkChain(TenuredSet<Ranges, Range, RoutingKey> tenured, int startIndex, int endIndex)
+ {
+ int minSizeToReference = openIndirectCount + MIN_INDIRECT_LINK_LENGTH;
+ if (openDirectCount >= minSizeToReference)
+ {
+ int i = firstOpenDirect;
+ Tenured<Ranges, Range, RoutingKey> prev = get(i++);
+
+ while (openDirectCount > minSizeToReference)
+ {
+ Tenured<Ranges, Range, RoutingKey> e = get(i++);
+ if (e.index < 0)
+ {
+ --minSizeToReference;
+ continue;
+ }
+
+ Invariants.checkState(prev.next == null);
+ prev.next = prev;
+ prev = e;
+ --openDirectCount;
+ }
+
+ while (i < count)
+ {
+ Tenured<Ranges, Range, RoutingKey> next = get(i++);
+ if (next.index < 0)
+ continue;
+
+ Invariants.checkState(prev.next == null);
+ prev.next = next;
+ prev = next;
+ }
+
+ // may be more than one entry per item (though usually not)
+ int length = endIndex - startIndex;
+ Tenured<Ranges, Range, RoutingKey> chainEntry = tenured.addLinkEntry(prev.end, BIT31 | startIndex, prev.lastIndex, length);
+ prev.next = chainEntry;
+ if (hasClosedDirect && (startIndex > 0xfffff || (length > 0xff)))
+ {
+ // TODO (expected, testing): make sure this is tested, as not a common code path (may never be executed in normal operation)
+ // we have no closed ranges so iteration needs to know the end bound, but we cannot encode our bounds cheaply
+ // so link the first bound to the chain entry, so that on removal it triggers an update of endIndex to note
+ // that it can be iterated safely without an end bound
+ get(firstOpenDirect).next = chainEntry;
+ }
+ }
+ }
+
+ @Override
+ public String toString()
+ {
+ return Arrays.stream(contents, 0, count)
+ .map(Objects::toString)
+ .collect(Collectors.joining(",", "[", "]"));
+ }
+ }
+}
diff --git a/accord-core/src/main/java/accord/utils/RandomSource.java b/accord-core/src/main/java/accord/utils/RandomSource.java
index 067b872..ed971aa 100644
--- a/accord-core/src/main/java/accord/utils/RandomSource.java
+++ b/accord-core/src/main/java/accord/utils/RandomSource.java
@@ -29,6 +29,8 @@
import java.util.function.LongSupplier;
import java.util.function.Supplier;
+import com.google.common.collect.Iterables;
+
import accord.utils.random.Picker;
// TODO (expected): merge with C* RandomSource
@@ -286,7 +288,13 @@
return array[nextInt(offset, offset + length)];
}
- default <T extends Comparable<T>> T pick(Set<T> set)
+ default <T> T pick(NavigableSet<T> set)
+ {
+ int offset = nextInt(0, set.size());
+ return Iterables.get(set, offset);
+ }
+
+ default <T extends Comparable<? super T>> T pick(Set<T> set)
{
List<T> values = new ArrayList<>(set);
// Non-ordered sets may have different iteration order on different environments, which would make a seed produce different histories!
diff --git a/accord-core/src/main/java/accord/utils/SearchableRangeList.java b/accord-core/src/main/java/accord/utils/SearchableRangeList.java
index 22aeb3a..65e4465 100644
--- a/accord-core/src/main/java/accord/utils/SearchableRangeList.java
+++ b/accord-core/src/main/java/accord/utils/SearchableRangeList.java
@@ -18,17 +18,14 @@
package accord.utils;
-import accord.api.RoutingKey;
import accord.primitives.Range;
import accord.primitives.RoutableKey;
-import accord.utils.SearchableRangeListBuilder.Links;
-import accord.utils.SearchableRangeListBuilder.Strategy;
+import accord.utils.CheckpointIntervalArrayBuilder.Links;
+import accord.utils.CheckpointIntervalArrayBuilder.Strategy;
import net.nicoulaj.compilecommand.annotations.Inline;
-import java.util.*;
-
-import static accord.utils.SearchableRangeListBuilder.Links.LINKS;
-import static accord.utils.SearchableRangeListBuilder.Strategy.ACCURATE;
+import static accord.utils.CheckpointIntervalArrayBuilder.Links.LINKS;
+import static accord.utils.CheckpointIntervalArrayBuilder.Strategy.ACCURATE;
import static accord.utils.SortedArrays.Search.*;
/**
@@ -79,104 +76,13 @@
* earlier checkpoints.
* </ul>
*/
-public class SearchableRangeList
+public class SearchableRangeList extends CheckpointIntervalArray<Range[], Range, RoutableKey>
{
- // scan distance can be kept very small as we guarantee to use at most linear extra space even with a scan distance of zero
- static final int MAX_SCAN_DISTANCE = 255;
- private static final int BIT30 = 0x40000000;
- private static final int BIT29 = 0x20000000;
-
private static final SearchableRangeList EMPTY_CHECKPOINTS = new SearchableRangeList(new Range[0], new int[0], new int[] { 0, 0 }, new int[0], 0);
- final Range[] ranges;
-
- /**
- * The lower bound for each checkpoint.
- * The checkpoint {@code i} applies to all ranges (incl) starting from {@code lowerBounds[i]},
- * but before (excl) {@code lowerBounds[i+1]}.
- */
- final int[] lowerBounds;
-
- /**
- * Logically one entry per checkpoint, mapping {@link #lowerBounds} to {@link #checkpointLists},
- * however we also encode an additional byte per entry representing the scan distance for the
- * ranges handled by this checkpoint. These are grouped into an integer per four mappings, i.e.
- * we encode batches of five ints, with the first int containing the four scan distances for the
- * next four checkpoints, and the following four ints containing the respective offsets into
- * {@link #checkpointLists}.
- * <p>
- * [0.........32b.........64b.........96b........128b........160b........192b]
- * [ d1 d2 d3 d4 mapping1 mapping2 mapping3 mapping4 d5 d6 d7 d8 ]
- */
- final int[] headers;
-
- /**
- * A list of indexes in {@link #ranges} contained by each checkpoint; checkpoints are
- * mapped from {@link #lowerBounds} by {@link #headers}.
- * <p>
- * Entries are sorted in descending order by the end of the range they cover, so that
- * a search of this collection my terminate as soon as it encounters a range that does
- * not cover the item we are searching for.
- * <p>
- * This collection may contain negative values, in which case these point to other
- * checkpoints, whose <i>direct</i> contents (i.e. the positive values of) we may
- * search.
- * <ul> if negative, points to an earlier checkpoint, and:
- * <li>if the 30th bit is set, the low 20 bits point to checkpointsList,
- * and the 9 bits in-between provide the length of the range</li>
- * <li>otherwise, if the 29th bit is set, the lower 29 bits points to checkpointsList,
- * and can be iterated safely without an endIndex</li>
- * <li>otherwise, the low 29 bits provide the length of the run, and the low 31 bits
- * of the following entry (which will also be negative) provide a pointer to
- * checkpointsList</li>
- * </ul>
- */
- final int[] checkpointLists;
-
- public final int maxScanAndCheckpointMatches;
-
- SearchableRangeList(Range[] ranges, int[] lowerBounds, int[] headers, int[] checkpointLists, int maxScanAndCheckpointMatches)
+ public SearchableRangeList(Range[] ranges, int[] lowerBounds, int[] headers, int[] checkpointLists, int maxScanAndCheckpointMatches)
{
- this.ranges = ranges;
- this.lowerBounds = lowerBounds;
- this.headers = headers;
- this.checkpointLists = checkpointLists;
- this.maxScanAndCheckpointMatches = maxScanAndCheckpointMatches;
- }
-
- @Inline
- public <P1, P2, P3, P4> int forEach(Range range, IndexedQuadConsumer<P1, P2, P3, P4> forEachScanOrCheckpoint, IndexedRangeQuadConsumer<P1, P2, P3, P4> forEachRange, P1 p1, P2 p2, P3 p3, P4 p4, int minIndex)
- {
- return forEach(range.start(), range.end(), forEachScanOrCheckpoint, forEachRange, p1, p2, p3, p4, minIndex);
- }
-
- public <P1, P2, P3, P4> int forEach(RoutingKey startKey, RoutingKey endKey, IndexedQuadConsumer<P1, P2, P3, P4> forEachScanOrCheckpoint, IndexedRangeQuadConsumer<P1, P2, P3, P4> forEachRange, P1 p1, P2 p2, P3 p3, P4 p4, int minIndex)
- {
- if (ranges.length == 0 || minIndex == ranges.length)
- return minIndex;
-
- int end = SortedArrays.binarySearch(ranges, minIndex, ranges.length, endKey, (a, b) -> a.compareTo(b.start()), CEIL);
- if (end < 0) end = -1 - end;
- if (end <= minIndex) return minIndex;
-
- int floor = SortedArrays.binarySearch(ranges, minIndex, ranges.length, startKey, (a, b) -> a.compareTo(b.start()), CEIL);
- int start = floor;
- if (floor < 0)
- {
- // if there's no precise match on start, step backwards;
- // if this range does not overlap us, step forwards again for start
- // but retain the floor index for performing scan and checkpoint searches from
- // as this contains all ranges that might overlap us (whereas those that end
- // after us but before the next range's start would be missed by the next range index)
- start = floor = -2 - floor;
- if (start < 0)
- start = floor = 0;
- else if (ranges[start].end().compareTo(startKey) <= 0)
- ++start;
- }
-
- // Since endInclusive() != startInclusive(), so no need to adjust start/end comparisons
- return forEach(start, end, floor, startKey, 0, forEachScanOrCheckpoint, forEachRange, p1, p2, p3, p4, minIndex);
+ super(SearchableRangeListBuilder.RANGE_ACCESSOR, ranges, lowerBounds, headers, checkpointLists, maxScanAndCheckpointMatches);
}
@Inline
@@ -209,95 +115,6 @@
return forEach(start, end, floor, key, bound, forEachScanOrCheckpoint, forEachRange, p1, p2, p3, p4, minIndex);
}
- @Inline
- private <P1, P2, P3, P4> int forEach(int start, int end, int floor, RoutableKey startBound, int cmpStartBoundWithEnd,
- IndexedQuadConsumer<P1, P2, P3, P4> forEachScanOrCheckpoint, IndexedRangeQuadConsumer<P1, P2, P3, P4> forEachRange,
- P1 p1, P2 p2, P3 p3, P4 p4, int minIndex)
- {
- if (start < minIndex) start = minIndex;
-
- // find the checkpoint array, so we know how far to step back
- int checkpoint = Arrays.binarySearch(lowerBounds, floor);
- if (checkpoint < 0) checkpoint = -2 - checkpoint;
- if (checkpoint < 0) return end;
-
- int headerBaseIndex = (checkpoint / 4) * 5;
- int headerSubIndex = checkpoint & 3;
- int headerListIndex = headerBaseIndex + 1 + headerSubIndex;
-
- int scanDistance = (headers[headerBaseIndex] >>> (8 * headerSubIndex)) & 0xff;
- int checkpointStart = headers[headerListIndex];
- int checkpointEnd = headers[headerListIndex + (headerSubIndex + 5)/4]; // skip the next header
-
- if (scanDistance == MAX_SCAN_DISTANCE)
- {
- scanDistance = -checkpointLists[checkpointStart++];
- Invariants.checkState(scanDistance >= MAX_SCAN_DISTANCE);
- }
-
- // NOTE: we visit in approximately ascending order, and this is a requirement for correctness of RangeDeps builders
- // Only the checkpoint is visited in uncertain order, but it is visited entirely, before the scan matches
- // and the range matches
- int minScanIndex = Math.max(floor - scanDistance, minIndex);
- for (int i = checkpointStart; i < checkpointEnd ; ++i)
- {
- int ri = checkpointLists[i];
- if (ri < 0)
- {
- int subStart, subEnd;
- if ((ri & BIT30) != 0)
- {
- subStart = ri & 0xfffff;
- subEnd = subStart + ((ri >>> 20) & 0x1ff);
- }
- else if ((ri & BIT29) != 0)
- {
- subStart = ri & 0x1fffffff;
- subEnd = Integer.MAX_VALUE;
- }
- else
- {
- int length = ri & 0x1fffffff;
- subStart = checkpointLists[++i];
- subEnd = subStart + length;
- }
-
- for (int j = subStart ; j < subEnd ; ++j)
- {
- ri = checkpointLists[j];
- if (ri < 0)
- continue;
-
- if (ranges[ri].end().compareTo(startBound) <= cmpStartBoundWithEnd)
- break;
-
- if (ri >= minIndex && ri < minScanIndex)
- forEachScanOrCheckpoint.accept(p1, p2, p3, p4, ri);
- }
- }
- else
- {
- // if startBound is key, we cannot be equal to it;
- // if startBound is a Range start, we also cannot be equal to it due to the requirement that
- // endInclusive() != startInclusive(), so equality really means inequality
- if (ranges[ri].end().compareTo(startBound) <= cmpStartBoundWithEnd)
- break;
-
- if (ri >= minIndex && ri < minScanIndex)
- forEachScanOrCheckpoint.accept(p1, p2, p3, p4, ri);
- }
- }
-
- for (int i = minScanIndex; i < floor ; ++i)
- {
- if (ranges[i].end().compareTo(startBound) > cmpStartBoundWithEnd)
- forEachScanOrCheckpoint.accept(p1, p2, p3, p4, i);
- }
-
- forEachRange.accept(p1, p2, p3, p4, start, end);
- return end;
- }
-
public static SearchableRangeList build(Range[] ranges)
{
if (ranges.length == 0)
diff --git a/accord-core/src/main/java/accord/utils/SearchableRangeListBuilder.java b/accord-core/src/main/java/accord/utils/SearchableRangeListBuilder.java
index 77d351a..546ee16 100644
--- a/accord-core/src/main/java/accord/utils/SearchableRangeListBuilder.java
+++ b/accord-core/src/main/java/accord/utils/SearchableRangeListBuilder.java
@@ -18,1075 +18,84 @@
package accord.utils;
-import accord.api.RoutingKey;
+import java.util.Comparator;
+
import accord.primitives.Range;
-import accord.utils.ArrayBuffers.IntBuffers;
+import accord.primitives.RoutableKey;
-import javax.annotation.Nonnull;
-import java.util.Arrays;
-import java.util.Objects;
-import java.util.TreeSet;
-import java.util.stream.Collectors;
-
-import static accord.utils.ArrayBuffers.cachedInts;
import static accord.utils.SearchableRangeList.MAX_SCAN_DISTANCE;
-import static accord.utils.SearchableRangeListBuilder.Links.LINKS;
-import static accord.utils.SearchableRangeListBuilder.Strategy.ACCURATE;
-import static accord.utils.SortedArrays.Search.CEIL;
/**
* Builder for {@link SearchableRangeList}
*/
-public class SearchableRangeListBuilder
+public class SearchableRangeListBuilder extends CheckpointIntervalArrayBuilder<Range[], Range, RoutableKey>
{
- public enum Strategy
+ public static final Accessor<Range[], Range, RoutableKey> RANGE_ACCESSOR = new Accessor<>()
{
- /**
- * Do not tenure any ranges that are scannable from the currently in-effect max scan distance.
- * This means we probably do less work on construction, but that our measure of the match count
- * at any point is inaccurate, and so our heuristics for when to write checkpoints may be wrong,
- * leading to more checkpoints than necessary.
- */
- FAST,
+ @Override
+ public int size(Range[] ranges)
+ {
+ return ranges.length;
+ }
- /**
- * Tenure every range covering more than goalScanDistance. Any within max scan distance will also
- * update the scan distance, so that they will be filtered when a checkpoint is written. But
- * in the meantime they permit accurate tracking of the number of matches a query can return,
- * permitting our complexity calculations (that determine when checkpoints should be written, and
- * what our maximum scan distance should be), to be accurate. This can avoid bouncing between
- * two extremes, where a low max scan distance tenures and correctly detects a desirable larger scan
- * distance, which we rollover and this prevents us tenuring and tracking the number of matches, so
- * that we then pick a low max scan distance (and thereby also write a new checkpoint)
- */
- ACCURATE
- }
+ @Override
+ public Range get(Range[] ranges, int index)
+ {
+ return ranges[index];
+ }
- /**
- * Should we maintain pointers to prior checkpoints that we may reference instead of reserializing
- * the remaining contents. This is cheap to visit as we stop enumerating as soon as we encounter
- * an entry that no longer covers us. We use some simple heuristics when deciding whether to do
- * this, namely that there are at least two entries (so we save one checkpoint entry) and that
- * there is at least one direct entry for each indirect/link entry in the range we will link.
- */
- public enum Links
- {
- LINKS,
- NO_LINKS
- }
+ @Override
+ public RoutableKey start(Range[] ranges, int index)
+ {
+ return ranges[index].start();
+ }
- private static final int BIT31 = 0x80000000;
- private static final int BIT30 = 0x40000000;
- private static final int BIT29 = 0x20000000;
- static final int MIN_INDIRECT_LINK_LENGTH = 2;
+ @Override
+ public RoutableKey start(Range range)
+ {
+ return range.start();
+ }
- final boolean isAccurate;
- final boolean withLinks;
- final Range[] ranges;
+ @Override
+ public RoutableKey end(Range[] ranges, int index)
+ {
+ return ranges[index].end();
+ }
- int[] bounds;
- int[] headers;
- int[] lists;
- int checkpointCount, headerPointer, listCount;
+ @Override
+ public RoutableKey end(Range range)
+ {
+ return range.end();
+ }
- final Scan scan = new Scan();
- final TenuredSet tenured = new TenuredSet();
- final PendingCheckpoint pending = new PendingCheckpoint();
+ @Override
+ public Comparator<RoutableKey> keyComparator()
+ {
+ return Comparator.naturalOrder();
+ }
- // track the maximum possible number of entries we can match with both a scan + checkpoint lookup
- // this is an over-estimate and may be used by consumers to allocate out-of-order buffers for visitations
- int maxScanAndCheckpointMatches;
+ @Override
+ public int binarySearch(Range[] ranges, int from, int to, RoutableKey find, AsymmetricComparator<RoutableKey, Range> comparator, SortedArrays.Search op)
+ {
+ return SortedArrays.binarySearch(ranges, from, to, find, comparator, op);
+ }
+ };
public SearchableRangeListBuilder(Range[] ranges, Strategy strategy, Links links)
{
- this(ranges, Math.min(MAX_SCAN_DISTANCE, 34 - Integer.numberOfLeadingZeros(ranges.length)), strategy, links);
+ super(RANGE_ACCESSOR, ranges, strategy, links);
}
public SearchableRangeListBuilder(Range[] ranges, int goalScanDistance, Strategy strategy, Links links)
{
- this.isAccurate = strategy == ACCURATE;
- this.withLinks = links == LINKS;
+ super(RANGE_ACCESSOR, ranges, goalScanDistance, strategy, links);
Invariants.checkArgument(goalScanDistance <= MAX_SCAN_DISTANCE);
- Invariants.checkArgument(goalScanDistance > 0);
- this.ranges = ranges;
- init(ranges, goalScanDistance);
}
- void init(Range[] ranges, int goalScanDistance)
- {
- // we write checkpoints at least goalScanDistance apart
- scan.init(goalScanDistance);
- IntBuffers cachedInts = cachedInts();
- // ask for int buffers in descending order of size
- this.lists = cachedInts.getInts(ranges.length); // this one might need to grow
- // +2 to round-up each division, and +2 to account for the final entry (which might require an empty scan distance header)
- this.headers = cachedInts.getInts(((ranges.length / goalScanDistance) * 5) / 4 + 4);
- this.bounds = cachedInts.getInts(ranges.length / goalScanDistance + 1);
- }
-
- /**
- * Walk over each range, looking ahead by {@link #maxScanDistance} to decide if a range should
- * be tenured (written to a checkpoint) or scanned; the maximum scan distance is determined by the
- * number of open tenured entries, i.e. the minimum number of results we can expect to be returned
- * (or, if greater, the logarithm of the number of ranges in the collection).
- * <p>
- * Once we encounter a range that should be tenured, either write a checkpoint immediately
- * or make a note of the position we must scan to from the last entry in this checkpoint
- * and wait until it is permitted to write a checkpoint. This range will be tenured either
- * way for the following checkpoint.
- * <p>
- * The only reason not to write a checkpoint immediately is in the case we would breach
- * our linear space complexity limit, which is imposed by ensuring we have a space between
- * checkpoints at least as large as the number of entries written to the last checkpoint,
- * discounted by the number of entries we have removed from the tenured collection since
- * the last checkpoint.
- */
+ @Override
public SearchableRangeList build()
{
- for (int ri = 0 ; ri < ranges.length ; ++ri)
- {
- // write a checkpoint if we meet our linear space complexity requirements
- // and we either have a tenured range that we must scan,
- // or the scan distance is now much larger than the minimum number of search results
- if (shouldWriteCheckpoint(ri))
- writeCheckpoint(ri);
-
- // either tenure or update scan distance, potentially writing a checkpoint
- tenureOrScan(ri);
- tenured.untenure(ri);
- }
-
- // write our final pending checkpoint
- writeCheckpoint(ranges.length);
- closeHeaders();
-
- IntBuffers cachedInts = cachedInts();
- int[] lists = cachedInts.completeAndDiscard(this.lists, listCount);
- int[] headers = cachedInts.completeAndDiscard(this.headers, headerPointer);
- int[] bounds = cachedInts.completeAndDiscard(this.bounds, checkpointCount);
- return new SearchableRangeList(ranges, bounds, headers, lists, maxScanAndCheckpointMatches);
- }
-
- /**
- * Categorise the candidateIdx as either scannable, and if so update the scan distance;
- * or unscannable, in which case add it to the {@link #tenured} collection.
- * Note, that in ACCURATE mode we tenure the item if it is outside of the goalScanDistance
- * so we may track O(k) accurately above the O(lg2(N)) search and default scan distance,
- * but we still update the scan distance so that the checkpoint will exclude this entry.
- */
- private void tenureOrScan(int index)
- {
- Invariants.checkArgument(index >= 0);
-
- // then either migrate the index to pendingTenured, or ensure it will be scanned
- RoutingKey end = ranges[index].end();
- int scanLimit = scanLimit(index, isAccurate ? scan.goal : maxScanDistance());
- if (shouldTenure(end, scanLimit))
- {
- int lastIndex = tenured.tenure(end, index, ranges, scanLimit + 1);
- if (lastIndex - index > maxScanDistance()) scan.tenured(index);
- else if (!isAccurate) throw new IllegalStateException();
- else scan.updateScanDistance(index, lastIndex - index, this);
- }
- else
- {
- // TODO (low priority, efficiency): if the prior checkpoint has a scan distance >= this one,
- // and <= 50% more than this one and there's no scanMustReachIndex nor tenuredRanges, don't
- // write a new checkpoint (perhaps split shouldWriteCheckpoint logic in two)
- scan.update(end, index, ranges, scanLimit, this);
- }
- }
-
- /**
- * We are forbidden from writing a checkpoint nearer than this to a prior checkpoint.
- * This imposes our linear space complexity bounds, while not harming our O(log2(N) + K)
- * complexity bounds, as we guarantee minimumSpan is never more than the number of query
- * results.
- */
- private int minimumSpan()
- {
- return Math.max(scan.goal(), tenured.minimumSpan());
- }
-
- private int maxScanDistance()
- {
- // minimumSpan() reduces overtime, but there is no reason to reduce our scan distance
- // for tenuring below the scan distance we will write
- return Math.max(scan.watermark(), minimumSpan());
- }
-
- /**
- * The index after the last index we can scan from {@code atIndex} with at most {@code maxScanDistance}.
- */
- private int scanLimit(int atIndex, int maxScanDistance)
- {
- return Math.min(1 + atIndex + maxScanDistance, ranges.length);
- }
-
- private boolean shouldTenure(RoutingKey end, int scanLimit)
- {
- return scanLimit < ranges.length && end.compareTo(ranges[scanLimit].start()) > 0;
- }
-
- private boolean canWriteCheckpoint(int atIndex)
- {
- return atIndex - pending.atIndex >= minimumSpan();
- }
-
- private boolean shouldWriteCheckpoint(int atIndex)
- {
- if (!canWriteCheckpoint(atIndex))
- return false;
-
- // TODO (desired, efficiency): consider these triggers
- if (scan.mustCheckpointToScanTenured(atIndex, maxScanDistance()))
- return true;
-
- return scan.hasMaybeDivergedFromMatchSize(tenured);
- }
-
- /**
- * Write a checkpoint for ranges[prevCheckpointIndex...ri)
- *
- * 1) Finalise the scan distance
- * 2) Write the header
- * 3) Filter the pending tenured ranges to remove those we can scan
- * 4) Write this list out
- * 5) Setup a link to this list, if it is large enough
- * 6) Rollover the scan, tenured and pending structures for the new pending checkpoint
- */
- private void writeCheckpoint(int nextCheckpointIndex)
- {
- int lastIndex = nextCheckpointIndex - 1;
- int scanDistance = scan.finalise(lastIndex);
- scanDistance = extendScanDistance(lastIndex, scanDistance);
-
- if (pending.atIndex < 0)
- {
- // we don't have any checkpoints pending, so don't try to finalise it
- // but if the new checkpoint doesn't cover index 0, insert a new empty
- // checkpoint for the scan distance
- if (nextCheckpointIndex > 0)
- {
- // setup an initial empty checkpoint to store the first scan distance
- maxScanAndCheckpointMatches = scanDistance;
- writeHeader(scanDistance, 0);
- }
- }
- else
- {
- writeHeader(scanDistance, pending.atIndex);
- int maxCheckpointMatchCount = pending.filter(scanDistance, lastIndex);
- int listIndex = writeList(pending);
- if (withLinks)
- pending.setupLinkChain(tenured, listIndex, listCount);
- maxScanAndCheckpointMatches = Math.max(maxScanAndCheckpointMatches, scanDistance + maxCheckpointMatchCount);
- }
-
- savePendingCheckpointAndResetScanDistance(nextCheckpointIndex);
- }
-
- private void savePendingCheckpointAndResetScanDistance(int checkpointIndex)
- {
- // use the tail of checkpointListBuf to buffer ranges we plan to tenure
- ensureCapacity(tenured.count() + scan.watermark());
-
- scan.reset();
-
- if (isAccurate)
- {
- // TODO (low priority, efficiency): we can shift back the existing scanDistance if it's far enough from
- // the next checkpoint. this might permit us to skip some comparisons
- scan.resetPeakMax(tenured);
- for (Tenured tenured : this.tenured)
- {
- int distanceToEnd = (tenured.lastIndex - checkpointIndex);
- if (distanceToEnd >= scan.peakMax)
- break;
-
- int scanDistance = tenured.lastIndex - tenured.index;
- if (scanDistance <= scan.peakMax)
- scan.updateScanDistance(tenured.index, scanDistance, null);
- }
-
- if (scan.watermark() < scan.goal)
- {
- int ri = Scan.minScanIndex(checkpointIndex, scan.goal);
- while (ri < checkpointIndex)
- {
- RoutingKey end = ranges[ri].end();
- int scanLimit = scanLimit(ri, scan.peakMax);
- if (!shouldTenure(end, scanLimit))
- scan.update(end, ri, ranges, scanLimit, null);
- ++ri;
- }
- }
- }
- else
- {
- // the maximum scan distance that could ever have been adopted for last chunk
- int oldPeakMax = scan.peakMax();
- // the minimum scan distance we will start with for processing the proceeding ranges
- // note: this may increase if we decide to tenure additional ranges, at which point it will be the actual newPeakMax
- int newMinPeakMax = scan.newPeakMax(tenured);
- int minUntenuredIndex = scan.minUntenuredIndex(checkpointIndex, tenured);
- int minScanIndex = Scan.minScanIndex(checkpointIndex, newMinPeakMax);
-
- // we now make sure tenured and scan are correct for the new parameters.
- // 1) if our peakMax is lower then we need to go back and find items to tenure that we previously marked for scanning
- // 2) we must also reset our scan distances
-
- // since our peakMax is determined by tenured.count(), but we are tenuring items here we keep things simple
- // and do not account for those items we tenure but would later permit to scan as our peakMax grows
-
- int ri = Math.min(minUntenuredIndex, minScanIndex);
- while (ri < checkpointIndex)
- {
- RoutingKey end = ranges[ri].end();
- int newPeakMax = scan.newPeakMax(tenured);
- int scanLimit = scanLimit(ri, newPeakMax);
- if (shouldTenure(end, scanLimit))
- {
- // note: might have already been tenured
- // in this case our untenureLimit may be incorrect, but we won't use it
- if (ri >= minUntenuredIndex && newPeakMax < oldPeakMax)
- tenured.tenure(end, ri, ranges, scanLimit + 1, scanLimit(ri, oldPeakMax));
- }
- else
- {
- // this might effectively remove a previously tenured item
- scan.update(end, ri, ranges, scanLimit, null);
- }
- ++ri;
- }
-
- scan.resetPeakMax(tenured);
- }
-
- pending.atIndex = checkpointIndex;
- pending.clear();
- tenured.rollover(pending);
- }
-
- private int extendScanDistance(int lastIndex, int scanDistance)
- {
- // now we've established our lower bound on scan distance, see how many checkpoints we can remove
- // by increasing our scan distance so that it remains proportional to the number of results returned
- // TODO (low priority, efficiency): can reduce cost here by using scanDistances array for upper bounds to scan distance
- int maxScanDistance = scan.goal() + 2 * Math.min(tenured.count(), tenured.countAtPrevCheckpoint());
- if (maxScanDistance >= 1 + scanDistance + scanDistance/4 && pending.count() >= (maxScanDistance - scanDistance)/2)
- {
- int removeCount = 0;
- int extendedScanDistance = scanDistance;
- int target = (maxScanDistance - scanDistance)/2;
- for (int i = 0 ; i < pending.count() ; ++i)
- {
- Tenured t = pending.get(i);
- if (t.index < 0)
- continue;
-
- int distance = Math.min(lastIndex, t.lastIndex) - t.index;
- if (distance <= scanDistance)
- continue; // already scanned or untenured
-
- if (distance <= maxScanDistance)
- {
- ++removeCount;
- extendedScanDistance = Math.max(extendedScanDistance, distance);
- if (extendedScanDistance == maxScanDistance && removeCount >= target)
- break;
- }
- }
-
- // TODO (low priority, efficiency): should perhaps also gate this decision on the span we're covering
- // algorithmically, however, so long as we are under maxScanDistance we are fine
- if (removeCount >= (extendedScanDistance - scanDistance)/2)
- scanDistance = extendedScanDistance;
- }
- return scanDistance;
- }
-
- int writeList(PendingCheckpoint pending)
- {
- int startIndex = listCount;
- for (int i = pending.count() - 1 ; i >= 0 ; --i)
- {
- Tenured t = pending.get(i);
- if (t.index >= 0)
- {
- lists[listCount++] = t.index;
- }
- else
- {
- int index = t.index & ~BIT31;
- int length = t.linkLength & ~BIT31;
- if (length <= 0xff && index <= 0xfffff)
- {
- lists[listCount++] = BIT31 | BIT30 | (length << 20) | index;
- }
- else if (t.linkLength >= 0 && length < BIT30)
- {
- lists[listCount++] = BIT31 | BIT29 | index;
- }
- else
- {
- lists[listCount++] = BIT31 | length;
- lists[listCount++] = BIT31 | pending.count();
- }
- }
- }
- return startIndex;
- }
-
- void writeHeader(int scanDistance, int lowerBound)
- {
- int headerScanDistance = Math.min(scanDistance, MAX_SCAN_DISTANCE);
-
- if ((checkpointCount & 3) == 0)
- headers[headerPointer++] = headerScanDistance;
- else
- headers[headerPointer - (1 + (checkpointCount & 3))] |= headerScanDistance << (8 * (checkpointCount & 3));
-
- bounds[checkpointCount++] = lowerBound;
- headers[headerPointer++] = listCount;
-
- if (scanDistance >= MAX_SCAN_DISTANCE)
- lists[listCount++] = -scanDistance; // serialize as a negative value so we ignore it in most cases automatically
- }
-
- void closeHeaders()
- {
- // write our final checkpoint header
- if ((checkpointCount & 3) == 0) headers[headerPointer++] = 0;
- headers[headerPointer++] = listCount;
- }
-
- void ensureCapacity(int maxPendingSize)
- {
- if (listCount + maxPendingSize >= lists.length)
- lists = cachedInts().resize(lists, listCount, lists.length + lists.length/2 + maxPendingSize);
- }
-
- static class Scan
- {
- /** the scan distance we are aiming for; should be proportional to log2(N) */
- int goal;
-
- /** the indexes at which we increased the scan distance, and the new scan distance */
- int[] distances = new int[16];
- /** the number of unique scan distances we have adopted since the last checkpoint */
- int count;
- /** the highest scan distance we have adopted (==scanDistance(scanDistanceCount-1)) */
- int watermark;
- /**
- * the first index we have tenured a range from, but for which we did not immediately write a new checkpoint
- * we *must* scan at least from the last index in the checkpoint to here
- */
- int scanTenuredAtIndex = -1;
-
- /** The maximum (i.e. initial) scan distance limit we have used since the last attempted checkpoint write */
- int peakMax;
-
- void init(int goalScanDistance)
- {
- goal = peakMax = goalScanDistance;
- }
-
- private void update(RoutingKey end, int atIndex, Range[] ranges, int scanLimit, SearchableRangeListBuilder checkpoint)
- {
- int newScanDistance = find(end, atIndex, ranges, scanLimit, watermark);
- updateScanDistance(atIndex, newScanDistance, checkpoint);
- }
-
- private void updateScanDistance(int atIndex, int newScanDistance, SearchableRangeListBuilder checkpoint)
- {
- if (newScanDistance > watermark)
- {
- // TODO (desired, efficiency): we don't mind slight increases to the watermark;
- // should really look at scan distance history and ensure we haven't e.g. doubled since
- // some earlier point (and should track the match count + scan distance at each bump
- // to check overall work hasn't increased too much)
- if (checkpoint != null && checkpoint.canWriteCheckpoint(atIndex))
- checkpoint.writeCheckpoint(atIndex);
-
- watermark = newScanDistance;
- if (count * 2 == distances.length)
- distances = Arrays.copyOf(distances, distances.length * 2);
- distances[count * 2] = newScanDistance;
- distances[count * 2 + 1] = atIndex;
- ++count;
- }
- }
-
- private int find(RoutingKey end, int atIndex, Range[] ranges, int scanLimit, int currentScanDistance)
- {
- int lowerIndex = SortedArrays.exponentialSearch(ranges, atIndex + currentScanDistance, scanLimit, end, (e, s) -> e.compareTo(s.start()), CEIL);
- if (lowerIndex < 0) lowerIndex = -2 - lowerIndex;
- else lowerIndex -= 1;
- return lowerIndex - atIndex;
- }
-
- boolean isAboveGoal()
- {
- return watermark > goal;
- }
-
- int watermark()
- {
- return watermark;
- }
-
- int goal()
- {
- return goal;
- }
-
- int distanceToTenured(int lastIndex)
- {
- return scanTenuredAtIndex >= 0 ? lastIndex - scanTenuredAtIndex : 0;
- }
-
- boolean mustCheckpointToScanTenured(int checkpointIndex, int maxScanDistance)
- {
- return scanTenuredAtIndex >= 0 && checkpointIndex - scanTenuredAtIndex >= maxScanDistance;
- }
-
- /**
- * Are we scanning a much longer distance than the minimum number of matches we know a query will return?
- * Note: with Strategy.FAST, {@code tenured.count()} gets less accurate as scan distance increases, so this
- * will bounce around triggering checkpoints due to the larger scan distance, resetting the scan distance
- * and starting again
- */
- boolean hasMaybeDivergedFromMatchSize(TenuredSet tenured)
- {
- return isAboveGoal() && tenured.count() < watermark()/2;
- }
-
- private int distance(int i)
- {
- return distances[i*2];
- }
-
- private int index(int i)
- {
- return distances[i*2+1];
- }
-
- int finalise(int lastIndex)
- {
- Invariants.checkState(distanceToTenured(lastIndex) <= Math.max(watermark(), peakMax()));
-
- int scanDistance = watermark;
- // then, compute the minimum scan distance implied by any tenured ranges we did not immediately
- // write a checkpoint for - we *must* scan back as far as this record
- int minScanDistance = scanTenuredAtIndex >= 0 ? lastIndex - scanTenuredAtIndex : 0;
- if (minScanDistance > scanDistance)
- {
- // if this minimum is larger than the largest scan distance we picked up for non-tenured ranges
- // then we are done, as there's nothing we can save
- scanDistance = minScanDistance;
- }
- else if (scanDistance > 0)
- {
- // otherwise, we can look to see if any of the scan distances we computed overflow the checkpoint,
- // i.e. where no records served by this checkpoint need to scan the full distance to reach it
- int distanceToLastScanIndex = lastIndex - index(count -1);
- // if the distance to the last scan index is larger than its scan distance, we have overflowed;
- if (distanceToLastScanIndex < scanDistance)
- {
- minScanDistance = Math.max(distanceToLastScanIndex, minScanDistance);
- // loop until we find one that doesn't overflow, as this is another minimum scan distance
- int i = count - 1;
- while (--i >= 0)
- {
- int distance = lastIndex - index(i);
- if (distance >= distance(i)) break;
- else if (distance > minScanDistance) minScanDistance = distance;
- }
- if (i >= 0) scanDistance = Math.max(minScanDistance, distance(i));
- else scanDistance = minScanDistance;
- }
- }
-
- return scanDistance;
- }
-
- void reset()
- {
- // we could in theory reset our scan distance using the contents of scanDistance[]
- // but it's a bit complicated, as we want to have the first item to increment the scan distance
- // so that we can use it in writeScanDistance to shrink the scan distance;
- // jumping straight to the highest scan distance breaks this
- count = 0;
- scanTenuredAtIndex = -1;
- watermark = 0;
- }
-
- void resetPeakMax(TenuredSet tenured)
- {
- peakMax = newPeakMax(tenured);
- }
-
- int peakMax()
- {
- return peakMax;
- }
-
- int newPeakMax(TenuredSet tenured)
- {
- return Math.max(goal, tenured.count());
- }
-
- /**
- * The minimum index containing a range that might need to be tenured, if we have a smaller max scan distance than before
- */
- int minUntenuredIndex(int checkpointIndex, TenuredSet tenured)
- {
- int minUntenuredIndex = Math.max(0, (checkpointIndex - 1) - watermark());
- // the maximum scan distance that cxould ever have been adopted for the ranges processed since last checkpoint
- int oldPeakMax = peakMax;
- int newMinPeakMax = newPeakMax(tenured);
- if (newMinPeakMax < oldPeakMax)
- {
- // minimise range we unnecessarily re-tenure over
- // TODO (low priority, efficiency): see if can also use to reduce range we re-scan e.g. can recycle
- // scanDistances contents if we know we won't need to step back further at next checkpoint
- for (int i = count - 1; i >= 0 ; --i)
- {
- if (index(i) <= minUntenuredIndex)
- break;
- if (distance(i) <= newMinPeakMax)
- return i + 1 == count ? index(i) : index(i + 1) - 1;
- }
- }
- return minUntenuredIndex;
- }
-
- /**
- * Record that a range at this index has been tenured, so that we can track how far back
- * we need to scan to determine how long we can defer writing a new checkpoint while still
- * being able to scan it.
- *
- * TODO (low priority, efficiency): when a checkpoint is written, we should consider moving it
- * earlier if the scan distance is increased primarily because of this index, and the tenured
- * collection is otherwise unchanged (so can be written with minimal overhead)
- */
- void tenured(int atIndex)
- {
- if (scanTenuredAtIndex < 0)
- scanTenuredAtIndex = atIndex;
- }
-
- static int minScanIndex(int checkpointIndex, int scanDistance)
- {
- return Math.max(0, (checkpointIndex - 1) - scanDistance);
- }
-
- @Override
- public String toString()
- {
- return "Scan{watermark=" + watermark + ", tenured=" + scanTenuredAtIndex + '}';
- }
- }
-
- /**
- * Record-keeping for a range we have decided is not scannable
- */
- static class Tenured implements Comparable<Tenured>
- {
- /**
- * The end of the tenured range covered by the contents referred to be {@link #index}
- */
- RoutingKey end;
-
- /**
- * <ul>
- * <li>If positive, this points to {@code ranges[index]}</li>
- * <li>If negative, this points to an entry in {@link #lists};
- * see {@link SearchableRangeList#checkpointLists}</li>
- * </ul>
- */
- int index;
-
- /**
- * The last index in {@link #ranges} covered by this tenured range
- */
- int lastIndex;
-
- /**
- * set when this record is serialized in a checkpoint list to either:
- * <ul>
- * <li>point to itself, in which case no action should be
- * taken on removal (it is only retained for size bookkeeping); or</li>
- * <li>point to the next item in the checkpoint list; the first
- * such element removed triggers the clearing of the checkpoint
- * list so that its entries are re-inserted in the next checkpoint</li>
- * </ul>
- */
- Tenured next;
-
- /**
- * Only set for link entries, i.e. where {@code index < 0}.
- * <ul>
- * <li>if positive, the length is optional as we will terminate safely using the end bound filtering</li>
- * <li>if negative, the low 31 bits <b>must</b> be retrieved as the length for safe iteration</li>
- * </ul>
- */
- int linkLength;
-
- Tenured(RoutingKey end, int index)
- {
- this.end = end;
- this.index = index;
- }
-
- @Override
- public int compareTo(@Nonnull Tenured that)
- {
- int c = this.end.compareTo(that.end);
- // we sort indexes in reverse order so later tenured items find the earlier ones with same end when searching
- // for higher entries for the range of indexes to search, and
- if (c == 0) c = -Integer.compare(this.index, that.index);
- return c;
- }
-
- @Override
- public String toString()
- {
- return "Tenured{end=" + end + ", index=" + index + '}';
- }
- }
-
- /**
- * The set of ranges that we intend to write to checkpoints that remain open at the current point in the iteration
- * This collection may be filtered before serialization, but every member will be visited either by scanning
- * or visiting the checkpoint list
- * TODO (low priority, efficiency): save garbage by using an insertion-sorted array for collections where
- * this is sufficient. later, introduce a mutable b-tree supporting object recycling. we would also like
- * to use a collection that permits us to insert and return a finger into the tree so we can find the
- * successor as part of insertion, and that permits constant-time first() calls
- */
- static class TenuredSet extends TreeSet<Tenured>
- {
- /**
- * the number of direct tenured entries (i.e. ignoring link entries)
- * this is used to provide a minimum bound on the number of results a range query can return
- * note: with Strategy.FAST this gets less accurate as the span distance increases
- */
- int directCount;
- int directCountAtPrevCheckpoint;
- int minSpan;
-
- // a stack of recently used EndAndIndex objects - used only for the duration of a single build
- Tenured reuse, pendingReuse, pendingReuseTail;
-
- int count()
- {
- return directCount;
- }
-
- int countAtPrevCheckpoint()
- {
- return directCountAtPrevCheckpoint;
- }
-
- /**
- * We require a checkpoint to cover a distance at least as large as the number of tenured ranges leftover
- * since the prior checkpoint, to ensure these require at most linear additional space, while not requiring
- * more than O(k) additional complexity on search (i.e., we will scan a number of elements at most equal
- * to the number we have to visit in the checkpoint).
- *
- * We achieve this by recording the minimum number of match results as of the prior checkpoint (i.e. {@link #count()})
- * and discounting it by one each time we untenure a range, so that for each tenured range from the prior checkpoint
- * we have either untenured a range or processed at least one additional input.
- */
- int minimumSpan()
- {
- return minSpan;
- }
-
- private int tenure(RoutingKey end, int index, Range[] ranges, int minUntenureIndex)
- {
- return tenure(newTenured(end, index), ranges, minUntenureIndex, ranges.length);
- }
-
- private void tenure(RoutingKey end, int index, Range[] ranges, int minUntenureIndex, int untenureLimit)
- {
- tenure(newTenured(end, index), ranges, minUntenureIndex, untenureLimit);
- }
-
- private int tenure(Tenured tenure, Range[] ranges, int untenureMinIndex, int untenureLimit)
- {
- if (!add(tenure))
- return tenure.lastIndex;
-
- Tenured next = higher(tenure);
- if (next != null)
- untenureLimit = Math.min(untenureLimit, next.lastIndex + 1);
- int untenureIndex = SortedArrays.binarySearch(ranges, untenureMinIndex, untenureLimit, tenure.end, (e, s) -> e.compareTo(s.start()), CEIL);
- if (untenureIndex < 0) untenureIndex = -1 - untenureIndex;
- tenure.lastIndex = untenureIndex - 1;
- Invariants.checkState(tenure.end.compareTo(ranges[tenure.lastIndex].start()) > 0);
- Invariants.checkState(tenure.lastIndex + 1 == ranges.length || tenure.end.compareTo(ranges[tenure.lastIndex + 1].start()) <= 0);
- ++directCount;
- return untenureIndex - 1;
- }
-
- private Tenured newTenured(RoutingKey end, int index)
- {
- Tenured result = reuse;
- if (result == null)
- return new Tenured(end, index);
-
- reuse = result.next;
- result.end = end;
- result.index = index;
- result.lastIndex = 0;
- result.next = null;
- return result;
- }
-
- private Tenured addLinkEntry(RoutingKey end, int index, int lastIndex, int length)
- {
- Invariants.checkArgument(index < 0);
- Tenured result = newTenured(end, index);
- result.linkLength = length;
- result.lastIndex = lastIndex;
- add(result);
- return result;
- }
-
- /**
- * Retire any active tenured ranges that no longer cover the pointer into ranges;
- * if this crosses our checkpoint threshold, write a new checkpoint.
- */
- void untenure(int index)
- {
- while (!isEmpty() && first().lastIndex < index)
- {
- Tenured removed = pollFirst();
-
- // if removed.next == null, this is not referenced by a link
- // if removed.next == removed, it is referenced by a link but does not modify the link on removal
- if (removed.next != null && removed.next != removed)
- {
- // this is a member of a link's chain, which may serve one of two purposes:
- // 1) it may be the entry nominated to invalidate the link, due to the link
- // membership shrinking below the required threshold; in which case we
- // must clear the chain to reactivate its members for insertion into the
- // next checkpoint, and remove the chain link itself
- // 2) it may be nominated as an entry to update the chain link info, to make
- // it more succinct: if every entry of the chain remains active, and there
- // are *many* entries then we need two integers to represent the chain, but
- // as soon as any entry is invalid we can rely on this entry to terminate
- // iteration, so we update the bookkeeping on the first entry we remove in
- // this case
-
- // first clear the chain starting at the removed entry
- Tenured prev = removed, next = removed.next;
- while (next.next != null)
- {
- prev = next;
- next = next.next;
- prev.next = null;
- }
- Invariants.checkState(next.index < 0);
- if (prev.end == next.end)
- {
- // if this is the last entry in the link, the link is expired and should be removed/reused
- remove(next);
- if (pendingReuseTail == null)
- pendingReuseTail = next;
- next.next = pendingReuse;
- pendingReuse = next;
- }
- else if (next.linkLength < 0)
- {
- // otherwise, flag the link as safely consumed without knowing the length
- next.linkLength = next.linkLength & Integer.MAX_VALUE;
- }
- }
-
- // this was not a link reference; update our bookkeeping and save it for reuse
- Invariants.checkState(removed.index >= 0);
- --directCount;
- --minSpan;
- if (pendingReuseTail == null)
- pendingReuseTail = removed;
- removed.next = pendingReuse;
- pendingReuse = removed;
- }
- }
-
- /**
- * Write out any direct entries that are not pointed to by a chain entry, and any chain entries;
- * rollover any per-checkpoint data and free up for reuse discarded Tenured objects
- */
- void rollover(PendingCheckpoint pending)
- {
- for (Tenured tenured : this)
- {
- if (tenured.next == null)
- pending.add(tenured);
- }
- // make freed Tenured objects available for reuse
- if (pendingReuse != null)
- {
- pendingReuseTail.next = reuse;
- reuse = pendingReuse;
- pendingReuseTail = pendingReuse = null;
- }
- directCountAtPrevCheckpoint = minSpan = directCount;
- }
- }
-
- /**
- * we write checkpoints out before knowing the scan distance needed for the range, as a checkpoint precedes
- * the ranges it covers; so we record the position and contents of the checkpoint, and once the scan distance is
- * known (i.e. when the next checkpoint is written) we re-process the list to remove items we can now scan before
- * serializing to checkpointListsBuf.
- */
- static class PendingCheckpoint
- {
- int atIndex = -1;
- int count;
-
- Tenured[] contents = new Tenured[10];
-
- int openDirectCount, firstOpenDirect, openIndirectCount;
- boolean hasClosedDirect;
-
- int count()
- {
- return count;
- }
-
- Tenured get(int i)
- {
- return contents[i];
- }
-
- void add(Tenured tenured)
- {
- if (contents.length == count)
- contents = Arrays.copyOf(contents, 2 * contents.length);
- contents[count++] = tenured;
- }
-
- void clear()
- {
- count = 0;
- }
-
- /**
- * Remove pending entries that will be scanned by the scanDistance, and update
- * our bookkeeping for creating links
- */
- int filter(int scanDistance, int lastIndex)
- {
- int matchCountModifier = 0;
- int maxi = count;
- count = 0;
- openDirectCount = 0;
- openIndirectCount = 0;
- firstOpenDirect = -1;
-// lastClosedDirect = -1;
-
- for (int i = 0; i < maxi ; ++i)
- {
- Tenured t = get(i);
- if (t.index >= 0)
- {
- if (t.index + scanDistance >= lastIndex)
- continue; // last index will find it with a scan
-
- if (t.lastIndex <= t.index + scanDistance)
- continue; // all indexes will find it with a scan
-
- if (t.lastIndex > lastIndex)
- {
- // this range remains open for the next checkpoint;
- // we may want to reference this list from there
- // so track count and position of first one to make a determination
- ++openDirectCount;
- if (firstOpenDirect < 0) firstOpenDirect = count;
- }
- else hasClosedDirect = true;
- }
- else
- {
- // note: we over count here, as we count pointers within the chain
- matchCountModifier += (t.linkLength & Integer.MAX_VALUE) - 1; // (subtract 1 to discount the pointer)
- if (t.lastIndex > lastIndex)
- ++openIndirectCount;
- }
-
- if (i == count) ++count;
- else contents[count++] = t;
- }
-
- return count + matchCountModifier;
- }
-
- /**
- * Setup a link for referencing this chain later, if permitted.
- * Must have at least two items, and at least as many direct records as indirect
- */
- void setupLinkChain(TenuredSet tenured, int startIndex, int endIndex)
- {
- int minSizeToReference = openIndirectCount + MIN_INDIRECT_LINK_LENGTH;
- if (openDirectCount >= minSizeToReference)
- {
- int i = firstOpenDirect;
- Tenured prev = get(i++);
-
- while (openDirectCount > minSizeToReference)
- {
- Tenured e = get(i++);
- if (e.index < 0)
- {
- --minSizeToReference;
- continue;
- }
-
- Invariants.checkState(prev.next == null);
- prev.next = prev;
- prev = e;
- --openDirectCount;
- }
-
- while (i < count)
- {
- Tenured next = get(i++);
- if (next.index < 0)
- continue;
-
- Invariants.checkState(prev.next == null);
- prev.next = next;
- prev = next;
- }
-
- // may be more than one entry per item (though usually not)
- int length = endIndex - startIndex;
- Tenured chainEntry = tenured.addLinkEntry(prev.end, BIT31 | startIndex, prev.lastIndex, length);
- prev.next = chainEntry;
- if (hasClosedDirect && (startIndex > 0xfffff || (length > 0xff)))
- {
- // TODO (expected, testing): make sure this is tested, as not a common code path (may never be executed in normal operation)
- // we have no closed ranges so iteration needs to know the end bound, but we cannot encode our bounds cheaply
- // so link the first bound to the chain entry, so that on removal it triggers an update of endIndex to note
- // that it can be iterated safely without an end bound
- get(firstOpenDirect).next = chainEntry;
- }
- }
- }
-
- @Override
- public String toString()
- {
- return Arrays.stream(contents, 0, count)
- .map(Objects::toString)
- .collect(Collectors.joining(",", "[", "]"));
- }
+ return build((ranges, bounds, headers, lists, maxScanAndCheckpointMatches) ->
+ new SearchableRangeList(ranges, bounds, headers, lists, maxScanAndCheckpointMatches));
}
}
diff --git a/accord-core/src/main/java/accord/utils/async/AsyncChains.java b/accord-core/src/main/java/accord/utils/async/AsyncChains.java
index 579bbf9..a18f782 100644
--- a/accord-core/src/main/java/accord/utils/async/AsyncChains.java
+++ b/accord-core/src/main/java/accord/utils/async/AsyncChains.java
@@ -831,6 +831,18 @@
}
}
+ public static <V> V getUnchecked(AsyncChain<V> chain)
+ {
+ try
+ {
+ return getUninterruptibly(chain);
+ }
+ catch (ExecutionException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
public static void awaitUninterruptibly(AsyncChain<?> chain)
{
try
diff --git a/accord-core/src/main/java/accord/utils/random/Picker.java b/accord-core/src/main/java/accord/utils/random/Picker.java
index f6f07d7..272f17e 100644
--- a/accord-core/src/main/java/accord/utils/random/Picker.java
+++ b/accord-core/src/main/java/accord/utils/random/Picker.java
@@ -26,6 +26,18 @@
public class Picker
{
+ public static float[] randomWeights(RandomSource random, int length)
+ {
+ float[] weights = new float[length - 1];
+ float sum = 0;
+ for (int i = 0 ; i < weights.length ; ++i)
+ weights[i] = sum += random.nextFloat();
+ sum += random.nextFloat();
+ for (int i = 0 ; i < weights.length ; ++i)
+ weights[i] /= sum;
+ return weights;
+ }
+
static abstract class Weighted
{
final RandomSource random;
@@ -33,7 +45,7 @@
public Weighted(RandomSource random, int length)
{
- this(random, randomWeights(random, length));
+ this(random, Picker.randomWeights(random, length));
}
public Weighted(RandomSource random, float[] weights)
@@ -42,17 +54,6 @@
this.weights = weights;
}
- static float[] randomWeights(RandomSource random, int length)
- {
- float[] weights = new float[length - 1];
- float sum = 0;
- for (int i = 0 ; i < weights.length ; ++i)
- weights[i] = sum += random.nextFloat();
- sum += random.nextFloat();
- for (int i = 0 ; i < weights.length ; ++i)
- weights[i] /= sum;
- return weights;
- }
static float[] randomWeights(RandomSource random, float[] bias)
{
@@ -104,7 +105,7 @@
public static <T> WeightedObjectPicker<T> randomWeighted(RandomSource random, T[] values)
{
- return new WeightedObjectPicker<>(random, values, randomWeights(random, values.length));
+ return new WeightedObjectPicker<>(random, values, Picker.randomWeights(random, values.length));
}
public static <T> WeightedObjectPicker<T> randomWeighted(RandomSource random, T[] values, float[] bias)
diff --git a/accord-core/src/test/java/accord/utils/Gen.java b/accord-core/src/test/java/accord/utils/Gen.java
index af86340..04b6e63 100644
--- a/accord-core/src/test/java/accord/utils/Gen.java
+++ b/accord-core/src/test/java/accord/utils/Gen.java
@@ -99,6 +99,21 @@
return Stream.generate(() -> next(rs));
}
+ interface Int2IntMapFunction
+ {
+ int applyAsInt(RandomSource rs, int value);
+ }
+
+ interface Int2LongMapFunction
+ {
+ long applyAsLong(RandomSource rs, int value);
+ }
+
+ interface Long2LongMapFunction
+ {
+ long applyAsLong(RandomSource rs, long value);
+ }
+
interface IntGen extends Gen<Integer>
{
int nextInt(RandomSource random);
@@ -114,6 +129,16 @@
return r -> fn.applyAsInt(nextInt(r));
}
+ default IntGen mapAsInt(Int2IntMapFunction fn)
+ {
+ return r -> fn.applyAsInt(r, nextInt(r));
+ }
+
+ default LongGen mapAsLong(Int2LongMapFunction fn)
+ {
+ return r -> fn.applyAsLong(r, nextInt(r));
+ }
+
default Gen.IntGen filterAsInt(IntPredicate fn)
{
return rs -> {
@@ -159,6 +184,11 @@
return r -> fn.applyAsLong(nextLong(r));
}
+ default LongGen mapAsLong(Long2LongMapFunction fn)
+ {
+ return r -> fn.applyAsLong(r, nextLong(r));
+ }
+
default Gen.LongGen filterAsLong(LongPredicate fn)
{
return rs -> {
diff --git a/accord-core/src/test/java/accord/utils/Gens.java b/accord-core/src/test/java/accord/utils/Gens.java
index 35e4567..3723fc6 100644
--- a/accord-core/src/test/java/accord/utils/Gens.java
+++ b/accord-core/src/test/java/accord/utils/Gens.java
@@ -35,8 +35,12 @@
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.LongStream;
import java.util.stream.Stream;
+import accord.utils.random.Picker;
+
public class Gens {
private Gens() {
}
@@ -51,6 +55,27 @@
return ignore -> constant.get();
}
+ public static <T> Gen<T> oneOf(Gen<T>... gens)
+ {
+ return oneOf(Arrays.asList(gens));
+ }
+
+ public static <T> Gen<T> oneOf(List<Gen<T>> gens)
+ {
+ return rs -> rs.pick(gens).next(rs);
+ }
+
+ public static <T> Gen<T> oneOf(Map<Gen<T>, Integer> values)
+ {
+ Gen<Gen<T>> gen = pick(values);
+ return rs -> gen.next(rs).next(rs);
+ }
+
+ public static Gen.IntGen pickInt(int... ts)
+ {
+ return rs -> ts[rs.nextInt(0, ts.length)];
+ }
+
public static <T> Gen<T> pick(T... ts)
{
return pick(Arrays.asList(ts));
@@ -116,6 +141,256 @@
};
}
+ public static Gen.LongGen pickZipf(long[] array)
+ {
+ if (array == null || array.length == 0)
+ throw new IllegalArgumentException("Empty array given");
+ if (array.length == 1)
+ return ignore -> array[0];
+ BigDecimal[] weights = new BigDecimal[array.length];
+ BigDecimal base = BigDecimal.valueOf(Math.pow(2, array.length));
+ weights[0] = base;
+ for (int i = 1; i < array.length; i++)
+ weights[i] = base.divide(BigDecimal.valueOf(i + 1), RoundingMode.UP);
+ BigDecimal totalWeights = Stream.of(weights).reduce(BigDecimal.ZERO, BigDecimal::add);
+
+ return rs -> {
+ BigDecimal value = BigDecimal.valueOf(rs.nextDouble()).multiply(totalWeights);
+ for (int i = 0; i < weights.length; i++)
+ {
+ value = value.subtract(weights[i]);
+ if (value.compareTo(BigDecimal.ZERO) <= 0)
+ return array[i];
+ }
+ return array[array.length - 1];
+ };
+ }
+
+ public static <T> Gen<T> pickZipf(T... array)
+ {
+ return pickZipf(Arrays.asList(array));
+ }
+
+ public static <T> Gen<T> pickZipf(List<T> array)
+ {
+ if (array == null || array.isEmpty())
+ throw new IllegalArgumentException("Empty array given");
+ if (array.size() == 1)
+ return ignore -> array.get(0);
+ BigDecimal[] weights = new BigDecimal[array.size()];
+ BigDecimal base = BigDecimal.valueOf(Math.pow(2, array.size()));
+ weights[0] = base;
+ for (int i = 1; i < array.size(); i++)
+ weights[i] = base.divide(BigDecimal.valueOf(i + 1), RoundingMode.UP);
+ BigDecimal totalWeights = Stream.of(weights).reduce(BigDecimal.ZERO, BigDecimal::add);
+
+ return rs -> {
+ BigDecimal value = BigDecimal.valueOf(rs.nextDouble()).multiply(totalWeights);
+ for (int i = 0; i < weights.length; i++)
+ {
+ value = value.subtract(weights[i]);
+ if (value.compareTo(BigDecimal.ZERO) <= 0)
+ return array.get(i);
+ }
+ return array.get(array.size() - 1);
+ };
+ }
+
+ public static Gen<Gen.IntGen> randomWeights(int[] array)
+ {
+ return rs -> {
+ float[] weights = Picker.randomWeights(rs, array.length);
+ return r -> array[index(r, weights)];
+ };
+ }
+
+ public static Gen<Gen.LongGen> randomWeights(long[] array)
+ {
+ return rs -> {
+ float[] weights = Picker.randomWeights(rs, array.length);
+ return r -> array[index(r, weights)];
+ };
+ }
+
+ public static <T> Gen<Gen<T>> randomWeights(T[] array)
+ {
+ return rs -> {
+ float[] weights = Picker.randomWeights(rs, array.length);
+ return r -> array[index(r, weights)];
+ };
+ }
+
+ public static <T> Gen<Gen<T>> randomWeights(List<T> array)
+ {
+ return rs -> {
+ float[] weights = Picker.randomWeights(rs, array.size());
+ return r -> array.get(index(r, weights));
+ };
+ }
+
+ private static int index(RandomSource rs, float[] weights)
+ {
+ int i = Arrays.binarySearch(weights, rs.nextFloat());
+ if (i < 0) i = -1 - i;
+ return i;
+ }
+
+ public static Gen<Gen.IntGen> mixedDistribution(int minInclusive, int maxExclusive)
+ {
+ int domainSize = (maxExclusive - minInclusive + 1);
+ if (domainSize < 0)
+ throw new IllegalArgumentException("Range is too large; min=" + minInclusive + ", max=" + maxExclusive);
+ int[] array, indexes;
+ if (domainSize > 200) // randomly selected
+ {
+ int numBuckets = 10;
+ int delta = domainSize / numBuckets;
+ array = new int[numBuckets];
+ for (int i = 0; i < numBuckets; i++)
+ array[i] = minInclusive + i * delta;
+ indexes = IntStream.range(0, array.length).toArray();
+ }
+ else
+ {
+ array = IntStream.range(minInclusive, maxExclusive).toArray();
+ indexes = null;
+ }
+ return rs -> {
+ switch (rs.nextInt(0, 4))
+ {
+ case 0: // uniform
+ return r -> r.nextInt(minInclusive, maxExclusive);
+ case 1: // median biased
+ int median = rs.nextInt(minInclusive, maxExclusive);
+ return r -> r.nextBiasedInt(minInclusive, median, maxExclusive);
+ case 2: // zipf
+ if (indexes == null)
+ return Gens.pickZipf(rs.nextBoolean() ? reverseAndCopy(array) : array);
+ return Gens.pickZipf(rs.nextBoolean() ? reverseAndCopy(indexes) : indexes).mapAsInt((r, index) -> {
+ int start = array[index];
+ int end = index == array.length - 1 ? maxExclusive : array[index + 1];
+ return r.nextInt(start, end);
+ });
+ case 3: // random weight
+ if (indexes == null)
+ return randomWeights(array).next(rs);
+ return randomWeights(indexes).next(rs).mapAsInt((r, index) -> {
+ int start = array[index];
+ int end = index == array.length - 1 ? maxExclusive : array[index + 1];
+ return r.nextInt(start, end);
+ });
+ default:
+ throw new AssertionError();
+ }
+ };
+ }
+
+ private static int[] reverseAndCopy(int[] array)
+ {
+ array = Arrays.copyOf(array, array.length);
+ for (int i = 0, mid = array.length / 2, j = array.length - 1; i < mid; i++, j--)
+ {
+ int tmp = array[i];
+ array[i] = array[j];
+ array[j] = tmp;
+ }
+ return array;
+ }
+
+ public static Gen<Gen.LongGen> mixedDistribution(long minInclusive, long maxExclusive)
+ {
+ long domainSize = (maxExclusive - minInclusive + 1);
+ if (domainSize < 0)
+ throw new IllegalArgumentException("Range is too large; min=" + minInclusive + ", max=" + maxExclusive);
+ long[] array;
+ int[] indexes;
+ if (domainSize > 200) // randomly selected
+ {
+ int numBuckets = 10;
+ long delta = domainSize / numBuckets;
+ array = new long[numBuckets];
+ for (int i = 0; i < numBuckets; i++)
+ array[i] = minInclusive + i * delta;
+ indexes = IntStream.range(0, array.length).toArray();
+ }
+ else
+ {
+ array = LongStream.range(minInclusive, maxExclusive).toArray();
+ indexes = null;
+ }
+ return rs -> {
+ switch (rs.nextInt(0, 4))
+ {
+ case 0: // uniform
+ return r -> r.nextLong(minInclusive, maxExclusive);
+ case 1: // median biased
+ long median = rs.nextLong(minInclusive, maxExclusive);
+ return r -> r.nextBiasedLong(minInclusive, median, maxExclusive);
+ case 2: // zipf
+ if (indexes == null)
+ return Gens.pickZipf(rs.nextBoolean() ? reverseAndCopy(array) : array);
+ return Gens.pickZipf(rs.nextBoolean() ? reverseAndCopy(indexes) : indexes).mapAsLong((r, index) -> {
+ long start = array[index];
+ long end = index == array.length - 1 ? maxExclusive : array[index + 1];
+ return r.nextLong(start, end);
+ });
+ case 3: // random weight
+ if (indexes == null)
+ return randomWeights(array).next(rs);
+ return randomWeights(indexes).next(rs).mapAsLong((r, index) -> {
+ long start = array[index];
+ long end = index == array.length - 1 ? maxExclusive : array[index + 1];
+ return r.nextLong(start, end);
+ });
+ default:
+ throw new AssertionError();
+ }
+ };
+ }
+
+ private static long[] reverseAndCopy(long[] array)
+ {
+ array = Arrays.copyOf(array, array.length);
+ for (int i = 0, mid = array.length / 2, j = array.length - 1; i < mid; i++, j--)
+ {
+ long tmp = array[i];
+ array[i] = array[j];
+ array[j] = tmp;
+ }
+ return array;
+ }
+
+ public static <T> Gen<Gen<T>> mixedDistribution(T... list)
+ {
+ return mixedDistribution(Arrays.asList(list));
+ }
+
+ public static <T> Gen<Gen<T>> mixedDistribution(List<T> list)
+ {
+ return rs -> {
+ switch (rs.nextInt(0, 4))
+ {
+ case 0: // uniform
+ return r -> list.get(rs.nextInt(0, list.size()));
+ case 1: // median biased
+ int median = rs.nextInt(0, list.size());
+ return r -> list.get(r.nextBiasedInt(0, median, list.size()));
+ case 2: // zipf
+ List<T> array = list;
+ if (rs.nextBoolean())
+ {
+ array = new ArrayList<>(list);
+ Collections.reverse(array);
+ }
+ return pickZipf(array);
+ case 3: // random weight
+ return randomWeights(list).next(rs);
+ default:
+ throw new AssertionError();
+ }
+ };
+ }
+
public static Gen<char[]> charArray(Gen.IntGen sizes, char[] domain)
{
return charArray(sizes, domain, (a, b) -> true);
@@ -240,6 +515,28 @@
}
};
}
+
+ public Gen<Gen<Boolean>> mixedDistribution()
+ {
+ return rs -> {
+ int selection = rs.nextInt(0, 4);
+ switch (selection)
+ {
+ case 0: // uniform 50/50
+ return r -> r.nextBoolean();
+ case 1: // variable frequency
+ var freq = rs.nextFloat();
+ return r -> r.decide(freq);
+ case 2: // fixed result
+ boolean result = rs.nextBoolean();
+ return ignore -> result;
+ case 3: // biased repeating runs
+ return biasedRepeatingRuns(rs.nextDouble(), rs.nextInt(1, 100));
+ default:
+ throw new IllegalStateException("Unexpected int for bool selection: " + selection);
+ }
+ };
+ }
}
public static class IntDSL
@@ -265,6 +562,11 @@
return r -> r.nextInt(min, max);
return r -> r.nextInt(min, max + 1);
}
+
+ public Gen<Gen.IntGen> mixedDistribution(int minInclusive, int maxExclusive)
+ {
+ return Gens.mixedDistribution(minInclusive, maxExclusive);
+ }
}
public static class LongDSL {
@@ -296,6 +598,11 @@
return pick(klass.getEnumConstants());
}
+ public <T extends Enum<T>> Gen<Gen<T>> allMixedDistribution(Class<T> klass)
+ {
+ return mixedDistribution(klass.getEnumConstants());
+ }
+
public <T extends Enum<T>> Gen<T> allWithWeights(Class<T> klass, int... weights)
{
T[] constants = klass.getEnumConstants();
diff --git a/accord-core/src/test/java/accord/utils/Property.java b/accord-core/src/test/java/accord/utils/Property.java
index 9c81375..50eba0c 100644
--- a/accord-core/src/test/java/accord/utils/Property.java
+++ b/accord-core/src/test/java/accord/utils/Property.java
@@ -23,12 +23,16 @@
import accord.utils.async.AsyncResults;
import java.time.Duration;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
import javax.annotation.Nullable;
public class Property
@@ -75,6 +79,7 @@
public T withTimeout(Duration timeout)
{
this.timeout = timeout;
+ this.pure = false;
return (T) this;
}
@@ -168,7 +173,10 @@
}
try
{
- return value.toString();
+ String result = value.toString();
+ if (result != null && result.length() > 100 && value instanceof Collection)
+ result = ((Collection<?>) value).stream().map(o -> "\n\t " + o).collect(Collectors.joining(",", "[", "]"));
+ return result;
}
catch (Throwable t)
{
@@ -176,7 +184,7 @@
}
}
- private static String propertyError(Common<?> input, Throwable cause, Object... values)
+ private static StringBuilder propertyErrorCommon(Common<?> input, Throwable cause)
{
StringBuilder sb = new StringBuilder();
// return "Seed=" + seed + "\nExamples=" + examples;
@@ -194,15 +202,33 @@
msg = cause.getClass().getCanonicalName();
sb.append(msg).append('\n');
}
+ return sb;
+ }
+
+ private static String propertyError(Common<?> input, Throwable cause, Object... values)
+ {
+ StringBuilder sb = propertyErrorCommon(input, cause);
if (values != null)
{
sb.append("Values:\n");
for (int i = 0; i < values.length; i++)
- sb.append('\t').append(i).append(" = ").append(normalizeValue(values[i])).append('\n');
+ sb.append('\t').append(i).append(" = ").append(normalizeValue(values[i])).append(": ").append(values[i] == null ? "unknown type" : values[i].getClass().getCanonicalName()).append('\n');
}
return sb.toString();
}
+ private static String statefulPropertyError(StatefulBuilder input, Throwable cause, Object state, List<String> history)
+ {
+ StringBuilder sb = propertyErrorCommon(input, cause);
+ sb.append("Steps: ").append(input.steps).append('\n');
+ sb.append("Values:\n");
+ sb.append("\tState: ").append(state).append(": ").append(state == null ? "unknown type" : state.getClass().getCanonicalName()).append('\n');
+ sb.append("\tHistory:").append('\n');
+ for (var event : history)
+ sb.append("\t\t").append(event).append('\n');
+ return sb.toString();
+ }
+
public interface FailingConsumer<A>
{
void accept(A value) throws Exception;
@@ -380,4 +406,116 @@
{
return new ForBuilder();
}
+
+ public static StatefulBuilder stateful()
+ {
+ return new StatefulBuilder();
+ }
+
+ public static class StatefulBuilder extends Common<StatefulBuilder>
+ {
+ protected int steps = 1000;
+
+ public StatefulBuilder()
+ {
+ examples = 500;
+ }
+
+ public StatefulBuilder withSteps(int steps)
+ {
+ this.steps = steps;
+ return this;
+ }
+
+ @SuppressWarnings("rawtypes")
+ public <State, SystemUnderTest> void check(Commands<State, SystemUnderTest> commands)
+ {
+ RandomSource rs = new DefaultRandom(seed);
+ for (int i = 0; i < examples; i++)
+ {
+ State state = null;
+ List<String> history = new ArrayList<>(steps);
+ try
+ {
+ checkInterrupted();
+
+ state = commands.genInitialState().next(rs);
+ SystemUnderTest sut = commands.createSut(state);
+
+ try
+ {
+ for (int j = 0; j < steps; j++)
+ {
+ Gen<Command<State, SystemUnderTest, ?>> cmdGen = commands.commands(state);
+ Command cmd = cmdGen.next(rs);
+ for (int a = 0; cmd.checkPreconditions(state) != PreCheckResult.Ok && a < 42; a++)
+ {
+ if (a == 41)
+ throw new IllegalArgumentException("Unable to find next command");
+ cmd = cmdGen.next(rs);
+ }
+ history.add(cmd.detailed(state));
+ Object stateResult = cmd.apply(state);
+ cmd.checkPostconditions(state, stateResult,
+ sut, cmd.run(sut));
+ }
+ }
+ finally
+ {
+ commands.destroySut(sut);
+ commands.destroyState(state);
+ }
+ }
+ catch (Throwable t)
+ {
+ throw new PropertyError(statefulPropertyError(this, t, state, history), t);
+ }
+ if (pure)
+ {
+ seed = rs.nextLong();
+ rs.setSeed(seed);
+ }
+ }
+ }
+ }
+
+ public enum PreCheckResult { Ok, Ignore }
+ public interface Command<State, SystemUnderTest, Result>
+ {
+ default PreCheckResult checkPreconditions(State state) {return PreCheckResult.Ok;}
+ Result apply(State state) throws Throwable;
+ Result run(SystemUnderTest sut) throws Throwable;
+ default void checkPostconditions(State state, Result expected,
+ SystemUnderTest sut, Result actual) throws Throwable {}
+ default String detailed(State state) {return this.toString();}
+ }
+
+ public interface UnitCommand<State, SystemUnderTest> extends Command<State, SystemUnderTest, Void>
+ {
+ void applyUnit(State state) throws Throwable;
+ void runUnit(SystemUnderTest sut) throws Throwable;
+
+ @Override
+ default Void apply(State state) throws Throwable
+ {
+ applyUnit(state);
+ return null;
+ }
+
+ @Override
+ default Void run(SystemUnderTest sut) throws Throwable
+ {
+ runUnit(sut);
+ return null;
+ }
+ }
+
+ public interface Commands<State, SystemUnderTest>
+ {
+ Gen<State> genInitialState() throws Throwable;
+ SystemUnderTest createSut(State state) throws Throwable;
+ default void destroyState(State state) throws Throwable {}
+ default void destroySut(SystemUnderTest sut) throws Throwable {}
+ Gen<Command<State, SystemUnderTest, ?>> commands(State state) throws Throwable;
+ }
}
diff --git a/accord-core/src/test/java/accord/utils/SearchableRangeListTest.java b/accord-core/src/test/java/accord/utils/SearchableRangeListTest.java
new file mode 100644
index 0000000..d31c95c
--- /dev/null
+++ b/accord-core/src/test/java/accord/utils/SearchableRangeListTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package accord.utils;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.function.BiConsumer;
+
+import org.junit.jupiter.api.Test;
+
+import accord.impl.IntKey;
+import accord.primitives.Range;
+import org.assertj.core.api.Assertions;
+
+import static accord.utils.Property.qt;
+
+class SearchableRangeListTest
+{
+ @Test
+ public void fullWorld()
+ {
+ int numRanges = 1000;
+ List<Range> ranges = new ArrayList<>(numRanges);
+ for (int i = 0; i < numRanges; i++)
+ ranges.add(IntKey.range(i, i + 1));
+
+ SearchableRangeList list = SearchableRangeList.build(ranges.toArray(new Range[0]));
+ class Counter { int value;}
+ BiConsumer<Integer, Integer> test = (rangeStart, rangeEnd) -> {
+ Counter counter = new Counter();
+ list.forEach(IntKey.range(rangeStart, rangeEnd), (a, b, c, d, e) -> {
+ counter.value++;
+ }, (a, b, c, d, start, end) -> {
+ counter.value += (end - start + 1);
+ }, 0, 0, 0, 0, 0);
+ Assertions.assertThat(counter.value).isEqualTo(rangeEnd - rangeStart + 1);
+ };
+ for (int i = 0; i < numRanges; i++)
+ test.accept(i, numRanges);
+ for (int i = 0; i < numRanges; i++)
+ test.accept(0, numRanges - i);
+ }
+
+ @Test
+ public void random()
+ {
+ qt().check(rs -> {
+ int numRanges = rs.nextInt(1000, 10000);
+ List<Range> ranges = new ArrayList<>(numRanges);
+ for (int i = 0; i < numRanges; i++)
+ {
+ int start = rs.nextInt(Integer.MIN_VALUE, Integer.MAX_VALUE - 1000);
+ int offset = rs.nextInt(1, 1000);
+ ranges.add(IntKey.range(start, start + offset));
+ }
+ ranges.sort(Comparator.comparing(Range::start));
+
+ SearchableRangeList list = SearchableRangeList.build(ranges.toArray(new Range[0]));
+ for (int i = 0; i < 1000; i++)
+ {
+ Range range;
+ int selection = rs.nextInt(0, 3);
+ switch (selection)
+ {
+ case 0:
+ range = rs.pick(ranges);
+ break;
+ case 1:
+ int rangeStart = rs.nextInt(Integer.MIN_VALUE, Integer.MAX_VALUE - 1000);
+ int offset = rs.nextInt(1, 1000);
+ range = IntKey.range(rangeStart, rangeStart + offset);
+ break;
+ case 2:
+ int start = rs.nextInt(0, ranges.size());
+ int end = start + rs.nextInt(0, (ranges.size() - start));
+ range = IntKey.range(((IntKey) ranges.get(start).start()).key, ((IntKey) ranges.get(end).end()).key);
+ break;
+ default:
+ throw new IllegalStateException("Unhandled value");
+ }
+ List<Range> expected = new ArrayList<>();
+ for (Range r : ranges)
+ {
+ if (range.compareIntersecting(r) == 0)
+ expected.add(r);
+ }
+ List<Range> actual = new ArrayList<>(expected.size());
+ list.forEach(range, (a, b, c, d, idx) -> {
+ actual.add(list.ranges[idx]);
+ }, (a, b, c, d, start, end) -> {
+ for (int j = start; j < end; j++)
+ actual.add(list.ranges[j]);
+ }, 0, 0, 0, 0, 0);
+
+ Assertions.assertThat(actual).isEqualTo(expected);
+ }
+ });
+ }
+}
\ No newline at end of file
diff --git a/buildSrc/src/main/groovy/accord.java-conventions.gradle b/buildSrc/src/main/groovy/accord.java-conventions.gradle
index f77a0c0..32cd21b 100644
--- a/buildSrc/src/main/groovy/accord.java-conventions.gradle
+++ b/buildSrc/src/main/groovy/accord.java-conventions.gradle
@@ -29,7 +29,7 @@
}
compileJava {
- sourceCompatibility = JavaVersion.VERSION_1_8
+ sourceCompatibility = JavaVersion.VERSION_11
dependsOn(':rat')
}