improve bitmap vector offset to report contiguous groups (#11039)

* improve bitmap vector offset to report contiguous groups

* benchmark style

* check for contiguous in getOffsets, tests for exceptions
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/compression/BaseColumnarLongsBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/BaseColumnarLongsBenchmark.java
index 9093171..69d4aa1 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/compression/BaseColumnarLongsBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/BaseColumnarLongsBenchmark.java
@@ -21,11 +21,14 @@
 
 import org.apache.druid.collections.bitmap.WrappedImmutableRoaringBitmap;
 import org.apache.druid.java.util.common.RE;
+import org.apache.druid.segment.BitmapOffset;
+import org.apache.druid.segment.SimpleAscendingOffset;
 import org.apache.druid.segment.data.ColumnarLongs;
 import org.apache.druid.segment.data.ColumnarLongsSerializer;
 import org.apache.druid.segment.data.CompressedColumnarLongsSupplier;
 import org.apache.druid.segment.data.CompressionFactory;
 import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.data.Offset;
 import org.apache.druid.segment.vector.BitmapVectorOffset;
 import org.apache.druid.segment.vector.NoFilterVectorOffset;
 import org.apache.druid.segment.vector.VectorOffset;
@@ -34,14 +37,14 @@
 import org.openjdk.jmh.annotations.Param;
 import org.openjdk.jmh.annotations.Scope;
 import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.infra.Blackhole;
 import org.roaringbitmap.buffer.MutableRoaringBitmap;
 
-import javax.annotation.Nullable;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.nio.channels.FileChannel;
-import java.util.BitSet;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -51,6 +54,8 @@
 {
   static final int VECTOR_SIZE = 512;
 
+  Map<String, ColumnarLongs> decoders = new HashMap<>();
+  Map<String, Integer> encodedSize = new HashMap<>();
   /**
    * Name of the long encoding strategy. For longs, this is a composite of both byte level block compression and
    * encoding of values within the block.
@@ -68,42 +73,158 @@
   long minValue;
   long maxValue;
 
-  @Nullable
-  BitSet filter;
-
+  Offset offset;
   VectorOffset vectorOffset;
 
-  void setupFilters(int rows, double filteredRowCountPercentage)
+
+  void scan(Blackhole blackhole)
   {
-    // todo: filter set distributions to simulate different select patterns?
-    //  (because benchmarks don't take long enough already..)
-    filter = null;
+    EncodingSizeProfiler.encodedSize = encodedSize.get(encoding);
+    ColumnarLongs encoder = decoders.get(encoding);
+    while (offset.withinBounds()) {
+      blackhole.consume(encoder.get(offset.getOffset()));
+      offset.increment();
+    }
+    offset.reset();
+    blackhole.consume(offset);
+  }
+
+  void scanVectorized(Blackhole blackhole)
+  {
+    EncodingSizeProfiler.encodedSize = encodedSize.get(encoding);
+    ColumnarLongs columnDecoder = decoders.get(encoding);
+    long[] vector = new long[VECTOR_SIZE];
+    while (!vectorOffset.isDone()) {
+      if (vectorOffset.isContiguous()) {
+        columnDecoder.get(vector, vectorOffset.getStartOffset(), vectorOffset.getCurrentVectorSize());
+      } else {
+        columnDecoder.get(vector, vectorOffset.getOffsets(), vectorOffset.getCurrentVectorSize());
+      }
+      for (int i = 0; i < vectorOffset.getCurrentVectorSize(); i++) {
+        blackhole.consume(vector[i]);
+      }
+      vectorOffset.advance();
+    }
+    blackhole.consume(vector);
+    blackhole.consume(vectorOffset);
+    vectorOffset.reset();
+    columnDecoder.close();
+  }
+
+  void setupFilters(int rows, double filteredRowCountPercentage, String filterDistribution)
+  {
     final int filteredRowCount = (int) Math.floor(rows * filteredRowCountPercentage);
 
+
     if (filteredRowCount < rows) {
-      // setup bitset filter
-      filter = new BitSet();
-      MutableRoaringBitmap bitmap = new MutableRoaringBitmap();
-      for (int i = 0; i < filteredRowCount; i++) {
-        int rowToAccess = rand.nextInt(rows);
-        // Skip already selected rows if any
-        while (filter.get(rowToAccess)) {
-          rowToAccess = rand.nextInt(rows);
-        }
-        filter.set(rowToAccess);
-        bitmap.add(rowToAccess);
+      switch (filterDistribution) {
+        case "random":
+          setupRandomFilter(rows, filteredRowCount);
+          break;
+        case "contiguous-start":
+          offset = new SimpleAscendingOffset(rows);
+          vectorOffset = new NoFilterVectorOffset(VECTOR_SIZE, 0, filteredRowCount);
+          break;
+        case "contiguous-end":
+          offset = new SimpleAscendingOffset(rows);
+          vectorOffset = new NoFilterVectorOffset(VECTOR_SIZE, rows - filteredRowCount, rows);
+          break;
+        case "contiguous-bitmap-start":
+          setupContiguousBitmapFilter(rows, filteredRowCount, 0);
+          break;
+        case "contiguous-bitmap-end":
+          setupContiguousBitmapFilter(rows, filteredRowCount, rows - filteredRowCount);
+          break;
+        case "chunky-1000":
+          setupChunkyFilter(rows, filteredRowCount, 1000);
+          break;
+        case "chunky-10000":
+          setupChunkyFilter(rows, filteredRowCount, 10000);
+          break;
+        default:
+          throw new IllegalArgumentException("unknown filter distribution");
       }
-      vectorOffset = new BitmapVectorOffset(
-          VECTOR_SIZE,
-          new WrappedImmutableRoaringBitmap(bitmap.toImmutableRoaringBitmap()),
-          0,
-          rows
-      );
     } else {
+      offset = new SimpleAscendingOffset(rows);
       vectorOffset = new NoFilterVectorOffset(VECTOR_SIZE, 0, rows);
     }
   }
 
+  private void setupRandomFilter(int rows, int filteredRowCount)
+  {
+    MutableRoaringBitmap bitmap = new MutableRoaringBitmap();
+    for (int i = 0; i < filteredRowCount; i++) {
+      int rowToAccess = rand.nextInt(rows);
+      // Skip already selected rows if any
+      while (bitmap.contains(rowToAccess)) {
+        rowToAccess = rand.nextInt(rows);
+      }
+      bitmap.add(rowToAccess);
+    }
+    offset = BitmapOffset.of(
+        new WrappedImmutableRoaringBitmap(bitmap.toImmutableRoaringBitmap()),
+        false,
+        rows
+    );
+    vectorOffset = new BitmapVectorOffset(
+        VECTOR_SIZE,
+        new WrappedImmutableRoaringBitmap(bitmap.toImmutableRoaringBitmap()),
+        0,
+        rows
+    );
+  }
+
+  private void setupContiguousBitmapFilter(int rows, int filterRowCount, int startOffset)
+  {
+    MutableRoaringBitmap bitmap = new MutableRoaringBitmap();
+    for (int i = startOffset; i < filterRowCount; i++) {
+      bitmap.add(i);
+    }
+    offset = BitmapOffset.of(
+        new WrappedImmutableRoaringBitmap(bitmap.toImmutableRoaringBitmap()),
+        false,
+        rows
+    );
+    vectorOffset = new BitmapVectorOffset(
+        VECTOR_SIZE,
+        new WrappedImmutableRoaringBitmap(bitmap.toImmutableRoaringBitmap()),
+        startOffset,
+        rows
+    );
+  }
+
+  private void setupChunkyFilter(int rows, int filteredRowCount, int chunkSize)
+  {
+    MutableRoaringBitmap bitmap = new MutableRoaringBitmap();
+    for (int count = 0; count < filteredRowCount; ) {
+      int chunkOffset = rand.nextInt(rows - chunkSize);
+      // Skip already selected rows if any
+      while (bitmap.contains(chunkOffset)) {
+        chunkOffset = rand.nextInt(rows - chunkSize);
+      }
+      int numAdded = 0;
+      for (; numAdded < chunkSize && count + numAdded < filteredRowCount; numAdded++) {
+        // break if we run into an existing contiguous section
+        if (bitmap.contains(numAdded)) {
+          break;
+        }
+        bitmap.add(chunkOffset + numAdded);
+      }
+      count += numAdded;
+    }
+    offset = BitmapOffset.of(
+        new WrappedImmutableRoaringBitmap(bitmap.toImmutableRoaringBitmap()),
+        false,
+        rows
+    );
+    vectorOffset = new BitmapVectorOffset(
+        VECTOR_SIZE,
+        new WrappedImmutableRoaringBitmap(bitmap.toImmutableRoaringBitmap()),
+        0,
+        rows
+    );
+  }
+
   static int encodeToFile(long[] vals, String encoding, FileChannel output)throws IOException
   {
     SegmentWriteOutMedium writeOutMedium = new OnHeapMemorySegmentWriteOutMedium();
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/compression/ColumnarLongsSelectRowsFromGeneratorBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/ColumnarLongsSelectRowsFromGeneratorBenchmark.java
index d25ded7..93d707d 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/compression/ColumnarLongsSelectRowsFromGeneratorBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/ColumnarLongsSelectRowsFromGeneratorBenchmark.java
@@ -43,41 +43,47 @@
 import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 @State(Scope.Benchmark)
 @Fork(value = 1)
-@Warmup(iterations = 3)
-@Measurement(iterations = 5)
+@Warmup(iterations = 5)
+@Measurement(iterations = 10)
 public class ColumnarLongsSelectRowsFromGeneratorBenchmark extends BaseColumnarLongsFromGeneratorBenchmark
 {
-  private Map<String, ColumnarLongs> decoders;
-  private Map<String, Integer> encodedSize;
-
   /**
    * Number of rows to read, the test will randomly set positions in a simulated offset of the specified density in
-   * {@link #setupFilters(int, double)}
+   * {@link #setupFilters(int, double, String)}
    */
   @Param({
       "0.1",
       "0.25",
       "0.5",
+      "0.6",
       "0.75",
+      "0.8",
+      "0.9",
       "0.95",
       "1.0"
   })
   private double filteredRowCountPercentage;
 
+  @Param({
+      "random",
+      "contiguous-start",
+      "contiguous-end",
+      "contiguous-bitmap-start",
+      "contiguous-bitmap-end",
+      "chunky-1000",
+      "chunky-10000"
+  })
+  private String filterDistribution;
+
   @Setup
   public void setup() throws IOException
   {
-    decoders = new HashMap<>();
-    encodedSize = new HashMap<>();
-
     setupFromFile(encoding);
-    setupFilters(rows, filteredRowCountPercentage);
+    setupFilters(rows, filteredRowCountPercentage, filterDistribution);
 
     // uncomment this block to run sanity check to ensure all specified encodings produce the same set of results
     //CHECKSTYLE.OFF: Regexp
@@ -117,17 +123,7 @@
   @OutputTimeUnit(TimeUnit.MICROSECONDS)
   public void selectRows(Blackhole blackhole)
   {
-    EncodingSizeProfiler.encodedSize = encodedSize.get(encoding);
-    ColumnarLongs encoder = decoders.get(encoding);
-    if (filter == null) {
-      for (int i = 0; i < rows; i++) {
-        blackhole.consume(encoder.get(i));
-      }
-    } else {
-      for (int i = filter.nextSetBit(0); i >= 0; i = filter.nextSetBit(i + 1)) {
-        blackhole.consume(encoder.get(i));
-      }
-    }
+    scan(blackhole);
   }
 
   @Benchmark
@@ -135,24 +131,7 @@
   @OutputTimeUnit(TimeUnit.MICROSECONDS)
   public void selectRowsVectorized(Blackhole blackhole)
   {
-    EncodingSizeProfiler.encodedSize = encodedSize.get(encoding);
-    ColumnarLongs columnDecoder = decoders.get(encoding);
-    long[] vector = new long[VECTOR_SIZE];
-    while (!vectorOffset.isDone()) {
-      if (vectorOffset.isContiguous()) {
-        columnDecoder.get(vector, vectorOffset.getStartOffset(), vectorOffset.getCurrentVectorSize());
-      } else {
-        columnDecoder.get(vector, vectorOffset.getOffsets(), vectorOffset.getCurrentVectorSize());
-      }
-      for (int i = 0; i < vectorOffset.getCurrentVectorSize(); i++) {
-        blackhole.consume(vector[i]);
-      }
-      vectorOffset.advance();
-    }
-    blackhole.consume(vector);
-    blackhole.consume(vectorOffset);
-    vectorOffset.reset();
-    columnDecoder.close();
+    scanVectorized(blackhole);
   }
 
   public static void main(String[] args) throws RunnerException
diff --git a/benchmarks/src/test/java/org/apache/druid/benchmark/compression/ColumnarLongsSelectRowsFromSegmentBenchmark.java b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/ColumnarLongsSelectRowsFromSegmentBenchmark.java
index 0fadb62..b1d4f97 100644
--- a/benchmarks/src/test/java/org/apache/druid/benchmark/compression/ColumnarLongsSelectRowsFromSegmentBenchmark.java
+++ b/benchmarks/src/test/java/org/apache/druid/benchmark/compression/ColumnarLongsSelectRowsFromSegmentBenchmark.java
@@ -54,21 +54,38 @@
 public class ColumnarLongsSelectRowsFromSegmentBenchmark extends BaseColumnarLongsFromSegmentsBenchmark
 {
   private Map<String, ColumnarLongs> decoders;
-  private Map<String, Integer> encodedSize;
 
   /**
    * Number of rows to read, the test will randomly set positions in a simulated offset of the specified density in
-   * {@link #setupFilters(int, double)}
+   * {@link #setupFilters(int, double, String)}
    */
-  @Param({"0.01", "0.1", "0.33", "0.66", "0.95", "1.0"})
+  @Param({
+      "0.1",
+      "0.25",
+      "0.5",
+      "0.75",
+      "0.9",
+      "1.0"
+  })
   private double filteredRowCountPercentage;
 
+  @Param({
+      "random",
+      "contiguous-start",
+      "contiguous-end",
+      "contiguous-bitmap-start",
+      "contiguous-bitmap-end",
+      "chunky-1000",
+      "chunky-10000"
+  })
+  private String filterDistribution;
+
   @Setup
   public void setup() throws Exception
   {
     decoders = new HashMap<>();
     encodedSize = new HashMap<>();
-    setupFilters(rows, filteredRowCountPercentage);
+    setupFilters(rows, filteredRowCountPercentage, filterDistribution);
 
     setupFromFile(encoding);
 
@@ -111,17 +128,7 @@
   @OutputTimeUnit(TimeUnit.MICROSECONDS)
   public void selectRows(Blackhole blackhole)
   {
-    EncodingSizeProfiler.encodedSize = encodedSize.get(encoding);
-    ColumnarLongs encoder = decoders.get(encoding);
-    if (filter == null) {
-      for (int i = 0; i < rows; i++) {
-        blackhole.consume(encoder.get(i));
-      }
-    } else {
-      for (int i = filter.nextSetBit(0); i >= 0; i = filter.nextSetBit(i + 1)) {
-        blackhole.consume(encoder.get(i));
-      }
-    }
+    scan(blackhole);
   }
 
   @Benchmark
@@ -129,24 +136,7 @@
   @OutputTimeUnit(TimeUnit.MICROSECONDS)
   public void selectRowsVectorized(Blackhole blackhole)
   {
-    EncodingSizeProfiler.encodedSize = encodedSize.get(encoding);
-    ColumnarLongs columnDecoder = decoders.get(encoding);
-    long[] vector = new long[VECTOR_SIZE];
-    while (!vectorOffset.isDone()) {
-      if (vectorOffset.isContiguous()) {
-        columnDecoder.get(vector, vectorOffset.getStartOffset(), vectorOffset.getCurrentVectorSize());
-      } else {
-        columnDecoder.get(vector, vectorOffset.getOffsets(), vectorOffset.getCurrentVectorSize());
-      }
-      for (int i = 0; i < vectorOffset.getCurrentVectorSize(); i++) {
-        blackhole.consume(vector[i]);
-      }
-      vectorOffset.advance();
-    }
-    blackhole.consume(vector);
-    blackhole.consume(vectorOffset);
-    vectorOffset.reset();
-    columnDecoder.close();
+    scanVectorized(blackhole);
   }
 
 
diff --git a/processing/src/main/java/org/apache/druid/segment/vector/BitmapVectorOffset.java b/processing/src/main/java/org/apache/druid/segment/vector/BitmapVectorOffset.java
index 7d26480..e459bb9 100644
--- a/processing/src/main/java/org/apache/druid/segment/vector/BitmapVectorOffset.java
+++ b/processing/src/main/java/org/apache/druid/segment/vector/BitmapVectorOffset.java
@@ -33,6 +33,7 @@
   private BatchIterator iterator;
   private boolean pastEnd;
   private int currentVectorSize;
+  private boolean isContiguous;
 
   public BitmapVectorOffset(
       final int vectorSize,
@@ -60,6 +61,7 @@
   public void advance()
   {
     currentVectorSize = 0;
+    isContiguous = false;
 
     if (pastEnd) {
       return;
@@ -85,6 +87,14 @@
 
       currentVectorSize = to;
     }
+
+    if (currentVectorSize > 1) {
+      final int adjusted = currentVectorSize - 1;
+      // for example:
+      //  [300, 301, 302, 303]: 4 - 1 == 3 == 303 - 300
+      //  [300, 301, 303, 304]: 4 - 1 == 3 != 304 - 300
+      isContiguous = offsets[adjusted] - offsets[0] == adjusted;
+    }
   }
 
   @Override
@@ -96,7 +106,7 @@
   @Override
   public boolean isContiguous()
   {
-    return false;
+    return isContiguous;
   }
 
   @Override
@@ -114,12 +124,18 @@
   @Override
   public int getStartOffset()
   {
+    if (isContiguous) {
+      return offsets[0];
+    }
     throw new UnsupportedOperationException("not contiguous");
   }
 
   @Override
   public int[] getOffsets()
   {
+    if (isContiguous) {
+      throw new UnsupportedOperationException("is contiguous");
+    }
     return offsets;
   }
 
@@ -129,6 +145,7 @@
     iterator = bitmap.batchIterator();
     currentVectorSize = 0;
     pastEnd = false;
+    isContiguous = false;
     advance();
   }
 }
diff --git a/processing/src/test/java/org/apache/druid/segment/vector/BitmapVectorOffsetTest.java b/processing/src/test/java/org/apache/druid/segment/vector/BitmapVectorOffsetTest.java
new file mode 100644
index 0000000..f58eac5
--- /dev/null
+++ b/processing/src/test/java/org/apache/druid/segment/vector/BitmapVectorOffsetTest.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.vector;
+
+import org.apache.druid.collections.bitmap.ImmutableBitmap;
+import org.apache.druid.collections.bitmap.WrappedImmutableRoaringBitmap;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.roaringbitmap.buffer.MutableRoaringBitmap;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+public class BitmapVectorOffsetTest
+{
+  private static final int VECTOR_SIZE = 128;
+  private static final int ROWS = VECTOR_SIZE * VECTOR_SIZE;
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  @Test
+  public void testContiguousGetOffsetsIsExplode()
+  {
+    MutableRoaringBitmap wrapped = new MutableRoaringBitmap();
+    for (int i = 0; i < ROWS; i++) {
+      wrapped.add(i);
+    }
+
+    ImmutableBitmap bitmap = new WrappedImmutableRoaringBitmap(wrapped.toImmutableRoaringBitmap());
+    BitmapVectorOffset offset = new BitmapVectorOffset(VECTOR_SIZE, bitmap, 0, ROWS);
+
+    expectedException.expect(UnsupportedOperationException.class);
+    expectedException.expectMessage("is contiguous");
+    offset.getOffsets();
+  }
+
+  @Test
+  public void testNotContiguousGetStartOffsetIsExplode()
+  {
+    MutableRoaringBitmap wrapped = new MutableRoaringBitmap();
+    for (int i = 0; i < ROWS; i++) {
+      if (i % 2 != 0) {
+        wrapped.add(i);
+      }
+    }
+
+    ImmutableBitmap bitmap = new WrappedImmutableRoaringBitmap(wrapped.toImmutableRoaringBitmap());
+    BitmapVectorOffset offset = new BitmapVectorOffset(VECTOR_SIZE, bitmap, 0, ROWS);
+
+    expectedException.expect(UnsupportedOperationException.class);
+    expectedException.expectMessage("not contiguous");
+    offset.getStartOffset();
+  }
+
+  @Test
+  public void testContiguous()
+  {
+    // every bit is set, start from every offset and ensure all batches are contiguous
+    MutableRoaringBitmap wrapped = new MutableRoaringBitmap();
+    for (int i = 0; i < ROWS; i++) {
+      wrapped.add(i);
+    }
+
+    ImmutableBitmap bitmap = new WrappedImmutableRoaringBitmap(wrapped.toImmutableRoaringBitmap());
+    for (int startOffset = 0; startOffset < ROWS; startOffset++) {
+      BitmapVectorOffset offset = new BitmapVectorOffset(VECTOR_SIZE, bitmap, startOffset, ROWS);
+
+      while (!offset.isDone()) {
+        if (offset.getCurrentVectorSize() > 1) {
+          Assert.assertTrue(offset.isContiguous());
+        }
+        offset.advance();
+      }
+    }
+  }
+
+  @Test
+  public void testNeverContiguous()
+  {
+    MutableRoaringBitmap wrapped = new MutableRoaringBitmap();
+    for (int i = 0; i < ROWS; i++) {
+      if (i % 2 != 0) {
+        wrapped.add(i);
+      }
+    }
+
+    ImmutableBitmap bitmap = new WrappedImmutableRoaringBitmap(wrapped.toImmutableRoaringBitmap());
+    for (int startOffset = 0; startOffset < ROWS; startOffset++) {
+      BitmapVectorOffset offset = new BitmapVectorOffset(VECTOR_SIZE, bitmap, startOffset, ROWS);
+      while (!offset.isDone()) {
+        Assert.assertFalse(offset.isContiguous());
+        offset.advance();
+      }
+    }
+  }
+
+  @Test
+  public void testSometimesContiguous()
+  {
+    // this test is sort of vague
+    // set a lot of the rows so that there will be some contiguous and always at least 1 non-contiguous group
+    // (i imagine this is somewhat dependent on underlying bitmap iterator implementation)
+    MutableRoaringBitmap wrapped = new MutableRoaringBitmap();
+    for (int i = 0; i < ROWS - VECTOR_SIZE + 1; i++) {
+      int set = ThreadLocalRandom.current().nextInt(0, ROWS);
+      while (wrapped.contains(set)) {
+        set = ThreadLocalRandom.current().nextInt(0, ROWS);
+      }
+      wrapped.add(set);
+    }
+
+    ImmutableBitmap bitmap = new WrappedImmutableRoaringBitmap(wrapped.toImmutableRoaringBitmap());
+
+    int contiguousCount = 0;
+    int nonContiguousCount = 0;
+    int noContiguous = 0;
+    int allContiguous = 0;
+    for (int startOffset = 0; startOffset < ROWS; startOffset++) {
+      BitmapVectorOffset offset = new BitmapVectorOffset(VECTOR_SIZE, bitmap, startOffset, ROWS);
+
+      boolean none = true;
+      boolean all = true;
+      while (!offset.isDone()) {
+        if (offset.isContiguous()) {
+          contiguousCount++;
+          none = false;
+        } else {
+          nonContiguousCount++;
+          all = false;
+        }
+        offset.advance();
+      }
+      if (none) {
+        noContiguous++;
+      }
+      if (all) {
+        allContiguous++;
+      }
+    }
+
+    Assert.assertTrue(contiguousCount > 0);
+    Assert.assertTrue(nonContiguousCount > 0);
+    // depending on the distribution of set bits and starting offset, there are some which are never contiguous
+    Assert.assertTrue(noContiguous > 0);
+    Assert.assertEquals(0, allContiguous);
+  }
+}