| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package harry.model; |
| |
| import java.util.EnumMap; |
| import java.util.List; |
| import java.util.Map; |
| |
| import harry.core.Configuration; |
| import harry.core.VisibleForTesting; |
| import harry.ddl.ColumnSpec; |
| import harry.ddl.SchemaSpec; |
| import harry.generators.Bytes; |
| import harry.generators.PCGFastPure; |
| import harry.generators.RngUtils; |
| import harry.generators.Surjections; |
| import harry.generators.distribution.Distribution; |
| import harry.util.BitSet; |
| |
| import static harry.generators.DataGenerators.NIL_DESCR; |
| import static harry.generators.DataGenerators.UNSET_DESCR; |
| |
| /** |
| * Row (deflated) data selectors. Not calling them generators, since their output is entirely |
| * deterministic, and for each input they are able to produce a single output. |
| * <p> |
| * This is more or less a direct translation of the formalization. |
| * <p> |
| * All functions implemented by this interface have to _always_ produce same outputs for same inputs. |
| * Most of the functions, with the exception of real-time clock translations, should be pure. |
| * <p> |
| * Functions that are reverse of their counterparts are prefixed with "un" |
| */ |
| public interface OpSelectors |
| { |
| public static interface Rng |
| { |
| long randomNumber(long i, long stream); |
| |
| long sequenceNumber(long r, long stream); |
| |
| default long next(long r) |
| { |
| return next(r, 0); |
| } |
| |
| long next(long r, long stream); |
| |
| long prev(long r, long stream); |
| |
| default long prev(long r) |
| { |
| return next(r, 0); |
| } |
| } |
| |
| /** |
| * Clock is a component responsible for mapping _logical_ timestamps to _real-time_ ones. |
| * When reproducing test failures, and for validation purposes, a snapshot of such clock can |
| * be taken to map a real-time timestamp from the value retrieved from the database in order |
| * to map it back to the logical timestamp of the operation that wrote this value. |
| */ |
| public interface MonotonicClock |
| { |
| long rts(long lts); |
| long lts(long rts); |
| |
| long nextLts(); |
| long peek(); |
| |
| Configuration.ClockConfiguration toConfig(); |
| } |
| |
| public static interface MonotonicClockFactory |
| { |
| public MonotonicClock make(); |
| } |
| |
| // TODO: move to DescriptorSelector, makes no sense to split them |
| |
| /** |
| * *Partition descriptor selector* controls how partitions is selected based on the current logical |
| * timestamp. Default implementation is a sliding window of partition descriptors that will visit |
| * one partition after the other in the window `slide_after_repeats` times. After that will |
| * retire one partition descriptor, and pick one instead of it. |
| */ |
| public abstract class PdSelector |
| { |
| @VisibleForTesting |
| protected abstract long pd(long lts); |
| |
| public long pd(long lts, SchemaSpec schema) |
| { |
| return schema.adjustPdEntropy(pd(lts)); |
| } |
| |
| // previous and next LTS with that will yield same pd |
| public abstract long nextLts(long lts); |
| |
| public abstract long prevLts(long lts); |
| |
| public abstract long maxLtsFor(long pd); |
| public abstract long minLtsAt(long position); |
| |
| public abstract long minLtsFor(long pd); |
| public abstract long maxPosition(long maxLts); |
| } |
| |
| public static interface PdSelectorFactory |
| { |
| public PdSelector make(Rng rng); |
| } |
| |
| public static interface DescriptorSelectorFactory |
| { |
| public DescriptorSelector make(OpSelectors.Rng rng, SchemaSpec schemaSpec); |
| } |
| |
| /** |
| * DescriptorSelector controls how clustering descriptors are picked within the partition: |
| * how many rows there can be in a partition, how many rows will be visited for a logical timestamp, |
| * how many operations there will be in batch, what kind of operations there will and how often |
| * each kind of operation is going to occur. |
| */ |
| public abstract class DescriptorSelector |
| { |
| public abstract int numberOfModifications(long lts); |
| |
| public abstract int opsPerModification(long lts); |
| |
| public abstract int maxPartitionSize(); |
| |
| public abstract boolean isCdVisitedBy(long pd, long lts, long cd); |
| |
| // clustering descriptor is calculated using operation id and not modification id, since |
| // value descriptors are calculated using modification ids. |
| public long cd(long pd, long lts, long opId, SchemaSpec schema) |
| { |
| return schema.adjustCdEntropy(cd(pd, lts, opId)); |
| } |
| |
| /** |
| * Currently, we do not allow visiting the same row more than once per lts, which means that: |
| * <p> |
| * * `max(opId)` returned `cds` have to be unique for any `lts/pd` pair |
| * * {@code max(opId) < maxPartitionSize} |
| */ |
| @VisibleForTesting |
| protected abstract long cd(long pd, long lts, long opId); |
| |
| public long randomCd(long pd, long entropy, SchemaSpec schema) |
| { |
| return schema.adjustCdEntropy(randomCd(pd, entropy)); |
| } |
| |
| public abstract long randomCd(long pd, long entropy); |
| |
| @VisibleForTesting |
| protected abstract long vd(long pd, long cd, long lts, long opId, int col); |
| |
| public long[] vds(long pd, long cd, long lts, long opId, OperationKind opType, SchemaSpec schema) |
| { |
| BitSet setColumns = columnMask(pd, lts, opId, opType); |
| return descriptors(pd, cd, lts, opId, schema.regularColumns, schema.regularColumnsMask(), setColumns, schema.regularColumnsOffset); |
| } |
| |
| public long[] sds(long pd, long cd, long lts, long opId, OperationKind opType, SchemaSpec schema) |
| { |
| BitSet setColumns = columnMask(pd, lts, opId, opType); |
| return descriptors(pd, cd, lts, opId, schema.staticColumns, schema.staticColumnsMask(), setColumns, schema.staticColumnsOffset); |
| } |
| |
| public long[] descriptors(long pd, long cd, long lts, long opId, List<ColumnSpec<?>> columns, BitSet mask, BitSet setColumns, int offset) |
| { |
| assert opId < opsPerModification(lts) * numberOfModifications(lts) : String.format("Operation id %d exceeds the maximum expected number of operations %d (%d * %d)", |
| opId, opsPerModification(lts) * numberOfModifications(lts), opsPerModification(lts), numberOfModifications(lts)); |
| long[] descriptors = new long[columns.size()]; |
| |
| for (int i = 0; i < descriptors.length; i++) |
| { |
| int col = offset + i; |
| if (setColumns.isSet(col, mask)) |
| { |
| ColumnSpec<?> spec = columns.get(i); |
| long vd = vd(pd, cd, lts, opId, col) & Bytes.bytePatternFor(spec.type.maxSize()); |
| assert vd != UNSET_DESCR : "Ambiguous unset descriptor generated for the value"; |
| assert vd != NIL_DESCR : "Ambiguous nil descriptor generated for the value"; |
| |
| descriptors[i] = vd; |
| } |
| else |
| { |
| descriptors[i] = UNSET_DESCR; |
| } |
| } |
| return descriptors; |
| } |
| |
| public abstract OperationKind operationType(long pd, long lts, long opId); |
| |
| public abstract BitSet columnMask(long pd, long lts, long opId, OperationKind opType); |
| |
| // TODO: why is this one unused? |
| public abstract long rowId(long pd, long lts, long cd); |
| |
| public abstract long modificationId(long pd, long cd, long lts, long vd, int col); |
| } |
| |
| public static class PCGFast implements OpSelectors.Rng |
| { |
| private final long seed; |
| |
| public PCGFast(long seed) |
| { |
| this.seed = seed; |
| } |
| |
| public long randomNumber(long i, long stream) |
| { |
| return PCGFastPure.shuffle(PCGFastPure.advanceState(seed, i, stream)); |
| } |
| |
| public long sequenceNumber(long r, long stream) |
| { |
| return PCGFastPure.distance(seed, PCGFastPure.unshuffle(r), stream); |
| } |
| |
| public long next(long r, long stream) |
| { |
| return PCGFastPure.next(r, stream); |
| } |
| |
| public long prev(long r, long stream) |
| { |
| return PCGFastPure.previous(r, stream); |
| } |
| } |
| |
| /** |
| * Generates partition descriptors, based on LTS as if we had a sliding window. |
| * <p> |
| * Each {@code windowSize * switchAfter} steps, we move the window by one, effectively |
| * expiring one partition descriptor, and adding one partition descriptor to the window. |
| * <p> |
| * For any LTS, we can calculate previous and next LTS on which it will visit the same |
| * partition |
| */ |
| public static class DefaultPdSelector extends OpSelectors.PdSelector |
| { |
| public final static long PARTITION_DESCRIPTOR_STREAM_ID = 0x706b; |
| |
| private final OpSelectors.Rng rng; |
| private final long slideAfterRepeats; |
| private final long switchAfter; |
| private final long windowSize; |
| |
| private final long positionOffset; |
| private final long positionWindowSize; |
| |
| public DefaultPdSelector(OpSelectors.Rng rng, long windowSize, long slideAfterRepeats) |
| { |
| this(rng, windowSize, slideAfterRepeats, 0L, Long.MAX_VALUE); |
| } |
| |
| public DefaultPdSelector(OpSelectors.Rng rng, long windowSize, long slideAfterRepeats, long positionOffset, long positionWindowSize) |
| { |
| this.rng = rng; |
| this.slideAfterRepeats = slideAfterRepeats; |
| this.windowSize = windowSize; |
| this.switchAfter = windowSize * slideAfterRepeats; |
| this.positionOffset = positionOffset; |
| this.positionWindowSize = positionWindowSize; |
| } |
| |
| protected long pd(long lts) |
| { |
| return rng.randomNumber(adjustPosition(positionFor(lts)), PARTITION_DESCRIPTOR_STREAM_ID); |
| } |
| |
| public long minLtsAt(long position) |
| { |
| if (position < windowSize) |
| return position; |
| |
| long windowStart = (position - (windowSize - 1)) * slideAfterRepeats * windowSize; |
| return windowStart + windowSize - 1; |
| } |
| |
| public long minLtsFor(long pd) |
| { |
| long position = positionForPd(pd); |
| return minLtsAt(position); |
| } |
| |
| public long randomVisitedPd(long maxLts, long visitLts, SchemaSpec schema) |
| { |
| long maxPosition = maxPosition(maxLts); |
| if (maxPosition == 0) |
| return schema.adjustPdEntropy(rng.randomNumber(adjustPosition(0), PARTITION_DESCRIPTOR_STREAM_ID)); |
| |
| int idx = RngUtils.asInt(rng.randomNumber(visitLts, maxPosition), 0, (int) (maxPosition - 1)); |
| return schema.adjustPdEntropy(rng.randomNumber(adjustPosition(idx), PARTITION_DESCRIPTOR_STREAM_ID)); |
| } |
| |
| public long maxPosition(long maxLts) |
| { |
| long timesCycled = maxLts / switchAfter; |
| long windowStart = timesCycled * windowSize; |
| |
| // We have cycled through _current_ window at least once |
| if (maxLts > (windowStart * slideAfterRepeats) + windowSize) |
| return windowStart + windowSize; |
| return windowStart + maxLts % windowSize; |
| } |
| |
| protected long positionFor(long lts) |
| { |
| long windowStart = lts / switchAfter; |
| return windowStart + lts % windowSize; |
| } |
| |
| /** |
| * We only adjust position right before we go to the PCG RNG to grab a partition id, since we want |
| * the fact that PCG is actually offset to be hidden from the user and even the rest of this class. |
| */ |
| private long adjustPosition(long position) |
| { |
| if (position > positionWindowSize) |
| throw new IllegalStateException(String.format("Partition position has wrapped around, so can not be safely used. " + |
| "This runner has been given %d partitions, and if we wrap back to " + |
| "position 0, partition state is not going to be inflatable, since" + |
| "nextLts will not jump to the lts that is about to be visited. " + |
| "Increase rows per visit, batch size, or slideAfter repeats.", |
| positionWindowSize)); |
| return positionOffset + position; |
| } |
| |
| /** |
| * When computing position from pd or lts, we need to translate the offset back to the original, since |
| * it is not aware (and should not be) of the fact that positions are offset. |
| */ |
| private long unadjustPosition(long position) |
| { |
| return position - positionOffset; |
| } |
| |
| public long positionForPd(long pd) |
| { |
| return unadjustPosition(rng.sequenceNumber(pd, PARTITION_DESCRIPTOR_STREAM_ID)); |
| } |
| |
| public long nextLts(long lts) |
| { |
| long slideCount = lts / switchAfter; |
| long positionInCycle = lts - slideCount * switchAfter; |
| long nextRepeat = positionInCycle / windowSize + 1; |
| |
| if (nextRepeat > slideAfterRepeats || |
| (nextRepeat == slideAfterRepeats && (positionInCycle % windowSize) == 0)) |
| return -1; |
| |
| // last cycle before window slides; next window will have shifted by one |
| if (nextRepeat == slideAfterRepeats) |
| positionInCycle -= 1; |
| |
| return slideCount * switchAfter + windowSize + positionInCycle; |
| } |
| |
| public long prevLts(long lts) |
| { |
| long slideCount = lts / switchAfter; |
| long positionInCycle = lts - slideCount * switchAfter; |
| long prevRepeat = positionInCycle / windowSize - 1; |
| |
| if (lts < windowSize || |
| prevRepeat < -1 || |
| (prevRepeat == -1 && (positionInCycle % windowSize) == (windowSize - 1))) |
| return -1; |
| |
| // last cycle before window slides; next window will have shifted by one |
| if (prevRepeat == -1) |
| positionInCycle += 1; |
| |
| return slideCount * switchAfter - windowSize + positionInCycle; |
| } |
| |
| public long maxLtsFor(long pd) |
| { |
| long position = positionForPd(pd); |
| return position * switchAfter + (slideAfterRepeats - 1) * windowSize; |
| } |
| |
| public String toString() |
| { |
| return "DefaultPdSelector{" + |
| "slideAfterRepeats=" + slideAfterRepeats + |
| ", windowSize=" + windowSize + |
| '}'; |
| } |
| } |
| |
| public static ColumnSelectorBuilder columnSelectorBuilder() |
| { |
| return new ColumnSelectorBuilder(); |
| } |
| |
| // TODO: add weights/probabilities to this |
| // TODO: this looks like a hierarchical surjection |
| |
| public static class ColumnSelectorBuilder |
| { |
| private Map<OperationKind, Surjections.Surjection<BitSet>> m; |
| |
| public ColumnSelectorBuilder() |
| { |
| this.m = new EnumMap<>(OperationKind.class); |
| } |
| |
| public ColumnSelectorBuilder forAll(SchemaSpec schema) |
| { |
| return forAll(schema, BitSet.surjection(schema.allColumns.size())); |
| } |
| |
| // TODO: change bitsets to take into account _all_ columns not only regulars |
| public ColumnSelectorBuilder forAll(SchemaSpec schema, Surjections.Surjection<BitSet> orig) |
| { |
| for (OperationKind type : OperationKind.values()) |
| { |
| Surjections.Surjection<BitSet> gen = orig; |
| |
| switch (type) |
| { |
| case UPDATE_WITH_STATICS: |
| case DELETE_COLUMN_WITH_STATICS: |
| gen = (descriptor) -> { |
| long counter = 0; |
| while (counter <= 100) |
| { |
| BitSet bitSet = orig.inflate(descriptor); |
| if ((schema.regularColumns.isEmpty() || !bitSet.allUnset(schema.regularColumnsMask)) |
| && (schema.staticColumns.isEmpty() || !bitSet.allUnset(schema.staticColumnsMask))) |
| return bitSet; |
| |
| descriptor = RngUtils.next(descriptor); |
| counter++; |
| } |
| throw new RuntimeException(String.format("Could not generate a value after %d attempts.", counter)); |
| }; |
| break; |
| // Can not have an UPDATE statement without anything to update |
| case UPDATE: |
| gen = descriptor -> { |
| long counter = 0; |
| while (counter <= 100) |
| { |
| BitSet bitSet = orig.inflate(descriptor); |
| |
| if (!bitSet.allUnset(schema.regularColumnsMask)) |
| return bitSet; |
| |
| descriptor = RngUtils.next(descriptor); |
| counter++; |
| } |
| throw new RuntimeException(String.format("Could not generate a value after %d attempts.", counter)); |
| }; |
| break; |
| case DELETE_COLUMN: |
| gen = (descriptor) -> { |
| long counter = 0; |
| while (counter <= 100) |
| { |
| BitSet bitSet = orig.inflate(descriptor); |
| BitSet mask = schema.regularColumnsMask; |
| |
| if (!bitSet.allUnset(mask)) |
| return bitSet; |
| |
| descriptor = RngUtils.next(descriptor); |
| counter++; |
| } |
| throw new RuntimeException(String.format("Could not generate a value after %d attempts.", counter)); |
| }; |
| break; |
| } |
| this.m.put(type, gen); |
| } |
| return this; |
| } |
| |
| public ColumnSelectorBuilder forWrite(Surjections.Surjection<BitSet> gen) |
| { |
| m.put(OperationKind.INSERT, gen); |
| return this; |
| } |
| |
| public ColumnSelectorBuilder forWrite(BitSet pickFrom) |
| { |
| return forWrite(Surjections.pick(pickFrom)); |
| } |
| |
| public ColumnSelectorBuilder forDelete(Surjections.Surjection<BitSet> gen) |
| { |
| m.put(OperationKind.DELETE_ROW, gen); |
| return this; |
| } |
| |
| public ColumnSelectorBuilder forDelete(BitSet pickFrom) |
| { |
| return forDelete(Surjections.pick(pickFrom)); |
| } |
| |
| public ColumnSelectorBuilder forColumnDelete(Surjections.Surjection<BitSet> gen) |
| { |
| m.put(OperationKind.DELETE_COLUMN, gen); |
| return this; |
| } |
| |
| public ColumnSelectorBuilder forColumnDelete(BitSet pickFrom) |
| { |
| return forColumnDelete(Surjections.pick(pickFrom)); |
| } |
| |
| public ColumnSelector build() |
| { |
| return (kind, descriptor) -> m.get(kind).inflate(descriptor); |
| } |
| } |
| |
| |
| /** |
| * ColumnSelector has to return BitSet specifying _all_ columns |
| */ |
| public static interface ColumnSelector |
| { |
| public BitSet columnMask(OperationKind operationKind, long descriptor); |
| } |
| |
| // TODO: this can actually be further improved upon. Maybe not generation-wise, this part seems to be ok, |
| // but in the way it is hooked up with the rest of the system |
| public static class HierarchicalDescriptorSelector extends DefaultDescriptorSelector |
| { |
| private final int[] fractions; |
| |
| public HierarchicalDescriptorSelector(Rng rng, |
| // how many parts (at most) each subsequent "level" should contain |
| int[] fractions, |
| ColumnSelector columnSelector, |
| OperationSelector operationSelector, |
| Distribution modificationsPerLtsDistribution, |
| Distribution rowsPerModificationsDistribution, |
| int maxPartitionSize) |
| { |
| super(rng, |
| columnSelector, |
| operationSelector, |
| modificationsPerLtsDistribution, |
| rowsPerModificationsDistribution, |
| maxPartitionSize); |
| this.fractions = fractions; |
| } |
| |
| @Override |
| public long cd(long pd, long lts, long opId, SchemaSpec schema) |
| { |
| if (schema.clusteringKeys.size() <= 1) |
| return schema.adjustCdEntropy(super.cd(pd, lts, opId)); |
| |
| int partitionSize = maxPartitionSize(); |
| int clusteringOffset = clusteringOffset(lts); |
| long res; |
| if (clusteringOffset == 0) |
| { |
| res = rng.prev(opId, pd); |
| } |
| else |
| { |
| int positionInPartition = (int) ((clusteringOffset + opId) % partitionSize); |
| res = cd(positionInPartition, fractions, schema, rng, pd); |
| } |
| return schema.adjustCdEntropy(res); |
| } |
| |
| @VisibleForTesting |
| public static long cd(int positionInPartition, int[] fractions, SchemaSpec schema, Rng rng, long pd) |
| { |
| long[] slices = new long[schema.clusteringKeys.size()]; |
| for (int i = 0; i < slices.length; i++) |
| { |
| int idx = i < fractions.length ? (positionInPartition % (fractions[i] - 1)) : positionInPartition; |
| slices[i] = rng.prev(idx, rng.next(pd, i + 1)); |
| } |
| |
| return schema.ckGenerator.stitch(slices); |
| } |
| |
| protected long cd(long pd, long lts, long opId) |
| { |
| throw new RuntimeException("Shouldn't be called"); |
| } |
| } |
| |
| // TODO: add a way to limit partition size alltogether; current "number of rows" notion is a bit misleading |
| public static class DefaultDescriptorSelector extends DescriptorSelector |
| { |
| protected final static long ROW_ID_STREAM = 0x726F4772069640AL; |
| protected final static long NUMBER_OF_MODIFICATIONS_STREAM = 0xf490c5272baL; |
| protected final static long ROWS_PER_OPERATION_STREAM = 0x5e03812e293L; |
| protected final static long BITSET_IDX_STREAM = 0x92eb607bef1L; |
| |
| public static OperationSelector DEFAULT_OP_SELECTOR = OperationSelector.weighted(Surjections.weights(45, 45, 3, 2, 2, 1, 1, 1), |
| OperationKind.INSERT, |
| OperationKind.INSERT_WITH_STATICS, |
| OperationKind.DELETE_ROW, |
| OperationKind.DELETE_COLUMN, |
| OperationKind.DELETE_COLUMN_WITH_STATICS, |
| OperationKind.DELETE_PARTITION, |
| OperationKind.DELETE_RANGE, |
| OperationKind.DELETE_SLICE); |
| |
| protected final OpSelectors.Rng rng; |
| protected final OperationSelector operationSelector; |
| protected final ColumnSelector columnSelector; |
| protected final Distribution modificationsPerLtsDistribution; |
| protected final Distribution rowsPerModificationsDistribution; |
| protected final int maxPartitionSize; |
| |
| public DefaultDescriptorSelector(OpSelectors.Rng rng, |
| ColumnSelector columnMaskSelector, |
| OperationSelector operationSelector, |
| Distribution modificationsPerLtsDistribution, |
| Distribution rowsPerModificationsDistribution, |
| int maxPartitionSize) |
| { |
| this.rng = rng; |
| |
| this.operationSelector = operationSelector; |
| this.columnSelector = columnMaskSelector; |
| |
| this.modificationsPerLtsDistribution = modificationsPerLtsDistribution; |
| this.rowsPerModificationsDistribution = rowsPerModificationsDistribution; |
| this.maxPartitionSize = maxPartitionSize; |
| } |
| |
| public int numberOfModifications(long lts) |
| { |
| return (int) modificationsPerLtsDistribution.skew(rng.randomNumber(lts, NUMBER_OF_MODIFICATIONS_STREAM)); |
| } |
| |
| public int opsPerModification(long lts) |
| { |
| return (int) rowsPerModificationsDistribution.skew(rng.randomNumber(lts, ROWS_PER_OPERATION_STREAM)); |
| } |
| |
| // TODO: this is not the best way to calculate a clustering offset; potentially we'd like to use |
| // some sort of expiration mechanism slimilar to PDs. |
| public int maxPartitionSize() |
| { |
| return maxPartitionSize; |
| } |
| |
| protected int clusteringOffset(long lts) |
| { |
| return RngUtils.asInt(lts, 0, maxPartitionSize() - 1); |
| } |
| |
| // TODO: this won't work for entropy-adjusted CDs, at least the way they're implemented now |
| public boolean isCdVisitedBy(long pd, long lts, long cd) |
| { |
| return rowId(pd, lts, cd) < (numberOfModifications(lts) * opsPerModification(lts)); |
| } |
| |
| public long randomCd(long pd, long entropy) |
| { |
| long positionInPartition = Math.abs(rng.prev(entropy)) % maxPartitionSize(); |
| return rng.prev(positionInPartition, pd); |
| } |
| |
| protected long cd(long pd, long lts, long opId) |
| { |
| int partitionSize = maxPartitionSize(); |
| int clusteringOffset = clusteringOffset(lts); |
| if (clusteringOffset == 0) |
| return rng.prev(opId, pd); |
| |
| // TODO: partition size can't be larger than cardinality of the value. |
| // So if we have 10 modifications per lts and 10 rows per modification, |
| // we'll visit the same row twice per lts. |
| int positionInPartition = (int) ((clusteringOffset + opId) % partitionSize); |
| return rng.prev(positionInPartition, pd); |
| } |
| |
| public long rowId(long pd, long lts, long cd) |
| { |
| int partitionSize = maxPartitionSize(); |
| int clusteringOffset = clusteringOffset(lts); |
| int positionInPartition = (int) rng.next(cd, pd); |
| |
| if (clusteringOffset == 0) |
| return positionInPartition; |
| |
| if (positionInPartition == 0) |
| return partitionSize - clusteringOffset; |
| if (positionInPartition == clusteringOffset) |
| return 0; |
| else if (positionInPartition < clusteringOffset) |
| return partitionSize - clusteringOffset + positionInPartition; |
| else |
| return positionInPartition - clusteringOffset; |
| } |
| |
| public OperationKind operationType(long pd, long lts, long opId) |
| { |
| return operationType(pd, lts, opId, partitionLevelOperationsMask(pd, lts)); |
| } |
| |
| // TODO: create this bitset once per lts |
| public BitSet partitionLevelOperationsMask(long pd, long lts) |
| { |
| int totalOps = opsPerModification(lts) * numberOfModifications(lts); |
| if (totalOps > 64) |
| { |
| throw new IllegalArgumentException("RngUtils#randomBits currently supports only up to 64 bits of entropy, so we can not " + |
| "split partition and row level operations for more than 64 operations at the moment." + |
| "Set modifications_per_lts to a number that is lower than 64 and use rows_per_modification" + |
| "to have more operations per LTS instead"); |
| } |
| |
| long seed = rng.randomNumber(pd, lts); |
| |
| int partitionLevelOps = (int) Math.ceil(operationSelector.partitionLevelThreshold * totalOps); |
| long partitionLevelOpsMask = RngUtils.randomBits(partitionLevelOps, totalOps, seed); |
| |
| return BitSet.create(partitionLevelOpsMask, totalOps); |
| } |
| |
| private OperationKind operationType(long pd, long lts, long opId, BitSet partitionLevelOperationsMask) |
| { |
| try |
| { |
| long descriptor = rng.randomNumber(pd ^ lts ^ opId, BITSET_IDX_STREAM); |
| return operationSelector.inflate(descriptor, partitionLevelOperationsMask.isSet((int) opId)); |
| } |
| catch (Throwable t) |
| { |
| throw new RuntimeException(String.format("Can not generate a random number with the following inputs: " + |
| "pd=%d lts=%d opId=%d partitionLevelOperationsMask=%s", |
| pd, lts, opId, partitionLevelOperationsMask)); |
| } |
| } |
| |
| public BitSet columnMask(long pd, long lts, long opId, OperationKind opType) |
| { |
| long descriptor = rng.randomNumber(pd ^ lts ^ opId, BITSET_IDX_STREAM); |
| return columnSelector.columnMask(opType, descriptor); |
| } |
| |
| public long vd(long pd, long cd, long lts, long opId, int col) |
| { |
| return rng.randomNumber(opId + 1, pd ^ cd ^ lts ^ col); |
| } |
| |
| public long modificationId(long pd, long cd, long lts, long vd, int col) |
| { |
| return rng.sequenceNumber(vd, pd ^ cd ^ lts ^ col) - 1; |
| } |
| } |
| |
| public enum OperationKind |
| { |
| UPDATE(false), |
| INSERT(false), |
| UPDATE_WITH_STATICS(true), |
| INSERT_WITH_STATICS(true), |
| DELETE_PARTITION(true), |
| DELETE_ROW(false), |
| DELETE_COLUMN(false), |
| DELETE_COLUMN_WITH_STATICS(true), |
| DELETE_RANGE(false), |
| DELETE_SLICE(false); |
| |
| public final boolean partititonLevel; |
| |
| OperationKind(boolean partitionLevel) |
| { |
| this.partititonLevel = partitionLevel; |
| } |
| } |
| |
| public static class OperationSelector |
| { |
| public final Surjections.Surjection<OperationKind> partitionLevelOperationSelector; |
| public final Surjections.Surjection<OperationKind> rowLevelOperationSelector; |
| // TODO: start using partitionLevelThreshold |
| public final double partitionLevelThreshold; |
| |
| public OperationSelector(Surjections.Surjection<OperationKind> partitionLevelOperationSelector, |
| Surjections.Surjection<OperationKind> rowLevelOperationSelector, |
| double partitionLevelThreshold) |
| { |
| this.partitionLevelOperationSelector = partitionLevelOperationSelector; |
| this.rowLevelOperationSelector = rowLevelOperationSelector; |
| this.partitionLevelThreshold = partitionLevelThreshold; |
| } |
| |
| public OperationKind inflate(long descriptor, boolean partitionLevel) |
| { |
| OperationKind operationKind = partitionLevel ? partitionLevelOperationSelector.inflate(descriptor) : rowLevelOperationSelector.inflate(descriptor); |
| assert operationKind.partititonLevel == partitionLevel : "Generated operation with an incorrect partition level. Check your generators."; |
| return operationKind; |
| } |
| |
| public static OperationSelector weighted(Map<OperationKind, Integer> weightsMap) |
| { |
| int[] weights = new int[weightsMap.size()]; |
| OperationKind[] operationKinds = new OperationKind[weightsMap.size()]; |
| int i = 0; |
| for (Map.Entry<OperationKind, Integer> entry : weightsMap.entrySet()) |
| { |
| weights[i] = entry.getValue(); |
| operationKinds[i] = entry.getKey(); |
| i++; |
| } |
| return weighted(Surjections.weights(weights), operationKinds); |
| } |
| |
| public static OperationSelector weighted(long[] weights, OperationKind... operationKinds) |
| { |
| assert weights.length == operationKinds.length; |
| |
| Map<OperationKind, Integer> partitionLevel = new EnumMap<OperationKind, Integer>(OperationKind.class); |
| Map<OperationKind, Integer> rowLevel = new EnumMap<OperationKind, Integer>(OperationKind.class); |
| |
| int partitionLevelSum = 0; |
| int rowLevelSum = 0; |
| for (int i = 0; i < weights.length; i++) |
| { |
| int v = (int) (weights[i] >> 32); |
| if (operationKinds[i].partititonLevel) |
| { |
| partitionLevel.put(operationKinds[i], v); |
| partitionLevelSum += v; |
| } |
| else |
| { |
| rowLevel.put(operationKinds[i], v); |
| rowLevelSum += v; |
| } |
| } |
| int sum = (partitionLevelSum + rowLevelSum); |
| |
| return new OperationSelector(Surjections.weighted(normalize(partitionLevel)), |
| Surjections.weighted(normalize(rowLevel)), |
| (partitionLevelSum * 1.0) / sum); |
| } |
| |
| public static Map<OperationKind, Integer> normalize(Map<OperationKind, Integer> weights) |
| { |
| Map<OperationKind, Integer> normalized = new EnumMap<OperationKind, Integer>(OperationKind.class); |
| int sum = 0; |
| for (Integer value : weights.values()) |
| sum += value; |
| |
| for (OperationKind kind : weights.keySet()) |
| { |
| double dbl = (sum * ((double) weights.get(kind)) / sum); |
| normalized.put(kind, (int) Math.round(dbl)); |
| } |
| |
| return normalized; |
| } |
| } |
| } |