| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package accord.primitives; |
| |
| import accord.api.Key; |
| import accord.api.RoutingKey; |
| import accord.utils.ArrayBuffers; |
| import accord.utils.IndexedBiConsumer; |
| import accord.utils.IndexedTriConsumer; |
| import accord.utils.RelationMultiMap; |
| import accord.utils.SortedArrays.SortedArrayList; |
| import accord.utils.SymmetricComparator; |
| import accord.utils.TriFunction; |
| |
| import java.util.*; |
| import java.util.function.BiConsumer; |
| import java.util.function.Consumer; |
| import java.util.function.Function; |
| import java.util.function.Predicate; |
| import java.util.stream.Stream; |
| |
| import static accord.utils.ArrayBuffers.*; |
| import static accord.utils.Invariants.illegalState; |
| import static accord.utils.RelationMultiMap.*; |
| import static accord.utils.SortedArrays.Search.FAST; |
| |
| // TODO (desired, consider): switch to RoutingKey? Would mean adopting execution dependencies less precisely, but saving ser/deser of large keys |
| // TODO (expected): consider which projection we should default to on de/serialise |
| /** |
| * A collection of dependencies for a transaction, organised by the key the dependency is adopted via. |
| * An inverse map from TxnId to Key may also be constructed and stored in this collection. |
| */ |
| public class KeyDeps implements Iterable<Map.Entry<Key, TxnId>> |
| { |
/** Shared instance constructed from {@code Keys.EMPTY}, {@code NO_TXNIDS} and {@code NO_INTS}; used as the "no dependencies" result by the builder and mergers in this class. */
public static final KeyDeps NONE = new KeyDeps(Keys.EMPTY, NO_TXNIDS, NO_INTS);
| |
| public static class SerializerSupport |
| { |
| private SerializerSupport() {} |
| |
| public static int keysToTxnIdsCount(KeyDeps deps) |
| { |
| return deps.keysToTxnIds.length; |
| } |
| |
| public static int keysToTxnIds(KeyDeps deps, int idx) |
| { |
| return deps.keysToTxnIds[idx]; |
| } |
| |
| public static KeyDeps create(Keys keys, TxnId[] txnIds, int[] keyToTxnId) |
| { |
| return new KeyDeps(keys, txnIds, keyToTxnId); |
| } |
| } |
| |
| public static KeyDeps none(Keys keys) |
| { |
| int[] keysToTxnId = new int[keys.size()]; |
| Arrays.fill(keysToTxnId, keys.size()); |
| return new KeyDeps(keys, NO_TXNIDS, keysToTxnId); |
| } |
| |
| /** |
| * Expects Command to be provided in TxnId order |
| */ |
| public static Builder builder() |
| { |
| return new Builder(); |
| } |
| |
| public static class Builder extends AbstractBuilder<Key, TxnId, KeyDeps> |
| { |
| public Builder() |
| { |
| super(ADAPTER); |
| } |
| |
| @Override |
| protected KeyDeps none() |
| { |
| return KeyDeps.NONE; |
| } |
| |
| @Override |
| protected KeyDeps build(Key[] keys, TxnId[] txnIds, int[] keysToTxnIds) |
| { |
| return new KeyDeps(Keys.ofSortedUnique(keys), txnIds, keysToTxnIds); |
| } |
| } |
| |
| public static LinearMerger<Key, TxnId, KeyDeps> newMerger() |
| { |
| return new LinearMerger<>(ADAPTER); |
| } |
| |
| public static <T1, T2> KeyDeps merge(List<T1> merge, Function<T1, T2> getter1, Function<T2, KeyDeps> getter2) |
| { |
| try (LinearMerger<Key, TxnId, KeyDeps> linearMerger = newMerger()) |
| { |
| int mergeIndex = 0, mergeSize = merge.size(); |
| while (mergeIndex < mergeSize) |
| { |
| T2 intermediate = getter1.apply(merge.get(mergeIndex++)); |
| if (intermediate == null) |
| continue; |
| |
| KeyDeps deps = getter2.apply(intermediate); |
| if (deps == null || deps.isEmpty()) |
| continue; |
| |
| linearMerger.update(deps, deps.keys.keys, deps.txnIds, deps.keysToTxnIds); |
| } |
| |
| return linearMerger.get(KeyDeps::new, NONE); |
| } |
| } |
| |
| public static KeyDeps merge(Stream<KeyDeps> merge) |
| { |
| try (LinearMerger<Key, TxnId, KeyDeps> linearMerger = newMerger()) |
| { |
| merge.forEach(deps -> { |
| if (!deps.isEmpty()) |
| linearMerger.update(deps, deps.keys.keys, deps.txnIds, deps.keysToTxnIds); |
| }); |
| |
| return linearMerger.get(KeyDeps::new, NONE); |
| } |
| } |
| |
final Keys keys; // unique Keys
final TxnId[] txnIds; // unique TxnId

/**
 * This represents a map of {@code Key -> [TxnId] } where each TxnId is actually a pointer into the txnIds array.
 * The beginning of the array (the first keys.size() entries) are offsets into this array: entry {@code k} is the
 * exclusive end of the run of txnIds indexes belonging to key {@code k}, and the run of the first key starts at
 * {@code keys.size()}.
 * <p>
 * Example:
 * <pre>{@code
 * int keyIdx = keys.indexOf(key);
 * int startOfTxnOffset = keyIdx == 0 ? keys.size() : keysToTxnIds[keyIdx - 1];
 * int endOfTxnOffset = keysToTxnIds[keyIdx];
 * for (int i = startOfTxnOffset; i < endOfTxnOffset; i++)
 * {
 *     TxnId id = txnIds[keysToTxnIds[i]];
 *     ...
 * }
 * }</pre>
 */
// TODO (expected): support deserializing to one or the other
int[] keysToTxnIds; // Key -> [TxnId]
int[] txnIdsToKeys; // TxnId -> [Key]; populated on first call to txnIdsToKeys()
| |
/**
 * Convenience constructor taking the keys as a raw array.
 * The array is wrapped with {@code Keys.ofSortedUnique} before delegating to the {@link Keys}-based constructor.
 */
KeyDeps(Key[] keys, TxnId[] txnIds, int[] keysToTxnIds)
{
    this(Keys.ofSortedUnique(keys), txnIds, keysToTxnIds);
}
| |
| KeyDeps(Keys keys, TxnId[] txnIds, int[] keysToTxnIds) |
| { |
| this.keys = keys; |
| this.txnIds = txnIds; |
| this.keysToTxnIds = keysToTxnIds; |
| if (!(keys.isEmpty() || keysToTxnIds[keys.size() - 1] == keysToTxnIds.length)) |
| throw new IllegalArgumentException(String.format("Last key (%s) in keyToTxnId does not point (%d) to the end of the array (%d);\nkeyToTxnId=%s", keys.get(keys.size() - 1), keysToTxnIds[keys.size() - 1], keysToTxnIds.length, Arrays.toString(keysToTxnIds))); |
| checkValid(keys.keys, txnIds, keysToTxnIds); |
| } |
| |
| public KeyDeps slice(Ranges ranges) |
| { |
| if (isEmpty()) |
| return new KeyDeps(keys, txnIds, keysToTxnIds); |
| |
| // TODO (low priority, efficiency): can slice in parallel with selecting keyToTxnId contents to avoid duplicate merging |
| Keys select = keys.slice(ranges); |
| |
| if (select.isEmpty()) |
| return new KeyDeps(Keys.EMPTY, NO_TXNIDS, NO_INTS); |
| |
| if (select.size() == keys.size()) |
| return this; |
| |
| int i = 0; |
| int offset = select.size(); |
| for (int j = 0 ; j < select.size() ; ++j) |
| { |
| int findi = keys.findNext(i, select.get(j), FAST); |
| if (findi < 0) |
| continue; |
| |
| i = findi; |
| offset += keysToTxnIds[i] - (i == 0 ? keys.size() : keysToTxnIds[i - 1]); |
| } |
| |
| int[] src = keysToTxnIds; |
| int[] trg = new int[offset]; |
| |
| i = 0; |
| offset = select.size(); |
| for (int j = 0 ; j < select.size() ; ++j) |
| { |
| int findi = keys.findNext(i, select.get(j), FAST); |
| if (findi >= 0) |
| { |
| i = findi; |
| int start = i == 0 ? keys.size() : src[i - 1]; |
| int count = src[i] - start; |
| System.arraycopy(src, start, trg, offset, count); |
| offset += count; |
| } |
| trg[j] = offset; |
| } |
| |
| TxnId[] txnIds = trimUnusedValues(select.keys, this.txnIds, trg, TxnId[]::new); |
| return new KeyDeps(select, txnIds, trg); |
| } |
| |
| public KeyDeps with(KeyDeps that) |
| { |
| if (isEmpty() || that.isEmpty()) |
| return isEmpty() ? that : this; |
| |
| return linearUnion( |
| this.keys.keys, this.keys.keys.length, this.txnIds, this.txnIds.length, this.keysToTxnIds, this.keysToTxnIds.length, |
| that.keys.keys, that.keys.keys.length, that.txnIds, that.txnIds.length, that.keysToTxnIds, that.keysToTxnIds.length, |
| Key::compareTo, TxnId::compareTo, |
| cachedKeys(), cachedTxnIds(), cachedInts(), |
| (keys, keysLength, txnIds, txnIdsLength, out, outLength) -> |
| new KeyDeps(Keys.ofSortedUnchecked(cachedKeys().complete(keys, keysLength)), |
| cachedTxnIds().complete(txnIds, txnIdsLength), |
| cachedInts().complete(out, outLength)) |
| ); |
| } |
| |
| public KeyDeps without(Predicate<TxnId> remove) |
| { |
| return remove(this, keys.keys, txnIds, keysToTxnIds, remove, |
| NONE, TxnId[]::new, keys, KeyDeps::new); |
| } |
| |
| public boolean contains(TxnId txnId) |
| { |
| return Arrays.binarySearch(txnIds, txnId) >= 0; |
| } |
| |
| public boolean intersects(TxnId txnId, Ranges ranges) |
| { |
| int txnIdx = Arrays.binarySearch(txnIds, txnId); |
| if (txnIdx < 0) |
| return false; |
| |
| int[] txnIdsToKeys = txnIdsToKeys(); |
| |
| int start = txnIdx == 0 ? txnIds.length : txnIdsToKeys[txnIdx - 1]; |
| int end = txnIdsToKeys[txnIdx]; |
| if (start == end) |
| return false; |
| |
| int li = start, ri = 0; |
| while (li < end && ri < ranges.size()) |
| { |
| ri = ranges.findNext(ri, keys.get(txnIdsToKeys[li]), FAST); |
| if (ri >= 0) return true; |
| ri = -1 - ri; |
| ++li; |
| } |
| return false; |
| } |
| |
| // return true iff we map any keys to any txnId |
| // if the mapping is empty we return false, whether or not we have any keys or txnId by themselves |
| public boolean isEmpty() |
| { |
| return keysToTxnIds.length == keys.size(); |
| } |
| |
| public Keys participatingKeys(TxnId txnId) |
| { |
| int txnIdx = Arrays.binarySearch(txnIds, txnId); |
| if (txnIdx < 0) |
| return Keys.EMPTY; |
| |
| return participatingKeys(txnIdx); |
| } |
| |
| public Keys participatingKeys(int txnIdx) |
| { |
| int[] txnIdsToKeys = txnIdsToKeys(); |
| |
| int start = txnIdx == 0 ? txnIds.length : txnIdsToKeys[txnIdx - 1]; |
| int end = txnIdsToKeys[txnIdx]; |
| if (start == end) |
| return Keys.EMPTY; |
| |
| Key[] result = new Key[end - start]; |
| for (int i = start ; i < end ; ++i) |
| result[i - start] = keys.get(txnIdsToKeys[i]); |
| return Keys.of(result); |
| } |
| |
| // TODO (desired): consider optionally not inverting before answering, as a single txnId may be answered more efficiently without inversion |
| public RoutingKeys participants(TxnId txnId) |
| { |
| int txnIdIndex = Arrays.binarySearch(txnIds, txnId); |
| if (txnIdIndex < 0) |
| throw illegalState("Cannot create a RouteFragment without any keys"); |
| |
| int[] txnIdsToKeys = txnIdsToKeys(); |
| |
| int start = txnIdIndex == 0 ? txnIds.length : txnIdsToKeys[txnIdIndex - 1]; |
| int end = txnIdsToKeys[txnIdIndex]; |
| if (start == end) |
| throw illegalState("Cannot create a RouteFragment without any keys"); |
| |
| RoutingKey[] result = new RoutingKey[end - start]; |
| result[0] = keys.get(txnIdsToKeys[start]).toUnseekable(); |
| int resultCount = 1; |
| for (int i = start + 1 ; i < end ; ++i) |
| { |
| RoutingKey next = keys.get(txnIdsToKeys[i]).toUnseekable(); |
| if (!next.equals(result[resultCount - 1])) |
| result[resultCount++] = next; |
| } |
| |
| if (resultCount < result.length) |
| result = Arrays.copyOf(result, resultCount); |
| return new RoutingKeys(result); |
| } |
| |
| int[] txnIdsToKeys() |
| { |
| if (txnIdsToKeys == null) |
| txnIdsToKeys = invert(keysToTxnIds, keysToTxnIds.length, keys.size(), txnIds.length); |
| return txnIdsToKeys; |
| } |
| |
| int[] keysToTxnIds() |
| { |
| if (keysToTxnIds == null) |
| keysToTxnIds = invert(txnIdsToKeys, txnIdsToKeys.length, txnIds.length, keys.size()); |
| return keysToTxnIds; |
| } |
| |
| public void forEach(Ranges ranges, BiConsumer<Key, TxnId> forEach) |
| { |
| int[] keysToTxnIds = keysToTxnIds(); |
| Routables.foldl(keys, ranges, (key, value, index) -> { |
| for (int t = startOffset(index), end = endOffset(index); t < end ; ++t) |
| { |
| TxnId txnId = txnIds[keysToTxnIds[t]]; |
| forEach.accept(key, txnId); |
| } |
| return null; |
| }, null); |
| } |
| |
| /** |
| * For each {@link TxnId} that references a key within the {@link Ranges}; the {@link TxnId} will be seen exactly once. |
| * @param ranges to match on |
| * @param forEach function to call on each unique {@link TxnId} |
| */ |
| public void forEachUniqueTxnId(Ranges ranges, Consumer<TxnId> forEach) |
| { |
| if (txnIds.length == 0) |
| return; |
| |
| int[] keysToTxnIds = keysToTxnIds(); |
| if (txnIds.length <= 64) |
| { |
| long bitset = Routables.foldl(keys, ranges, (key, ignore, value, keyIndex) -> { |
| int index = startOffset(keyIndex); |
| int end = endOffset(keyIndex); |
| |
| while (index < end) |
| { |
| long next = keysToTxnIds[index++]; |
| if (next >= 64) |
| break; |
| value |= 1L << next; |
| } |
| |
| return value; |
| }, 0, 0, -1L >>> (64 - txnIds.length)); |
| |
| while (bitset != 0) |
| { |
| int i = Long.numberOfTrailingZeros(bitset); |
| TxnId txnId = txnIds[i]; |
| forEach.accept(txnId); |
| bitset ^= Long.lowestOneBit(bitset); |
| } |
| } |
| else |
| { |
| BitSet bitset = Routables.foldl(keys, ranges, (key, value, keyIndex) -> { |
| int index = startOffset(keyIndex); |
| int end = endOffset(keyIndex); |
| while (index < end) |
| value.set(keysToTxnIds[index++]); |
| return value; |
| }, new BitSet(txnIds.length)); |
| |
| int i = -1; |
| while ((i = bitset.nextSetBit(i + 1)) >= 0) |
| forEach.accept(txnIds[i]); |
| } |
| } |
| |
| public void forEach(Key key, Consumer<TxnId> forEach) |
| { |
| int keyIndex = keys.indexOf(key); |
| if (keyIndex < 0) |
| return; |
| |
| int[] keysToTxnIds = keysToTxnIds(); |
| int index = startOffset(keyIndex); |
| int end = endOffset(keyIndex); |
| while (index < end) |
| forEach.accept(txnIds[keysToTxnIds[index++]]); |
| } |
| |
| public <P1, P2> void forEach(Ranges ranges, int inclIdx, int exclIdx, P1 p1, P2 p2, IndexedBiConsumer<P1, P2> forEach) |
| { |
| for (int i = 0; i < ranges.size(); ++i) |
| forEach(ranges.get(i), inclIdx, exclIdx, p1, p2, forEach); |
| } |
| |
| public <P1, P2> void forEach(Range range, P1 p1, P2 p2, IndexedBiConsumer<P1, P2> forEach) |
| { |
| forEach(range, 0, keys.size(), p1, p2, forEach); |
| } |
| |
| public <P1, P2> void forEach(Range range, int inclKeyIdx, int exclKeyIdx, P1 p1, P2 p2, IndexedBiConsumer<P1, P2> forEach) |
| { |
| forEach(range, inclKeyIdx, exclKeyIdx, forEach, p1, p2, IndexedBiConsumer::accept); |
| } |
| |
| public <P1, P2, P3> void forEach(Range range, P1 p1, P2 p2, P3 p3, IndexedTriConsumer<P1, P2, P3> forEach) |
| { |
| forEach(range, 0, keys.size(), p1, p2, p3, forEach); |
| } |
| |
| public <P1, P2, P3> void forEach(Range range, int inclKeyIdx, int exclKeyIdx, P1 p1, P2 p2, P3 p3, IndexedTriConsumer<P1, P2, P3> forEach) |
| { |
| int[] keysToTxnIds = keysToTxnIds(); |
| int start = keys.indexOf(range.start()); |
| if (start < 0) start = -1 - start; |
| else if (!range.startInclusive()) ++start; |
| start = startOffset(start); |
| |
| int end = keys.indexOf(range.end()); |
| if (end < 0) end = -1 - end; |
| else if (range.endInclusive()) ++end; |
| end = startOffset(end); |
| |
| while (start < end) |
| { |
| int txnIdx = keysToTxnIds[start++]; |
| if (txnIdx >= inclKeyIdx && txnIdx < exclKeyIdx) |
| forEach.accept(p1, p2, p3, txnIdx); |
| } |
| } |
| |
| public <P1, V> V foldEachKey(int txnIdx, P1 p1, V accumulate, TriFunction<P1, Key, V, V> fold) |
| { |
| int[] txnIdsToKeys = txnIdsToKeys(); |
| |
| int start = txnIdx == 0 ? txnIds.length : txnIdsToKeys[txnIdx - 1]; |
| int end = txnIdsToKeys[txnIdx]; |
| for (int i = start; i < end ; ++i) |
| accumulate = fold.apply(p1, keys.get(txnIdsToKeys[i]), accumulate); |
| return accumulate; |
| } |
| |
| public Keys keys() |
| { |
| return keys; |
| } |
| |
| public int txnIdCount() |
| { |
| return txnIds.length; |
| } |
| |
| public int totalCount() |
| { |
| return keysToTxnIds.length - keys.size(); |
| } |
| |
| public TxnId txnId(int i) |
| { |
| return txnIds[i]; |
| } |
| |
| public SortedArrayList<TxnId> txnIds() |
| { |
| return new SortedArrayList<>(txnIds); |
| } |
| |
| public SortedRelationList<TxnId> txnIds(Key key) |
| { |
| int keyIndex = keys.indexOf(key); |
| if (keyIndex < 0) |
| return SortedRelationList.EMPTY; |
| |
| return txnIdsForKeyIndex(keyIndex); |
| } |
| |
| public SortedRelationList<TxnId> txnIdsForKeyIndex(int keyIndex) |
| { |
| int[] keysToTxnIds = keysToTxnIds(); |
| int start = startOffset(keyIndex); |
| int end = endOffset(keyIndex); |
| return txnIds(keysToTxnIds, start, end); |
| } |
| |
| @SuppressWarnings("unchecked") |
| public SortedRelationList<TxnId> txnIds(Range range) |
| { |
| int startIndex = keys.indexOf(range.start()); |
| if (startIndex < 0) startIndex = -1 - startIndex; |
| else if (!range.startInclusive()) ++startIndex; |
| |
| int endIndex = keys.indexOf(range.end()); |
| if (endIndex < 0) endIndex = -1 - endIndex; |
| else if (range.endInclusive()) ++endIndex; |
| |
| if (startIndex == endIndex) |
| return SortedRelationList.EMPTY; |
| |
| int[] keysToTxnIds = keysToTxnIds(); |
| int maxLength = Math.min(txnIds.length, startOffset(endIndex) - startOffset(startIndex)); |
| int[] scratch = cachedInts().getInts(maxLength); |
| int count = 0; |
| for (int i = startIndex ; i < endIndex ; ++i) |
| { |
| int ri = startOffset(i), re = endOffset(i); |
| if (ri == re) |
| continue; |
| |
| System.arraycopy(scratch, 0, scratch, maxLength - count, count); |
| int li = maxLength - count, le = maxLength; |
| count = 0; |
| while (li < le && ri < re) |
| { |
| int c = scratch[li] - keysToTxnIds[ri]; |
| if (c <= 0) |
| { |
| scratch[count++] = scratch[li++]; |
| if (c == 0) ++ri; |
| } |
| else |
| { |
| scratch[count++] = keysToTxnIds[ri++]; |
| } |
| } |
| while (li < le) |
| scratch[count++] = scratch[li++]; |
| while (ri < re) |
| scratch[count++] = keysToTxnIds[ri++]; |
| |
| if (count == maxLength) |
| break; |
| } |
| |
| int[] ids = cachedInts().completeAndDiscard(scratch, count); |
| return txnIds(ids, 0, count); |
| } |
| |
| @SuppressWarnings("unchecked") |
| private SortedRelationList<TxnId> txnIds(int[] ids, int start, int end) |
| { |
| if (start == end) |
| return SortedRelationList.EMPTY; |
| |
| return new SortedRelationList<>(txnIds, ids, start, end); |
| } |
| |
| private int startOffset(int keyIndex) |
| { |
| return keyIndex == 0 ? keys.size() : keysToTxnIds[keyIndex - 1]; |
| } |
| |
| private int endOffset(int keyIndex) |
| { |
| return keysToTxnIds[keyIndex]; |
| } |
| |
| public boolean equals(Object that) |
| { |
| return this == that || (that instanceof KeyDeps && equals((KeyDeps)that)); |
| } |
| |
| public boolean equals(KeyDeps that) |
| { |
| return testEquality(this.keys.keys, this.txnIds, this.keysToTxnIds, that.keys.keys, that.txnIds, that.keysToTxnIds); |
| } |
| |
| @Override |
| public Iterator<Map.Entry<Key, TxnId>> iterator() |
| { |
| return newIterator(keys.keys, txnIds, keysToTxnIds); |
| } |
| |
| @Override |
| public String toString() |
| { |
| return toSimpleString(keys.keys, txnIds, keysToTxnIds); |
| } |
| |
| public String toBriefString() |
| { |
| return RelationMultiMap.toBriefString(keys.keys, txnIds); |
| } |
| |
| private static final KeyDepsAdapter ADAPTER = new KeyDepsAdapter(); |
| static class KeyDepsAdapter implements Adapter<Key, TxnId> |
| { |
| @Override |
| public final SymmetricComparator<? super Key> keyComparator() |
| { |
| return Key::compareTo; |
| } |
| |
| @Override |
| public final SymmetricComparator<? super TxnId> valueComparator() |
| { |
| return TxnId::compareTo; |
| } |
| |
| @Override |
| public final ObjectBuffers<Key> cachedKeys() |
| { |
| return ArrayBuffers.cachedKeys(); |
| } |
| |
| @Override |
| public final ObjectBuffers<TxnId> cachedValues() |
| { |
| return ArrayBuffers.cachedTxnIds(); |
| } |
| } |
| } |