Trigger exception if we run out of partitions
Patch by Alex Petrov; reviewed by Abe Ratnofsky for CASSANDRA-18315
diff --git a/harry-core/src/harry/core/Configuration.java b/harry-core/src/harry/core/Configuration.java
index 9427c89..e1fb52b 100644
--- a/harry-core/src/harry/core/Configuration.java
+++ b/harry-core/src/harry/core/Configuration.java
@@ -283,7 +283,7 @@
DataTrackerConfiguration data_tracker = new DefaultDataTrackerConfiguration();
RunnerConfiguration runner;
SutConfiguration system_under_test;
- PDSelectorConfiguration partition_descriptor_selector = new Configuration.DefaultPDSelectorConfiguration(10, 100, 0);
+ PDSelectorConfiguration partition_descriptor_selector = new Configuration.DefaultPDSelectorConfiguration(10, 100);
CDSelectorConfiguration clustering_descriptor_selector; // TODO: sensible default value
public ConfigurationBuilder setSeed(long seed)
@@ -710,6 +710,7 @@
public final int window_size;
public final int slide_after_repeats;
public final long position_offset;
+ public final long position_window_size;
@Deprecated
public DefaultPDSelectorConfiguration(int window_size,
@@ -718,21 +719,27 @@
this.window_size = window_size;
this.slide_after_repeats = slide_after_repeats;
this.position_offset = 0L;
+ this.position_window_size = Long.MAX_VALUE;
}
@JsonCreator
public DefaultPDSelectorConfiguration(@JsonProperty(value = "window_size", defaultValue = "10") int window_size,
@JsonProperty(value = "slide_after_repeats", defaultValue = "100") int slide_after_repeats,
- @JsonProperty(value = "position_offset", defaultValue = "0") long position_offset)
+ @JsonProperty(value = "position_offset") Long position_offset,
+ @JsonProperty(value = "position_window_size") Long position_window_size)
{
this.window_size = window_size;
this.slide_after_repeats = slide_after_repeats;
- this.position_offset = position_offset;
+ this.position_offset = position_offset == null ? 0 : position_offset;
+ if (position_window_size == null)
+ this.position_window_size = Long.MAX_VALUE - this.position_offset;
+ else
+ this.position_window_size = position_window_size;
}
public OpSelectors.PdSelector make(OpSelectors.Rng rng)
{
- return new OpSelectors.DefaultPdSelector(rng, window_size, slide_after_repeats, position_offset);
+ return new OpSelectors.DefaultPdSelector(rng, window_size, slide_after_repeats, position_offset, position_window_size);
}
}
diff --git a/harry-core/src/harry/model/OpSelectors.java b/harry-core/src/harry/model/OpSelectors.java
index cbbdeca..c15a172 100644
--- a/harry-core/src/harry/model/OpSelectors.java
+++ b/harry-core/src/harry/model/OpSelectors.java
@@ -119,9 +119,6 @@
public abstract long minLtsAt(long position);
public abstract long minLtsFor(long pd);
-
- // TODO: right now, we can only calculate a position for 64-bit (in other words, full entropy) pds
- public abstract long positionFor(long lts);
public abstract long maxPosition(long maxLts);
}
@@ -274,24 +271,26 @@
private final long windowSize;
private final long positionOffset;
+ private final long positionWindowSize;
public DefaultPdSelector(OpSelectors.Rng rng, long windowSize, long slideAfterRepeats)
{
- this(rng, windowSize, slideAfterRepeats, 0L);
+ this(rng, windowSize, slideAfterRepeats, 0L, Long.MAX_VALUE);
}
- public DefaultPdSelector(OpSelectors.Rng rng, long windowSize, long slideAfterRepeats, long positionOffset)
+ public DefaultPdSelector(OpSelectors.Rng rng, long windowSize, long slideAfterRepeats, long positionOffset, long positionWindowSize)
{
this.rng = rng;
this.slideAfterRepeats = slideAfterRepeats;
this.windowSize = windowSize;
this.switchAfter = windowSize * slideAfterRepeats;
this.positionOffset = positionOffset;
+ this.positionWindowSize = positionWindowSize;
}
protected long pd(long lts)
{
- return rng.randomNumber(positionOffset + positionFor(lts), PARTITION_DESCRIPTOR_STREAM_ID);
+ return rng.randomNumber(adjustPosition(positionFor(lts)), PARTITION_DESCRIPTOR_STREAM_ID);
}
public long minLtsAt(long position)
@@ -313,10 +312,10 @@
{
long maxPosition = maxPosition(maxLts);
if (maxPosition == 0)
- return schema.adjustPdEntropy(rng.randomNumber(positionOffset, PARTITION_DESCRIPTOR_STREAM_ID));
+ return schema.adjustPdEntropy(rng.randomNumber(adjustPosition(0), PARTITION_DESCRIPTOR_STREAM_ID));
int idx = RngUtils.asInt(rng.randomNumber(visitLts, maxPosition), 0, (int) (maxPosition - 1));
- return schema.adjustPdEntropy(rng.randomNumber(positionOffset + idx, PARTITION_DESCRIPTOR_STREAM_ID));
+ return schema.adjustPdEntropy(rng.randomNumber(adjustPosition(idx), PARTITION_DESCRIPTOR_STREAM_ID));
}
public long maxPosition(long maxLts)
@@ -330,16 +329,40 @@
return windowStart + maxLts % windowSize;
}
- // TODO: add maxPosition to make it easier/more accessible for the components like sampler, etc
- public long positionFor(long lts)
+ protected long positionFor(long lts)
{
long windowStart = lts / switchAfter;
return windowStart + lts % windowSize;
}
+ /**
+ * We only adjust position right before we go to the PCG RNG to grab a partition id, since we want
+ * the fact that PCG is actually offset to be hidden from the user and even the rest of this class.
+ */
+ private long adjustPosition(long position)
+ {
+ if (position > positionWindowSize)
+ throw new IllegalStateException(String.format("Partition position has wrapped around, so can not be safely used. " +
+ "This runner has been given %d partitions, and if we wrap back to " +
+ "position 0, partition state is not going to be inflatable, since" +
+ "nextLts will not jump to the lts that is about to be visited. " +
+ "Increase rows per visit, batch size, or slideAfter repeats.",
+ positionWindowSize));
+ return positionOffset + position;
+ }
+
+ /**
+ * When computing position from pd or lts, we need to translate the offset back to the original, since
+ * it is not aware (and should not be) of the fact that positions are offset.
+ */
+ private long unadjustPosition(long position)
+ {
+ return position - positionOffset;
+ }
+
public long positionForPd(long pd)
{
- return rng.sequenceNumber(pd, PARTITION_DESCRIPTOR_STREAM_ID) - positionOffset;
+ return unadjustPosition(rng.sequenceNumber(pd, PARTITION_DESCRIPTOR_STREAM_ID));
}
public long nextLts(long lts)
@@ -379,7 +402,7 @@
public long maxLtsFor(long pd)
{
- long position = rng.sequenceNumber(pd, PARTITION_DESCRIPTOR_STREAM_ID) - positionOffset;
+ long position = positionForPd(pd);
return position * switchAfter + (slideAfterRepeats - 1) * windowSize;
}
diff --git a/harry-core/src/harry/visitors/CorruptingVisitor.java b/harry-core/src/harry/visitors/CorruptingVisitor.java
index e332947..5fb24de 100644
--- a/harry-core/src/harry/visitors/CorruptingVisitor.java
+++ b/harry-core/src/harry/visitors/CorruptingVisitor.java
@@ -63,12 +63,9 @@
};
}
- private final AtomicLong maxPos = new AtomicLong(-1);
-
public void visit()
{
long lts = run.clock.peek();
- maxPos.updateAndGet(current -> Math.max(run.pdSelector.positionFor(lts), current));
if (lts > triggerAfter)
return;
@@ -77,7 +74,7 @@
Random random = new Random(1);
QueryResponseCorruptor corruptor = corruptors[random.nextInt(corruptors.length)];
- long maxPos = this.maxPos.get();
+ long maxPos = run.pdSelector.maxPosition(run.tracker.maxStarted());
long pd = run.pdSelector.pd(random.nextInt((int) maxPos), run.schemaSpec);
try
{
diff --git a/harry-core/src/harry/visitors/ParallelRecentValidator.java b/harry-core/src/harry/visitors/ParallelRecentValidator.java
index 415f10b..d3d0711 100644
--- a/harry-core/src/harry/visitors/ParallelRecentValidator.java
+++ b/harry-core/src/harry/visitors/ParallelRecentValidator.java
@@ -32,8 +32,10 @@
import harry.core.Run;
import harry.generators.Surjections;
import harry.model.Model;
+import harry.model.OpSelectors;
import harry.operations.Query;
import harry.operations.QueryGenerator;
+import harry.runner.DataTracker;
public class ParallelRecentValidator extends ParallelValidator<ParallelRecentValidator.State>
{
@@ -44,6 +46,8 @@
private final QueryGenerator.TypedQueryGenerator querySelector;
private final Model model;
private final QueryLogger queryLogger;
+ private final OpSelectors.PdSelector pdSelector;
+ private final DataTracker tracker;
public ParallelRecentValidator(int partitionCount, int concurrency, int queries,
Run run,
@@ -51,6 +55,8 @@
QueryLogger queryLogger)
{
super(concurrency, run);
+ this.pdSelector = run.pdSelector;
+ this.tracker = run.tracker;
this.partitionCount = partitionCount;
this.queries = Math.max(queries, 1);
this.querySelector = new QueryGenerator.TypedQueryGenerator(run.rng,
@@ -85,7 +91,7 @@
protected State initialState()
{
- return new State(run.pdSelector.maxPosition(run.tracker.maxStarted()));
+ return new State(pdSelector.maxPosition(tracker.maxStarted()));
}
public class State extends ParallelValidator.State
diff --git a/harry-core/src/harry/visitors/Sampler.java b/harry-core/src/harry/visitors/Sampler.java
index 274bf5b..7688ac9 100644
--- a/harry-core/src/harry/visitors/Sampler.java
+++ b/harry-core/src/harry/visitors/Sampler.java
@@ -50,8 +50,8 @@
this.sut = run.sut;
this.pdSelector = run.pdSelector;
this.clock = run.clock;
- this.schema = run.schemaSpec;
this.tracker = run.tracker;
+ this.schema = run.schemaSpec;
this.samplePartitions = samplePartitions;
}
diff --git a/harry-core/test/harry/model/OpSelectorsTest.java b/harry-core/test/harry/model/OpSelectorsTest.java
index e2ebd82..a5aebea 100644
--- a/harry-core/test/harry/model/OpSelectorsTest.java
+++ b/harry-core/test/harry/model/OpSelectorsTest.java
@@ -105,66 +105,69 @@
OpSelectors.Rng rng = new OpSelectors.PCGFast(1);
int cycles = 10000;
- for (int repeats = 2; repeats <= 1000; repeats++)
+ for (long[] positions : new long[][]{ { 0, Long.MAX_VALUE }, { 100, Long.MAX_VALUE }, { 1000, Long.MAX_VALUE } })
{
- for (int windowSize = 2; windowSize <= 10; windowSize++)
+ for (int repeats = 2; repeats <= 1000; repeats++)
{
- OpSelectors.DefaultPdSelector pdSupplier = new OpSelectors.DefaultPdSelector(rng, windowSize, repeats);
- long[] pds = new long[cycles];
- for (int i = 0; i < cycles; i++)
+ for (int windowSize = 2; windowSize <= 10; windowSize++)
{
- long pd = pdSupplier.pd(i);
- pds[i] = pd;
- Assert.assertEquals(pdSupplier.positionFor(i), pdSupplier.positionForPd(pd));
- }
+ OpSelectors.DefaultPdSelector pdSupplier = new OpSelectors.DefaultPdSelector(rng, windowSize, repeats, positions[0], positions[1]);
+ long[] pds = new long[cycles];
+ for (int i = 0; i < cycles; i++)
+ {
+ long pd = pdSupplier.pd(i);
+ pds[i] = pd;
+ Assert.assertEquals(pdSupplier.positionFor(i), pdSupplier.positionForPd(pd));
+ }
- Set<Long> noNext = new HashSet<>();
- for (int i = 0; i < cycles; i++)
- {
- long nextLts = pdSupplier.nextLts(i);
- Assert.assertFalse(noNext.contains(pds[i]));
- if (nextLts == -1)
+ Set<Long> noNext = new HashSet<>();
+ for (int i = 0; i < cycles; i++)
{
- noNext.add(nextLts);
+ long nextLts = pdSupplier.nextLts(i);
+ Assert.assertFalse(noNext.contains(pds[i]));
+ if (nextLts == -1)
+ {
+ noNext.add(nextLts);
+ }
+ else if (nextLts < cycles)
+ {
+ Assert.assertEquals(pds[(int) nextLts], pdSupplier.pd(i));
+ }
}
- else if (nextLts < cycles)
- {
- Assert.assertEquals(pds[(int) nextLts], pdSupplier.pd(i));
- }
- }
- Set<Long> noPrev = new HashSet<>();
- for (int i = cycles - 1; i >= 0; i--)
- {
- long prevLts = pdSupplier.prevLts(i);
- Assert.assertFalse(noPrev.contains(pds[i]));
- if (prevLts == -1)
+ Set<Long> noPrev = new HashSet<>();
+ for (int i = cycles - 1; i >= 0; i--)
{
- noPrev.add(prevLts);
+ long prevLts = pdSupplier.prevLts(i);
+ Assert.assertFalse(noPrev.contains(pds[i]));
+ if (prevLts == -1)
+ {
+ noPrev.add(prevLts);
+ }
+ else if (prevLts >= 0)
+ {
+ Assert.assertEquals(pds[(int) prevLts], pdSupplier.pd(i));
+ }
}
- else if (prevLts >= 0)
- {
- Assert.assertEquals(pds[(int) prevLts], pdSupplier.pd(i));
- }
- }
- Set<Long> seen = new HashSet<>();
- for (int i = 0; i < cycles; i++)
- {
- long pd = pdSupplier.pd(i);
- if (!seen.contains(pd))
+ Set<Long> seen = new HashSet<>();
+ for (int i = 0; i < cycles; i++)
{
- Assert.assertEquals(i, pdSupplier.minLtsAt(pdSupplier.positionFor(i)));
- seen.add(pd);
+ long pd = pdSupplier.pd(i);
+ if (!seen.contains(pd))
+ {
+ Assert.assertEquals(i, pdSupplier.minLtsAt(pdSupplier.positionFor(i)));
+ seen.add(pd);
+ }
}
- }
- for (int i = 0; i < cycles; i++)
- {
- long pd = pdSupplier.pd(i);
- long maxLts = pdSupplier.maxLtsFor(pd);
- Assert.assertEquals(-1, pdSupplier.nextLts(maxLts));
- Assert.assertEquals(pdSupplier.pd(i), pdSupplier.pd(maxLts));
+ for (int i = 0; i < cycles; i++)
+ {
+ long pd = pdSupplier.pd(i);
+ long maxLts = pdSupplier.maxLtsFor(pd);
+ Assert.assertEquals(-1, pdSupplier.nextLts(maxLts));
+ Assert.assertEquals(pdSupplier.pd(i), pdSupplier.pd(maxLts));
+ }
}
}
}
diff --git a/harry-integration/test/harry/model/IntegrationTestBase.java b/harry-integration/test/harry/model/IntegrationTestBase.java
index e76ca1a..9eb8a7e 100644
--- a/harry-integration/test/harry/model/IntegrationTestBase.java
+++ b/harry-integration/test/harry/model/IntegrationTestBase.java
@@ -100,7 +100,7 @@
.setDropSchema(true)
.setSchemaProvider((seed1, sut) -> schema)
.setClusteringDescriptorSelector(sharedCDSelectorConfiguration().build())
- .setPartitionDescriptorSelector(new Configuration.DefaultPDSelectorConfiguration(2, 200, 0))
+ .setPartitionDescriptorSelector(new Configuration.DefaultPDSelectorConfiguration(2, 200))
.setSUT(() -> sut);
}
}
\ No newline at end of file