Trigger exception if we run out of partitions

Patch by Alex Petrov; reviewed by Abe Ratnofsky for CASSANDRA-18315
diff --git a/harry-core/src/harry/core/Configuration.java b/harry-core/src/harry/core/Configuration.java
index 9427c89..e1fb52b 100644
--- a/harry-core/src/harry/core/Configuration.java
+++ b/harry-core/src/harry/core/Configuration.java
@@ -283,7 +283,7 @@
         DataTrackerConfiguration data_tracker = new DefaultDataTrackerConfiguration();
         RunnerConfiguration runner;
         SutConfiguration system_under_test;
-        PDSelectorConfiguration partition_descriptor_selector = new Configuration.DefaultPDSelectorConfiguration(10, 100, 0);
+        PDSelectorConfiguration partition_descriptor_selector = new Configuration.DefaultPDSelectorConfiguration(10, 100);
         CDSelectorConfiguration clustering_descriptor_selector; // TODO: sensible default value
 
         public ConfigurationBuilder setSeed(long seed)
@@ -710,6 +710,7 @@
         public final int window_size;
         public final int slide_after_repeats;
         public final long position_offset;
+        public final long position_window_size;
 
         @Deprecated
         public DefaultPDSelectorConfiguration(int window_size,
@@ -718,21 +719,27 @@
             this.window_size = window_size;
             this.slide_after_repeats = slide_after_repeats;
             this.position_offset = 0L;
+            this.position_window_size = Long.MAX_VALUE;
         }
 
         @JsonCreator
         public DefaultPDSelectorConfiguration(@JsonProperty(value = "window_size", defaultValue = "10") int window_size,
                                               @JsonProperty(value = "slide_after_repeats", defaultValue = "100") int slide_after_repeats,
-                                              @JsonProperty(value = "position_offset", defaultValue = "0") long position_offset)
+                                              @JsonProperty(value = "position_offset") Long position_offset,
+                                              @JsonProperty(value = "position_window_size") Long position_window_size)
         {
             this.window_size = window_size;
             this.slide_after_repeats = slide_after_repeats;
-            this.position_offset = position_offset;
+            this.position_offset = position_offset == null ? 0 : position_offset;
+            if (position_window_size == null)
+                this.position_window_size = Long.MAX_VALUE - this.position_offset;
+            else
+                this.position_window_size = position_window_size;
         }
 
         public OpSelectors.PdSelector make(OpSelectors.Rng rng)
         {
-            return new OpSelectors.DefaultPdSelector(rng, window_size, slide_after_repeats, position_offset);
+            return new OpSelectors.DefaultPdSelector(rng, window_size, slide_after_repeats, position_offset, position_window_size);
         }
     }
 
diff --git a/harry-core/src/harry/model/OpSelectors.java b/harry-core/src/harry/model/OpSelectors.java
index cbbdeca..c15a172 100644
--- a/harry-core/src/harry/model/OpSelectors.java
+++ b/harry-core/src/harry/model/OpSelectors.java
@@ -119,9 +119,6 @@
         public abstract long minLtsAt(long position);
 
         public abstract long minLtsFor(long pd);
-
-        // TODO: right now, we can only calculate a position for 64-bit (in other words, full entropy) pds
-        public abstract long positionFor(long lts);
         public abstract long maxPosition(long maxLts);
     }
 
@@ -274,24 +271,26 @@
         private final long windowSize;
 
         private final long positionOffset;
+        private final long positionWindowSize;
 
         public DefaultPdSelector(OpSelectors.Rng rng, long windowSize, long slideAfterRepeats)
         {
-            this(rng, windowSize, slideAfterRepeats, 0L);
+            this(rng, windowSize, slideAfterRepeats, 0L, Long.MAX_VALUE);
         }
 
-        public DefaultPdSelector(OpSelectors.Rng rng, long windowSize, long slideAfterRepeats, long positionOffset)
+        public DefaultPdSelector(OpSelectors.Rng rng, long windowSize, long slideAfterRepeats, long positionOffset, long positionWindowSize)
         {
             this.rng = rng;
             this.slideAfterRepeats = slideAfterRepeats;
             this.windowSize = windowSize;
             this.switchAfter = windowSize * slideAfterRepeats;
             this.positionOffset = positionOffset;
+            this.positionWindowSize = positionWindowSize;
         }
 
         protected long pd(long lts)
         {
-            return rng.randomNumber(positionOffset + positionFor(lts), PARTITION_DESCRIPTOR_STREAM_ID);
+            return rng.randomNumber(adjustPosition(positionFor(lts)), PARTITION_DESCRIPTOR_STREAM_ID);
         }
 
         public long minLtsAt(long position)
