| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package accord.primitives; |
| |
| import accord.api.Key; |
| import accord.utils.*; |
| import accord.utils.RelationMultiMap.AbstractBuilder; |
| import accord.utils.RelationMultiMap.Adapter; |
| import net.nicoulaj.compilecommand.annotations.DontInline; |
| import net.nicoulaj.compilecommand.annotations.Inline; |
| |
| import javax.annotation.Nonnull; |
| import javax.annotation.Nullable; |
| import java.util.*; |
| import java.util.function.Consumer; |
| import java.util.function.Function; |
| import java.util.function.Predicate; |
| |
| import static accord.utils.ArrayBuffers.*; |
| import static accord.utils.RelationMultiMap.*; |
| import static accord.utils.RelationMultiMap.remove; |
| import static accord.utils.SortedArrays.Search.CEIL; |
| |
| /** |
| * <p>Maintains a lazily-constructed, bidirectional map between Range and TxnId. |
| * <p>Ranges are stored sorted by start then end, and indexed by a secondary {@link SearchableRangeList} structure. |
| * <p>The relationship between Range and TxnId is maintained via {@code int[]} utilising {@link RelationMultiMap} |
| * functionality. |
| */ |
| public class RangeDeps implements Iterable<Map.Entry<Range, TxnId>> |
| { |
| public static class SerializerSupport |
| { |
| private SerializerSupport() {} |
| |
| public static int rangesToTxnIdsCount(RangeDeps deps) |
| { |
| return deps.rangesToTxnIds.length; |
| } |
| |
| public static int rangesToTxnIds(RangeDeps deps, int idx) |
| { |
| return deps.rangesToTxnIds[idx]; |
| } |
| |
| public static RangeDeps create(Range[] ranges, TxnId[] txnIds, int[] rangesToTxnIds) |
| { |
| return new RangeDeps(ranges, txnIds, rangesToTxnIds); |
| } |
| } |
| |
| private static final Range[] NO_RANGES = new Range[0]; |
| public static final RangeDeps NONE = new RangeDeps(new Range[0], new TxnId[0], new int[0], new int[0]); |
| |
| final TxnId[] txnIds; |
| // the list of ranges and their mappings to txnIds |
| // unique, and sorted by start() |
| final Range[] ranges; |
| /** |
| * See {@link RelationMultiMap}. |
| * TODO consider alternative layout depending on real-world data distributions: |
| * if most ranges have at most TxnId (or vice-versa) might be better to use negative values |
| * to index into the dynamic portion of the array. We started with this, but decided it was |
| * hard to justify the extra work for two layouts for the moment. |
| */ |
| final int[] rangesToTxnIds; |
| int[] txnIdsToRanges; |
| |
| private SearchableRangeList searchable; |
| |
| public static <T1, T2> RangeDeps merge(List<T1> merge, Function<T1, T2> getter1, Function<T2, RangeDeps> getter2) |
| { |
| try (LinearMerger<Range, TxnId, RangeDeps> linearMerger = new LinearMerger<>(ADAPTER)) |
| { |
| int mergeIndex = 0, mergeSize = merge.size(); |
| while (mergeIndex < mergeSize) |
| { |
| T2 intermediate = getter1.apply(merge.get(mergeIndex++)); |
| if (intermediate == null) |
| continue; |
| |
| RangeDeps deps = getter2.apply(intermediate); |
| if (deps == null || deps.isEmpty()) |
| continue; |
| |
| linearMerger.update(deps, deps.ranges, deps.txnIds, deps.rangesToTxnIds); |
| } |
| |
| return linearMerger.get(RangeDeps::new, NONE); |
| } |
| } |
| |
| private RangeDeps(Range[] ranges, TxnId[] txnIds, int[] rangesToTxnIds) |
| { |
| this(ranges, txnIds, rangesToTxnIds, null); |
| } |
| |
| private RangeDeps(Range[] ranges, TxnId[] txnIds, int[] rangesToTxnIds, int[] txnIdsToRanges) |
| { |
| Invariants.checkArgument(rangesToTxnIds.length >= ranges.length); |
| Invariants.checkArgument(ranges.length > 0 || rangesToTxnIds.length == 0); |
| Invariants.paranoid(SortedArrays.isSorted(ranges, Range::compare)); |
| this.ranges = ranges; |
| this.txnIds = txnIds; |
| this.rangesToTxnIds = rangesToTxnIds; |
| this.txnIdsToRanges = txnIdsToRanges; |
| } |
| |
| @Inline |
| public <P1, P2, P3> int forEach(Key key, IndexedTriConsumer<P1, P2, P3> forEachScanOrCheckpoint, IndexedRangeTriConsumer<P1, P2, P3> forEachRange, P1 p1, P2 p2, P3 p3, int minIndex) |
| { |
| return ensureSearchable().forEach(key, forEachScanOrCheckpoint, forEachRange, p1, p2, p3, minIndex); |
| } |
| |
| private int forEach(Key key, Consumer<TxnId> forEach, int minIndex, @Nullable BitSet visited) |
| { |
| return forEach(key, RangeDeps::visitTxnIdsForRangeIndex, RangeDeps::visitTxnIdsForRangeIndex, |
| this, forEach, visited, minIndex); |
| } |
| |
| @Inline |
| public <P1, P2, P3> int forEach(Range range, IndexedTriConsumer<P1, P2, P3> forEachScanOrCheckpoint, IndexedRangeTriConsumer<P1, P2, P3> forEachRange, P1 p1, P2 p2, P3 p3, int minIndex) |
| { |
| return ensureSearchable().forEach(range, forEachScanOrCheckpoint, forEachRange, p1, p2, p3, minIndex); |
| } |
| |
| private <P1, P2, P3> void forEach(Ranges ranges, IndexedTriConsumer<P1, P2, P3> forEachScanOrCheckpoint, IndexedRangeTriConsumer<P1, P2, P3> forEachRange, P1 p1, P2 p2, P3 p3) |
| { |
| int minIndex = 0; |
| for (int i = 0; i < ranges.size() ; ++i) |
| minIndex = forEach(ranges.get(i), forEachScanOrCheckpoint, forEachRange, p1, p2, p3, minIndex); |
| } |
| |
| private int forEach(Range range, Consumer<TxnId> forEach, int minIndex, @Nullable BitSet visited) |
| { |
| return forEach(range, RangeDeps::visitTxnIdsForRangeIndex, RangeDeps::visitTxnIdsForRangeIndex, |
| this, forEach, visited, minIndex); |
| } |
| |
| private void visitTxnIdsForRangeIndex(Consumer<TxnId> forEach, @Nullable BitSet visited, int rangeIndex) |
| { |
| for (int i = startOffset(ranges, rangesToTxnIds, rangeIndex), end = endOffset(rangesToTxnIds, rangeIndex) ; i < end ; ++i) |
| visitTxnIdx(rangesToTxnIds[i], forEach, visited); |
| } |
| |
| private void visitTxnIdsForRangeIndex(Consumer<TxnId> forEach, @Nullable BitSet visited, int start, int end) |
| { |
| if (end == 0) |
| return; |
| for (int i = startOffset(ranges, rangesToTxnIds, start) ; i < endOffset(rangesToTxnIds, end - 1) ; ++i) |
| visitTxnIdx(rangesToTxnIds[i], forEach, visited); |
| } |
| |
| // TODO (low priority, efficiency): ideally we would accept something like a BitHashSet or IntegerTrie |
| // as O(N) space needed for BitSet here (but with a very low constant multiplier) |
| private void visitTxnIdx(int txnIdx, Consumer<TxnId> forEach, @Nullable BitSet visited) |
| { |
| if (visited == null || !visited.get(txnIdx)) |
| { |
| if (visited != null) |
| visited.set(txnIdx); |
| forEach.accept(txnIds[txnIdx]); |
| } |
| } |
| |
| /** |
| * Each matching TxnId will be provided precisely once |
| */ |
| public void forEachUniqueTxnId(Key key, Consumer<TxnId> forEach) |
| { |
| forEach(key, forEach, 0, new BitSet()); |
| } |
| |
| /** |
| * The same TxnId may be provided as a parameter multiple times |
| */ |
| public void forEach(Range range, Consumer<TxnId> forEach) |
| { |
| forEach(range, forEach, 0, null); |
| } |
| |
| /** |
| * The same TxnId may be provided as a parameter multiple times |
| */ |
| public void forEach(Ranges ranges, Consumer<TxnId> forEach) |
| { |
| int minIndex = 0; |
| for (int i = 0; i < ranges.size() ; ++i) |
| minIndex = forEach(ranges.get(i), forEach, minIndex, null); |
| } |
| |
| /** |
| * Each matching TxnId will be provided precisely once |
| */ |
| public void forEachUniqueTxnId(Range range, Consumer<TxnId> forEach) |
| { |
| forEach(range, forEach, 0, new BitSet()); |
| } |
| |
| /** |
| * Each matching TxnId will be provided precisely once |
| * |
| * @param ranges to match on |
| * @param forEach function to call on each unique {@link TxnId} |
| */ |
| public void forEachUniqueTxnId(Ranges ranges, Consumer<TxnId> forEach) |
| { |
| int minIndex = 0; |
| for (int i = 0; i < ranges.size() ; ++i) |
| minIndex = forEach(ranges.get(i), forEach, minIndex, new BitSet()); |
| } |
| |
| // return true iff we map any ranges to any txnId |
| // if the mapping is empty we return false, whether or not we have any ranges or txnId by themselves |
| public boolean isEmpty() |
| { |
| return RelationMultiMap.isEmpty(ranges, rangesToTxnIds); |
| } |
| |
| public Unseekables<Range, ?> someUnseekables(TxnId txnId) |
| { |
| int txnIdIndex = Arrays.binarySearch(txnIds, txnId); |
| if (txnIdIndex < 0) |
| throw new IllegalStateException("Cannot create a RouteFragment without any keys"); |
| |
| ensureTxnIdToRange(); |
| |
| int start = txnIdIndex == 0 ? txnIds.length : txnIdsToRanges[txnIdIndex - 1]; |
| int end = txnIdsToRanges[txnIdIndex]; |
| if (start == end) |
| throw new IllegalStateException("Cannot create a RouteFragment without any keys"); |
| |
| Range[] result = new Range[end - start]; |
| result[0] = ranges[txnIdsToRanges[start]].toUnseekable(); |
| int resultCount = 1; |
| for (int i = start + 1 ; i < end ; ++i) |
| { |
| Range next = ranges[txnIdsToRanges[i]]; |
| if (!next.equals(result[resultCount - 1])) |
| result[resultCount++] = next; |
| } |
| |
| if (resultCount < result.length) |
| result = Arrays.copyOf(result, resultCount); |
| return new Ranges(result); |
| } |
| |
| void ensureTxnIdToRange() |
| { |
| if (txnIdsToRanges != null) |
| return; |
| |
| txnIdsToRanges = invert(rangesToTxnIds, rangesToTxnIds.length, ranges.length, txnIds.length); |
| } |
| |
| public RangeDeps slice(Ranges select) |
| { |
| if (isEmpty()) |
| return new RangeDeps(NO_RANGES, txnIds, NO_INTS); |
| |
| try (RangeAndMapCollector collector = new RangeAndMapCollector(ensureSearchable().maxScanAndCheckpointMatches)) |
| { |
| forEach(select, collector, collector, ranges, rangesToTxnIds, null); |
| |
| if (collector.rangesCount == 0) |
| return new RangeDeps(NO_RANGES, NO_TXNIDS, NO_INTS); |
| |
| if (collector.rangesCount == this.ranges.length) |
| return this; |
| |
| Range[] ranges = collector.getRanges(); |
| int[] rangesToTxnIds = collector.getRangesToTxnIds(); |
| TxnId[] txnIds = trimUnusedValues(ranges, this.txnIds, rangesToTxnIds, TxnId[]::new); |
| return new RangeDeps(ranges, txnIds, rangesToTxnIds); |
| } |
| } |
| |
| public RangeDeps with(RangeDeps that) |
| { |
| if (isEmpty() || that.isEmpty()) |
| return isEmpty() ? that : this; |
| |
| return linearUnion( |
| this.ranges, this.ranges.length, this.txnIds, this.txnIds.length, this.rangesToTxnIds, this.rangesToTxnIds.length, |
| that.ranges, that.ranges.length, that.txnIds, that.txnIds.length, that.rangesToTxnIds, that.rangesToTxnIds.length, |
| rangeComparator(), TxnId::compareTo, |
| cachedRanges(), cachedTxnIds(), cachedInts(), |
| (ranges, rangesLength, txnIds, txnIdsLength, out, outLength) -> |
| new RangeDeps(cachedRanges().complete(ranges, rangesLength), |
| cachedTxnIds().complete(txnIds, txnIdsLength), |
| cachedInts().complete(out, outLength)) |
| ); |
| } |
| |
| public RangeDeps without(Predicate<TxnId> remove) |
| { |
| return remove(this, ranges, txnIds, rangesToTxnIds, remove, |
| NONE, TxnId[]::new, ranges, RangeDeps::new); |
| } |
| |
| public boolean contains(TxnId txnId) |
| { |
| return Arrays.binarySearch(txnIds, txnId) >= 0; |
| } |
| |
| public boolean isCoveredBy(Ranges covering) |
| { |
| // check that every entry intersects with some entry in covering |
| int prev = 0; |
| for (Range range : covering) |
| { |
| int start = SortedArrays.binarySearch(ranges, 0, ranges.length, range.start(), (a, b) -> a.compareTo(b.start()), CEIL); |
| if (start < 0) start = -1 - start; |
| int end = SortedArrays.binarySearch(ranges, 0, ranges.length, range.end(), (a, b) -> a.compareTo(b.start()), CEIL); |
| if (end < 0) end = -1 - end; |
| for (int i = prev; i < start ; ++i) |
| { |
| if (range.compareIntersecting(ranges[i]) != 0) |
| return false; |
| } |
| prev = end; |
| } |
| return prev == ranges.length; |
| } |
| |
| public List<TxnId> txnIds(Key key) |
| { |
| List<TxnId> result = new ArrayList<>(); |
| forEachUniqueTxnId(key, result::add); |
| result.sort(TxnId::compareTo); |
| return result; |
| } |
| |
| public List<TxnId> txnIds(Range key) |
| { |
| List<TxnId> result = new ArrayList<>(); |
| forEachUniqueTxnId(key, result::add); |
| result.sort(TxnId::compareTo); |
| return result; |
| } |
| |
| public TxnId txnId(int i) |
| { |
| return txnIds[i]; |
| } |
| |
| public int txnIdCount() |
| { |
| return txnIds.length; |
| } |
| |
| public Range range(int i) |
| { |
| return ranges[i]; |
| } |
| |
| public int rangeCount() |
| { |
| return ranges.length; |
| } |
| |
| @Override |
| public boolean equals(Object that) |
| { |
| return this == that || (that instanceof RangeDeps && equals((RangeDeps)that)); |
| } |
| |
| public boolean equals(RangeDeps that) |
| { |
| return testEquality(this.ranges, this.txnIds, this.rangesToTxnIds, that.ranges, that.txnIds, that.rangesToTxnIds); |
| } |
| |
| @Override |
| public String toString() |
| { |
| return RelationMultiMap.toSimpleString(ranges, txnIds, rangesToTxnIds); |
| } |
| |
| @Nonnull |
| @Override |
| public Iterator<Map.Entry<Range, TxnId>> iterator() |
| { |
| return newIterator(ranges, txnIds, rangesToTxnIds); |
| } |
| |
| private SearchableRangeList ensureSearchable() |
| { |
| if (searchable == null) |
| buildSearchable(); |
| return searchable; |
| } |
| |
| @DontInline |
| private void buildSearchable() |
| { |
| searchable = SearchableRangeList.build(ranges); |
| } |
| |
| public boolean isSearchable() |
| { |
| return searchable != null; |
| } |
| |
| static class RangeCollector implements |
| IndexedRangeTriConsumer<Range[], int[], Object>, |
| IndexedTriConsumer<Range[], int[], Object>, |
| AutoCloseable |
| { |
| int[] oooBuffer; |
| Range[] rangesOut; |
| int oooCount, rangesCount; |
| |
| RangeCollector(int maxScanAndCheckpointCount) |
| { |
| oooBuffer = cachedInts().getInts(maxScanAndCheckpointCount); |
| rangesOut = cachedRanges().get(32); |
| } |
| |
| @Override |
| public void accept(Range[] o, int[] o2, Object o3, int index) |
| { |
| oooBuffer[oooCount++] = index; |
| } |
| |
| @Override |
| public void accept(Range[] ranges, int[] rangesToTxnIds, Object o3, int fromIndex, int toIndex) |
| { |
| if (oooCount > 0) |
| { |
| Arrays.sort(oooBuffer, 0, oooCount); |
| int oooCount = Arrays.binarySearch(oooBuffer, 0, this.oooCount, fromIndex); |
| if (oooCount < 0) oooCount = -1 - oooCount; |
| copy(ranges, rangesToTxnIds, oooCount, fromIndex, toIndex); |
| this.oooCount = 0; |
| } |
| else if (fromIndex < toIndex) |
| { |
| copy(ranges, rangesToTxnIds, 0, fromIndex, toIndex); |
| } |
| } |
| |
| protected void copy(Range[] ranges, int[] rangesToTxnIds, int oooCount, int start, int end) |
| { |
| int count = oooCount + (end - start); |
| if (rangesCount + count >= rangesOut.length) |
| rangesOut = cachedRanges().resize(rangesOut, rangesCount, rangesCount + count + (rangesCount /2)); |
| for (int i = 0 ; i < oooCount ; ++i) |
| rangesOut[rangesCount++] = ranges[oooBuffer[i]]; |
| for (int i = start ; i < end ; ++i) |
| rangesOut[rangesCount++] = ranges[i]; |
| } |
| |
| Range[] getRanges() |
| { |
| Range[] result = cachedRanges().completeAndDiscard(rangesOut, rangesCount); |
| rangesOut = null; |
| return result; |
| } |
| |
| @Override |
| public void close() |
| { |
| if (oooBuffer != null) |
| { |
| cachedInts().forceDiscard(oooBuffer); |
| oooBuffer = null; |
| } |
| if (rangesOut != null) |
| { |
| cachedRanges().forceDiscard(rangesOut, rangesCount); |
| rangesOut = null; |
| } |
| } |
| } |
| |
| static class RangeAndMapCollector extends RangeCollector |
| { |
| int[] headers; |
| int[] lists; |
| int headerCount, listOffset; |
| |
| RangeAndMapCollector(int maxScanAndCheckpointCount) |
| { |
| super(maxScanAndCheckpointCount); |
| headers = cachedInts().getInts(32); |
| lists = cachedInts().getInts(32); |
| } |
| |
| @Override |
| protected void copy(Range[] ranges, int[] rangesToTxnIds, int oooCount, int start, int end) |
| { |
| super.copy(ranges, rangesToTxnIds, oooCount, start, end); |
| int count = oooCount + (end - start); |
| if (headerCount + count >= headers.length) |
| headers = cachedInts().resize(headers, headerCount, headerCount + count + (headerCount /2)); |
| for (int i = 0 ; i < oooCount ; ++i) |
| { |
| int ri = oooBuffer[i]; |
| copyToDynamic(rangesToTxnIds, startOffset(ranges, rangesToTxnIds, ri), endOffset(rangesToTxnIds, ri)); |
| headers[headerCount++] = listOffset; |
| } |
| int startOffset = startOffset(ranges, rangesToTxnIds, start); |
| for (int i = start ; i < end ; ++i) |
| headers[this.headerCount++] = listOffset + rangesToTxnIds[i] - startOffset; |
| copyToDynamic(rangesToTxnIds, startOffset, startOffset(ranges, rangesToTxnIds, end)); |
| } |
| |
| protected void copyToDynamic(int[] rangesToTxnIds, int start, int end) |
| { |
| int count = end - start; |
| if (count + listOffset >= lists.length) |
| lists = cachedInts().resize(lists, listOffset, listOffset + (listOffset /2) + count); |
| System.arraycopy(rangesToTxnIds, start, lists, listOffset, count); |
| listOffset += count; |
| } |
| |
| public int[] getRangesToTxnIds() |
| { |
| int[] out = new int[headerCount + listOffset]; |
| for (int i = 0; i < headerCount; ++i) |
| out[i] = headers[i] + headerCount; |
| System.arraycopy(lists, 0, out, headerCount, listOffset); |
| return out; |
| } |
| } |
| |
| public static RangeDeps of(Map<TxnId, Ranges> txnIdRanges) |
| { |
| if (txnIdRanges.isEmpty()) |
| return NONE; |
| |
| try (BuilderByTxnId builder = new BuilderByTxnId()) |
| { |
| for (Map.Entry<TxnId, Ranges> e : txnIdRanges.entrySet()) |
| { |
| builder.nextKey(e.getKey()); |
| Ranges ranges = e.getValue(); |
| for (int i = 0 ; i < ranges.size() ; ++i) |
| builder.add(ranges.get(i)); |
| } |
| return builder.build(); |
| } |
| } |
| |
| public static Builder builder() |
| { |
| return new Builder(); |
| } |
| |
| public static class Builder extends AbstractBuilder<Range, TxnId, RangeDeps> |
| { |
| public Builder() |
| { |
| super(ADAPTER); |
| } |
| |
| @Override |
| protected RangeDeps none() |
| { |
| return RangeDeps.NONE; |
| } |
| |
| @Override |
| protected RangeDeps build(Range[] ranges, TxnId[] txnIds, int[] keyToValue) |
| { |
| return new RangeDeps(ranges, txnIds, keyToValue); |
| } |
| } |
| |
| public static BuilderByTxnId byTxnIdBuilder() |
| { |
| return new BuilderByTxnId(); |
| } |
| |
| public static class BuilderByTxnId extends AbstractBuilder<TxnId, Range, RangeDeps> |
| { |
| public BuilderByTxnId() |
| { |
| super(REVERSE_ADAPTER); |
| } |
| |
| @Override |
| protected RangeDeps none() |
| { |
| return RangeDeps.NONE; |
| } |
| |
| @Override |
| protected RangeDeps build(TxnId[] txnIds, Range[] ranges, int[] txnIdsToRanges) |
| { |
| return new RangeDeps(ranges, txnIds, invert(txnIdsToRanges, txnIdsToRanges.length, txnIds.length, ranges.length), txnIdsToRanges); |
| } |
| } |
| |
| public static SymmetricComparator<? super Range> rangeComparator() |
| { |
| return Range::compare; |
| } |
| |
| private static final RangeDepsAdapter ADAPTER = new RangeDepsAdapter(); |
| private static final class RangeDepsAdapter implements Adapter<Range, TxnId> |
| { |
| @Override |
| public SymmetricComparator<? super Range> keyComparator() |
| { |
| return rangeComparator(); |
| } |
| |
| @Override |
| public SymmetricComparator<? super TxnId> valueComparator() |
| { |
| return TxnId::compareTo; |
| } |
| |
| @Override |
| public ObjectBuffers<Range> cachedKeys() |
| { |
| return ArrayBuffers.cachedRanges(); |
| } |
| |
| @Override |
| public ObjectBuffers<TxnId> cachedValues() |
| { |
| return ArrayBuffers.cachedTxnIds(); |
| } |
| } |
| |
| private static final ReverseRangeDepsAdapter REVERSE_ADAPTER = new ReverseRangeDepsAdapter(); |
| private static final class ReverseRangeDepsAdapter implements Adapter<TxnId, Range> |
| { |
| @Override |
| public SymmetricComparator<? super TxnId> keyComparator() |
| { |
| return TxnId::compareTo; |
| } |
| |
| @Override |
| public SymmetricComparator<? super Range> valueComparator() |
| { |
| return rangeComparator(); |
| } |
| |
| @Override |
| public ObjectBuffers<TxnId> cachedKeys() |
| { |
| return ArrayBuffers.cachedTxnIds(); |
| } |
| |
| @Override |
| public ObjectBuffers<Range> cachedValues() |
| { |
| return ArrayBuffers.cachedRanges(); |
| } |
| } |
| |
| } |