@@ -313,10 +312,10 @@
         {
             long maxPosition = maxPosition(maxLts);
             if (maxPosition == 0)
-                return schema.adjustPdEntropy(rng.randomNumber(positionOffset, PARTITION_DESCRIPTOR_STREAM_ID));
+                return schema.adjustPdEntropy(rng.randomNumber(adjustPosition(0), PARTITION_DESCRIPTOR_STREAM_ID));
 
             int idx = RngUtils.asInt(rng.randomNumber(visitLts, maxPosition), 0, (int) (maxPosition - 1));
-            return schema.adjustPdEntropy(rng.randomNumber(positionOffset + idx, PARTITION_DESCRIPTOR_STREAM_ID));
+            return schema.adjustPdEntropy(rng.randomNumber(adjustPosition(idx), PARTITION_DESCRIPTOR_STREAM_ID));
         }
 
         public long maxPosition(long maxLts)
@@ -330,16 +329,40 @@
             return windowStart + maxLts % windowSize;
         }
 
-        // TODO: add maxPosition to make it easier/more accessible for the components like sampler, etc
-        public long positionFor(long lts)
+        protected long positionFor(long lts)
         {
             long windowStart = lts / switchAfter;
             return windowStart + lts % windowSize;
         }
 
+        /**
+         * We only adjust position right before we go to the PCG RNG to grab a partition id, since we want
+         * the fact that PCG is actually offset to be hidden from the user and even the rest of this class.
+         */
+        private long adjustPosition(long position)
+        {
+            if (position > positionWindowSize)
+                throw new IllegalStateException(String.format("Partition position has wrapped around, so can not be safely used. " +
+                                                              "This runner has been given %d partitions, and if we wrap back to " +
+                                                              "position 0, partition state is not going to be inflatable, since" +
+                                                              "nextLts will not jump to the lts that is about to be visited. " +
+                                                              "Increase rows per visit, batch size, or slideAfter repeats.",
+                                                              positionWindowSize));
+            return positionOffset + position;
+        }
+
+        /**
+         * When computing position from pd or lts, we need to translate the offset back to the original, since
+         * it is not aware (and should not be) of the fact that positions are offset.
+         */
+        private long unadjustPosition(long position)
+        {
+            return position - positionOffset;
+        }
+
         public long positionForPd(long pd)
         {
-            return rng.sequenceNumber(pd, PARTITION_DESCRIPTOR_STREAM_ID) - positionOffset;
+            return unadjustPosition(rng.sequenceNumber(pd, PARTITION_DESCRIPTOR_STREAM_ID));
         }
 
         public long nextLts(long lts)
@@ -379,7 +402,7 @@
 
         public long maxLtsFor(long pd)
         {
-            long position = rng.sequenceNumber(pd, PARTITION_DESCRIPTOR_STREAM_ID) - positionOffset;
+            long position = positionForPd(pd);
             return position * switchAfter + (slideAfterRepeats - 1) * windowSize;
         }
 
diff --git a/harry-core/src/harry/visitors/CorruptingVisitor.java b/harry-core/src/harry/visitors/CorruptingVisitor.java
index e332947..5fb24de 100644
--- a/harry-core/src/harry/visitors/CorruptingVisitor.java
+++ b/harry-core/src/harry/visitors/CorruptingVisitor.java
@@ -63,12 +63,9 @@
         };
     }
 
-    private final AtomicLong maxPos = new AtomicLong(-1);
-
     public void visit()
     {
         long lts = run.clock.peek();
-        maxPos.updateAndGet(current -> Math.max(run.pdSelector.positionFor(lts), current));
 
         if (lts > triggerAfter)
             return;
@@ -77,7 +74,7 @@
         Random random = new Random(1);
 
         QueryResponseCorruptor corruptor = corruptors[random.nextInt(corruptors.length)];
-        long maxPos = this.maxPos.get();
+        long maxPos = run.pdSelector.maxPosition(run.tracker.maxStarted());
         long pd = run.pdSelector.pd(random.nextInt((int) maxPos), run.schemaSpec);
         try
         {
diff --git a/harry-core/src/harry/visitors/ParallelRecentValidator.java b/harry-core/src/harry/visitors/ParallelRecentValidator.java
index 415f10b..d3d0711 100644
--- a/harry-core/src/harry/visitors/ParallelRecentValidator.java
+++ b/harry-core/src/harry/visitors/ParallelRecentValidator.java
@@ -32,8 +32,10 @@
 import harry.core.Run;
 import harry.generators.Surjections;
 import harry.model.Model;
+import harry.model.OpSelectors;
 import harry.operations.Query;
 import harry.operations.QueryGenerator;
+import harry.runner.DataTracker;
 
 public class ParallelRecentValidator extends ParallelValidator<ParallelRecentValidator.State>
 {
@@ -44,6 +46,8 @@
     private final QueryGenerator.TypedQueryGenerator querySelector;
     private final Model model;
     private final QueryLogger queryLogger;
+    private final OpSelectors.PdSelector pdSelector;
+    private final DataTracker tracker;
 
     public ParallelRecentValidator(int partitionCount, int concurrency, int queries,
                                    Run run,
@@ -51,6 +55,8 @@
                                    QueryLogger queryLogger)
     {
         super(concurrency, run);
+        this.pdSelector = run.pdSelector;
+        this.tracker = run.tracker;
         this.partitionCount = partitionCount;
         this.queries = Math.max(queries, 1);
         this.querySelector = new QueryGenerator.TypedQueryGenerator(run.rng,
@@ -85,7 +91,7 @@
 
     protected State initialState()
     {
-        return new State(run.pdSelector.maxPosition(run.tracker.maxStarted()));
+        return new State(pdSelector.maxPosition(tracker.maxStarted()));
     }
 
     public class State extends ParallelValidator.State
diff --git a/harry-core/src/harry/visitors/Sampler.java b/harry-core/src/harry/visitors/Sampler.java
index 274bf5b..7688ac9 100644
--- a/harry-core/src/harry/visitors/Sampler.java
+++ b/harry-core/src/harry/visitors/Sampler.java
@@ -50,8 +50,8 @@
         this.sut = run.sut;
         this.pdSelector = run.pdSelector;
         this.clock = run.clock;
-        this.schema = run.schemaSpec;
         this.tracker = run.tracker;
+        this.schema = run.schemaSpec;
         this.samplePartitions = samplePartitions;
     }
 
diff --git a/harry-core/test/harry/model/OpSelectorsTest.java b/harry-core/test/harry/model/OpSelectorsTest.java
index e2ebd82..a5aebea 100644
--- a/harry-core/test/harry/model/OpSelectorsTest.java
+++ b/harry-core/test/harry/model/OpSelectorsTest.java
@@ -105,66 +105,69 @@
         OpSelectors.Rng rng = new OpSelectors.PCGFast(1);
         int cycles = 10000;
 
-        for (int repeats = 2; repeats <= 1000; repeats++)
+        for (long[] positions : new long[][]{ { 0, Long.MAX_VALUE }, { 100, Long.MAX_VALUE }, { 1000, Long.MAX_VALUE } })
         {
-            for (int windowSize = 2; windowSize <= 10; windowSize++)
+            for (int repeats = 2; repeats <= 1000; repeats++)
             {
-                OpSelectors.DefaultPdSelector pdSupplier = new OpSelectors.DefaultPdSelector(rng, windowSize, repeats);
-                long[] pds = new long[cycles];
-                for (int i = 0; i < cycles; i++)
+                for (int windowSize = 2; windowSize <= 10; windowSize++)
                 {
-                    long pd = pdSupplier.pd(i);
-                    pds[i] = pd;
-                    Assert.assertEquals(pdSupplier.positionFor(i), pdSupplier.positionForPd(pd));
-                }
+                    OpSelectors.DefaultPdSelector pdSupplier = new OpSelectors.DefaultPdSelector(rng, windowSize, repeats, positions[0], positions[1]);
+                    long[] pds = new long[cycles];
+                    for (int i = 0; i < cycles; i++)
+                    {
+                        long pd = pdSupplier.pd(i);
+                        pds[i] = pd;
+                        Assert.assertEquals(pdSupplier.positionFor(i), pdSupplier.positionForPd(pd));
+                    }
 
-                Set<Long> noNext = new HashSet<>();
-                for (int i = 0; i < cycles; i++)
-                {
-                    long nextLts = pdSupplier.nextLts(i);
-                    Assert.assertFalse(noNext.contains(pds[i]));
-                    if (nextLts == -1)
+                    Set<Long> noNext = new HashSet<>();
+                    for (int i = 0; i < cycles; i++)
                     {
-                        noNext.add(nextLts);
+                        long nextLts = pdSupplier.nextLts(i);
+                        Assert.assertFalse(noNext.contains(pds[i]));
+                        if (nextLts == -1)
+                        {
+                            noNext.add(nextLts);
+                        }
+                        else if (nextLts < cycles)
+                        {
+                            Assert.assertEquals(pds[(int) nextLts], pdSupplier.pd(i));
+                        }
                     }
-                    else if (nextLts < cycles)
-                    {
-                        Assert.assertEquals(pds[(int) nextLts], pdSupplier.pd(i));
-                    }
-                }
 
-                Set<Long> noPrev = new HashSet<>();
-                for (int i = cycles - 1; i >= 0; i--)
-                {
-                    long prevLts = pdSupplier.prevLts(i);
-                    Assert.assertFalse(noPrev.contains(pds[i]));
-                    if (prevLts == -1)
+                    Set<Long> noPrev = new HashSet<>();
+                    for (int i = cycles - 1; i >= 0; i--)
                     {
-                        noPrev.add(prevLts);
+                        long prevLts = pdSupplier.prevLts(i);
+                        Assert.assertFalse(noPrev.contains(pds[i]));
+                        if (prevLts == -1)
+                        {
+                            noPrev.add(prevLts);
+                        }
+                        else if (prevLts >= 0)
+                        {
+                            Assert.assertEquals(pds[(int) prevLts], pdSupplier.pd(i));
+                        }
                     }
-                    else if (prevLts >= 0)
-                    {
-                        Assert.assertEquals(pds[(int) prevLts], pdSupplier.pd(i));
-                    }
-                }
 
-                Set<Long> seen = new HashSet<>();
-                for (int i = 0; i < cycles; i++)
-                {
-                    long pd = pdSupplier.pd(i);
-                    if (!seen.contains(pd))
+                    Set<Long> seen = new HashSet<>();
+                    for (int i = 0; i < cycles; i++)
                     {
-                        Assert.assertEquals(i, pdSupplier.minLtsAt(pdSupplier.positionFor(i)));
-                        seen.add(pd);
+                        long pd = pdSupplier.pd(i);
+                        if (!seen.contains(pd))
+                        {
+                            Assert.assertEquals(i, pdSupplier.minLtsAt(pdSupplier.positionFor(i)));
+                            seen.add(pd);
+                        }
                     }
-                }
 
-                for (int i = 0; i < cycles; i++)
-                {
-                    long pd = pdSupplier.pd(i);
-                    long maxLts = pdSupplier.maxLtsFor(pd);
-                    Assert.assertEquals(-1, pdSupplier.nextLts(maxLts));
-                    Assert.assertEquals(pdSupplier.pd(i), pdSupplier.pd(maxLts));
+                    for (int i = 0; i < cycles; i++)
+                    {
+                        long pd = pdSupplier.pd(i);
+                        long maxLts = pdSupplier.maxLtsFor(pd);
+                        Assert.assertEquals(-1, pdSupplier.nextLts(maxLts));
+                        Assert.assertEquals(pdSupplier.pd(i), pdSupplier.pd(maxLts));
+                    }
                 }
             }
         }
diff --git a/harry-integration/test/harry/model/IntegrationTestBase.java b/harry-integration/test/harry/model/IntegrationTestBase.java
index e76ca1a..9eb8a7e 100644
--- a/harry-integration/test/harry/model/IntegrationTestBase.java
+++ b/harry-integration/test/harry/model/IntegrationTestBase.java
@@ -100,7 +100,7 @@
                                                        .setDropSchema(true)
                                                        .setSchemaProvider((seed1, sut) -> schema)
                                                        .setClusteringDescriptorSelector(sharedCDSelectorConfiguration().build())
-                                                       .setPartitionDescriptorSelector(new Configuration.DefaultPDSelectorConfiguration(2, 200, 0))
+                                                       .setPartitionDescriptorSelector(new Configuration.DefaultPDSelectorConfiguration(2, 200))
                                                        .setSUT(() -> sut);
     }
 }
\ No newline at end of file