Move group-varint encoding/decoding logic to DataOutput/DataInput (#12841)

diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index da20a85..dd3cc38 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -120,6 +120,8 @@
 
 * GITHUB#12552: Make FSTPostingsFormat load FSTs off-heap. (Tony X)
 
+* GITHUB#12841: Move group-varint encoding/decoding logic to DataOutput/DataInput.  (Adrien Grand, Zhang Chao, Uwe Schindler)
+
 Bug Fixes
 ---------------------
 
diff --git a/lucene/benchmark-jmh/src/java/org/apache/lucene/benchmark/jmh/GroupVIntBenchmark.java b/lucene/benchmark-jmh/src/java/org/apache/lucene/benchmark/jmh/GroupVIntBenchmark.java
index 0a1a208..31625fd 100644
--- a/lucene/benchmark-jmh/src/java/org/apache/lucene/benchmark/jmh/GroupVIntBenchmark.java
+++ b/lucene/benchmark-jmh/src/java/org/apache/lucene/benchmark/jmh/GroupVIntBenchmark.java
@@ -21,15 +21,18 @@
 import java.util.Arrays;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
-import org.apache.lucene.codecs.lucene99.GroupVIntReader;
-import org.apache.lucene.codecs.lucene99.GroupVIntWriter;
 import org.apache.lucene.store.ByteArrayDataInput;
 import org.apache.lucene.store.ByteArrayDataOutput;
+import org.apache.lucene.store.ByteBuffersDataInput;
+import org.apache.lucene.store.ByteBuffersDataOutput;
+import org.apache.lucene.store.DataInput;
 import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.IOContext;
 import org.apache.lucene.store.IndexInput;
 import org.apache.lucene.store.IndexOutput;
 import org.apache.lucene.store.MMapDirectory;
+import org.apache.lucene.store.NIOFSDirectory;
+import org.apache.lucene.util.GroupVIntUtil;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
 import org.openjdk.jmh.annotations.Fork;
@@ -86,12 +89,13 @@
   final long[] values = new long[maxSize];
 
   IndexInput byteBufferGVIntIn;
+  IndexInput nioGVIntIn;
   IndexInput byteBufferVIntIn;
+  ByteBuffersDataInput byteBuffersGVIntIn;
 
   ByteArrayDataInput byteArrayVIntIn;
   ByteArrayDataInput byteArrayGVIntIn;
 
-  // @Param({"16", "32", "64", "128", "248"})
   @Param({"64"})
   public int size;
 
@@ -99,8 +103,8 @@
     byte[] gVIntBytes = new byte[Integer.BYTES * maxSize * 2];
     byte[] vIntBytes = new byte[Integer.BYTES * maxSize * 2];
     ByteArrayDataOutput vIntOut = new ByteArrayDataOutput(vIntBytes);
-    GroupVIntWriter w = new GroupVIntWriter();
-    w.writeValues(new ByteArrayDataOutput(gVIntBytes), docs, docs.length);
+    ByteArrayDataOutput out = new ByteArrayDataOutput(gVIntBytes);
+    out.writeGroupVInts(docs, docs.length);
     for (long v : docs) {
       vIntOut.writeVInt((int) v);
     }
@@ -108,13 +112,26 @@
     byteArrayGVIntIn = new ByteArrayDataInput(gVIntBytes);
   }
 
+  void initNioInput(long[] docs) throws Exception {
+    Directory dir = new NIOFSDirectory(Files.createTempDirectory("groupvintdata"));
+    IndexOutput out = dir.createOutput("gvint", IOContext.DEFAULT);
+    out.writeGroupVInts(docs, docs.length);
+    out.close();
+    nioGVIntIn = dir.openInput("gvint", IOContext.DEFAULT);
+  }
+
+  void initByteBuffersInput(long[] docs) throws Exception {
+    ByteBuffersDataOutput buffer = new ByteBuffersDataOutput();
+    buffer.writeGroupVInts(docs, docs.length);
+    byteBuffersGVIntIn = buffer.toDataInput();
+  }
+
   void initByteBufferInput(long[] docs) throws Exception {
-    Directory dir = MMapDirectory.open(Files.createTempDirectory("groupvintdata"));
+    Directory dir = new MMapDirectory(Files.createTempDirectory("groupvintdata"));
     IndexOutput vintOut = dir.createOutput("vint", IOContext.DEFAULT);
     IndexOutput gvintOut = dir.createOutput("gvint", IOContext.DEFAULT);
 
-    GroupVIntWriter w = new GroupVIntWriter();
-    w.writeValues(gvintOut, docs, docs.length);
+    gvintOut.writeGroupVInts(docs, docs.length);
     for (long v : docs) {
       vintOut.writeVInt((int) v);
     }
@@ -124,6 +141,16 @@
     byteBufferVIntIn = dir.openInput("vint", IOContext.DEFAULT);
   }
 
+  private void readGroupVIntsBaseline(DataInput in, long[] dst, int limit) throws IOException {
+    int i;
+    for (i = 0; i <= limit - 4; i += 4) {
+      GroupVIntUtil.readGroupVInt(in, dst, i);
+    }
+    for (; i < limit; ++i) {
+      dst[i] = in.readVInt();
+    }
+  }
+
   @Setup(Level.Trial)
   public void init() throws Exception {
     long[] docs = new long[maxSize];
@@ -140,10 +167,12 @@
     }
     initByteBufferInput(docs);
     initArrayInput(docs);
+    initNioInput(docs);
+    initByteBuffersInput(docs);
   }
 
   @Benchmark
-  public void byteBufferReadVInt(Blackhole bh) throws IOException {
+  public void benchMMapDirectoryInputs_readVInt(Blackhole bh) throws IOException {
     byteBufferVIntIn.seek(0);
     for (int i = 0; i < size; i++) {
       values[i] = byteBufferVIntIn.readVInt();
@@ -152,14 +181,21 @@
   }
 
   @Benchmark
-  public void byteBufferReadGroupVInt(Blackhole bh) throws IOException {
+  public void benchMMapDirectoryInputs_readGroupVInt(Blackhole bh) throws IOException {
     byteBufferGVIntIn.seek(0);
-    GroupVIntReader.readValues(byteBufferGVIntIn, values, size);
+    byteBufferGVIntIn.readGroupVInts(values, size);
     bh.consume(values);
   }
 
   @Benchmark
-  public void byteArrayReadVInt(Blackhole bh) {
+  public void benchMMapDirectoryInputs_readGroupVIntBaseline(Blackhole bh) throws IOException {
+    byteBufferGVIntIn.seek(0);
+    this.readGroupVIntsBaseline(byteBufferGVIntIn, values, size);
+    bh.consume(values);
+  }
+
+  @Benchmark
+  public void benchByteArrayDataInput_readVInt(Blackhole bh) {
     byteArrayVIntIn.rewind();
     for (int i = 0; i < size; i++) {
       values[i] = byteArrayVIntIn.readVInt();
@@ -168,9 +204,37 @@
   }
 
   @Benchmark
-  public void byteArrayReadGroupVInt(Blackhole bh) throws IOException {
+  public void benchByteArrayDataInput_readGroupVInt(Blackhole bh) throws IOException {
     byteArrayGVIntIn.rewind();
-    GroupVIntReader.readValues(byteArrayGVIntIn, values, size);
+    byteArrayGVIntIn.readGroupVInts(values, size);
+    bh.consume(values);
+  }
+
+  @Benchmark
+  public void benchNIOFSDirectoryInputs_readGroupVInt(Blackhole bh) throws IOException {
+    nioGVIntIn.seek(0);
+    nioGVIntIn.readGroupVInts(values, size);
+    bh.consume(values);
+  }
+
+  @Benchmark
+  public void benchNIOFSDirectoryInputs_readGroupVIntBaseline(Blackhole bh) throws IOException {
+    nioGVIntIn.seek(0);
+    this.readGroupVIntsBaseline(nioGVIntIn, values, size);
+    bh.consume(values);
+  }
+
+  @Benchmark
+  public void benchByteBuffersIndexInput_readGroupVInt(Blackhole bh) throws IOException {
+    byteBuffersGVIntIn.seek(0);
+    byteBuffersGVIntIn.readGroupVInts(values, size);
+    bh.consume(values);
+  }
+
+  @Benchmark
+  public void benchByteBuffersIndexInput_readGroupVIntBaseline(Blackhole bh) throws IOException {
+    byteBuffersGVIntIn.seek(0);
+    this.readGroupVIntsBaseline(byteBuffersGVIntIn, values, size);
     bh.consume(values);
   }
 }
diff --git a/lucene/core/src/java/org/apache/lucene/codecs/lucene99/GroupVIntReader.java b/lucene/core/src/java/org/apache/lucene/codecs/lucene99/GroupVIntReader.java
deleted file mode 100644
index 5fbd206..0000000
--- a/lucene/core/src/java/org/apache/lucene/codecs/lucene99/GroupVIntReader.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.lucene.codecs.lucene99;
-
-import java.io.IOException;
-import org.apache.lucene.store.DataInput;
-
-/** Decode integers using group-varint. */
-public class GroupVIntReader {
-
-  public static void readValues(DataInput in, long[] docs, int limit) throws IOException {
-    int i;
-    for (i = 0; i <= limit - 4; i += 4) {
-      final int flag = in.readByte() & 0xFF;
-
-      final int n1Minus1 = flag >> 6;
-      final int n2Minus1 = (flag >> 4) & 0x03;
-      final int n3Minus1 = (flag >> 2) & 0x03;
-      final int n4Minus1 = flag & 0x03;
-
-      docs[i] = readLong(in, n1Minus1);
-      docs[i + 1] = readLong(in, n2Minus1);
-      docs[i + 2] = readLong(in, n3Minus1);
-      docs[i + 3] = readLong(in, n4Minus1);
-    }
-    for (; i < limit; ++i) {
-      docs[i] = in.readVInt();
-    }
-  }
-
-  private static long readLong(DataInput in, int numBytesMinus1) throws IOException {
-    switch (numBytesMinus1) {
-      case 0:
-        return in.readByte() & 0xFFL;
-      case 1:
-        return in.readShort() & 0xFFFFL;
-      case 2:
-        return (in.readShort() & 0xFFFFL) | ((in.readByte() & 0xFFL) << 16);
-      default:
-        return in.readInt() & 0xFFFFFFFFL;
-    }
-  }
-}
diff --git a/lucene/core/src/java/org/apache/lucene/codecs/lucene99/GroupVIntWriter.java b/lucene/core/src/java/org/apache/lucene/codecs/lucene99/GroupVIntWriter.java
deleted file mode 100644
index 905cab2..0000000
--- a/lucene/core/src/java/org/apache/lucene/codecs/lucene99/GroupVIntWriter.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.lucene.codecs.lucene99;
-
-import java.io.IOException;
-import org.apache.lucene.store.DataOutput;
-
-/**
- * Encode integers using group-varint. It uses VInt to encode tail values that are not enough for a
- * group
- */
-public class GroupVIntWriter {
-
-  // the maximum size of one group is 4 integers + 1 byte flag.
-  private byte[] bytes = new byte[17];
-  private int byteOffset = 0;
-
-  public GroupVIntWriter() {}
-
-  private int encodeValue(int v) {
-    int lastOff = byteOffset;
-    do {
-      bytes[byteOffset++] = (byte) (v & 0xFF);
-      v >>>= 8;
-    } while (v != 0);
-    return byteOffset - lastOff;
-  }
-
-  public void writeValues(DataOutput out, long[] values, int limit) throws IOException {
-    int off = 0;
-
-    // encode each group
-    while ((limit - off) >= 4) {
-      byte flag = 0;
-      byteOffset = 1;
-      flag |= (encodeValue((int) values[off++]) - 1) << 6;
-      flag |= (encodeValue((int) values[off++]) - 1) << 4;
-      flag |= (encodeValue((int) values[off++]) - 1) << 2;
-      flag |= (encodeValue((int) values[off++]) - 1);
-      bytes[0] = flag;
-      out.writeBytes(bytes, byteOffset);
-    }
-
-    // tail vints
-    for (; off < limit; off++) {
-      out.writeVInt((int) values[off]);
-    }
-  }
-}
diff --git a/lucene/core/src/java/org/apache/lucene/codecs/lucene99/Lucene99PostingsReader.java b/lucene/core/src/java/org/apache/lucene/codecs/lucene99/Lucene99PostingsReader.java
index 599d1f9..80d2491 100644
--- a/lucene/core/src/java/org/apache/lucene/codecs/lucene99/Lucene99PostingsReader.java
+++ b/lucene/core/src/java/org/apache/lucene/codecs/lucene99/Lucene99PostingsReader.java
@@ -149,7 +149,7 @@
       boolean indexHasFreq,
       boolean decodeFreq)
       throws IOException {
-    GroupVIntReader.readValues(docIn, docBuffer, num);
+    docIn.readGroupVInts(docBuffer, num);
     if (indexHasFreq && decodeFreq) {
       for (int i = 0; i < num; ++i) {
         freqBuffer[i] = docBuffer[i] & 0x01;
diff --git a/lucene/core/src/java/org/apache/lucene/codecs/lucene99/Lucene99PostingsWriter.java b/lucene/core/src/java/org/apache/lucene/codecs/lucene99/Lucene99PostingsWriter.java
index 2bd562f..a001bea 100644
--- a/lucene/core/src/java/org/apache/lucene/codecs/lucene99/Lucene99PostingsWriter.java
+++ b/lucene/core/src/java/org/apache/lucene/codecs/lucene99/Lucene99PostingsWriter.java
@@ -92,7 +92,6 @@
   private final PForUtil pforUtil;
   private final ForDeltaUtil forDeltaUtil;
   private final Lucene99SkipWriter skipWriter;
-  private final GroupVIntWriter docGroupVIntWriter;
 
   private boolean fieldHasNorms;
   private NumericDocValues norms;
@@ -173,7 +172,6 @@
     skipWriter =
         new Lucene99SkipWriter(
             MAX_SKIP_LEVELS, BLOCK_SIZE, state.segmentInfo.maxDoc(), docOut, posOut, payOut);
-    docGroupVIntWriter = new GroupVIntWriter();
   }
 
   @Override
@@ -378,7 +376,7 @@
           docDeltaBuffer[i] = (docDeltaBuffer[i] << 1) | (freqBuffer[i] == 1 ? 1 : 0);
         }
       }
-      docGroupVIntWriter.writeValues(docOut, docDeltaBuffer, docBufferUpto);
+      docOut.writeGroupVInts(docDeltaBuffer, docBufferUpto);
       if (writeFreqs) {
         for (int i = 0; i < docBufferUpto; i++) {
           final int freq = (int) freqBuffer[i];
diff --git a/lucene/core/src/java/org/apache/lucene/store/BufferedIndexInput.java b/lucene/core/src/java/org/apache/lucene/store/BufferedIndexInput.java
index 32ab161..33e3a6d 100644
--- a/lucene/core/src/java/org/apache/lucene/store/BufferedIndexInput.java
+++ b/lucene/core/src/java/org/apache/lucene/store/BufferedIndexInput.java
@@ -20,6 +20,7 @@
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
+import org.apache.lucene.util.GroupVIntUtil;
 
 /** Base implementation class for buffered {@link IndexInput}. */
 public abstract class BufferedIndexInput extends IndexInput implements RandomAccessInput {
@@ -150,6 +151,16 @@
   }
 
   @Override
+  protected void readGroupVInt(long[] dst, int offset) throws IOException {
+    final int len =
+        GroupVIntUtil.readGroupVInt(
+            this, buffer.remaining(), p -> buffer.getInt((int) p), buffer.position(), dst, offset);
+    if (len > 0) {
+      buffer.position(buffer.position() + len);
+    }
+  }
+
+  @Override
   public final long readLong() throws IOException {
     if (Long.BYTES <= buffer.remaining()) {
       return buffer.getLong();
diff --git a/lucene/core/src/java/org/apache/lucene/store/ByteBuffersDataInput.java b/lucene/core/src/java/org/apache/lucene/store/ByteBuffersDataInput.java
index 9d0ea74..7812b2c 100644
--- a/lucene/core/src/java/org/apache/lucene/store/ByteBuffersDataInput.java
+++ b/lucene/core/src/java/org/apache/lucene/store/ByteBuffersDataInput.java
@@ -29,6 +29,7 @@
 import java.util.Locale;
 import java.util.stream.Collectors;
 import org.apache.lucene.util.Accountable;
+import org.apache.lucene.util.GroupVIntUtil;
 import org.apache.lucene.util.RamUsageEstimator;
 
 /**
@@ -213,6 +214,25 @@
   }
 
   @Override
+  protected void readGroupVInt(long[] dst, int offset) throws IOException {
+    final ByteBuffer block = blocks[blockIndex(pos)];
+    final int blockOffset = blockOffset(pos);
+    // We MUST save the return value in a local variable instead of using
+    // `pos += readGroupVInt(...)`: Java reads the current value of pos for `pos +=` before
+    // calling the function, but the function itself advances pos via readByte(), so `pos +=`
+    // would add the return value to a stale pos, thereby missing 1 byte.
+    final int len =
+        GroupVIntUtil.readGroupVInt(
+            this,
+            block.limit() - blockOffset,
+            p -> block.getInt((int) p),
+            blockOffset,
+            dst,
+            offset);
+    pos += len;
+  }
+
+  @Override
   public long length() {
     return length;
   }
diff --git a/lucene/core/src/java/org/apache/lucene/store/ByteBuffersIndexInput.java b/lucene/core/src/java/org/apache/lucene/store/ByteBuffersIndexInput.java
index 3a13e03..58101a1 100644
--- a/lucene/core/src/java/org/apache/lucene/store/ByteBuffersIndexInput.java
+++ b/lucene/core/src/java/org/apache/lucene/store/ByteBuffersIndexInput.java
@@ -206,6 +206,12 @@
   }
 
   @Override
+  protected void readGroupVInt(long[] dst, int offset) throws IOException {
+    ensureOpen();
+    in.readGroupVInt(dst, offset);
+  }
+
+  @Override
   public IndexInput clone() {
     ensureOpen();
     ByteBuffersIndexInput cloned =
diff --git a/lucene/core/src/java/org/apache/lucene/store/DataInput.java b/lucene/core/src/java/org/apache/lucene/store/DataInput.java
index 24aae03..781066f 100644
--- a/lucene/core/src/java/org/apache/lucene/store/DataInput.java
+++ b/lucene/core/src/java/org/apache/lucene/store/DataInput.java
@@ -27,6 +27,7 @@
 import java.util.TreeMap;
 import java.util.TreeSet;
 import org.apache.lucene.util.BitUtil;
+import org.apache.lucene.util.GroupVIntUtil;
 
 /**
  * Abstract base class for performing read operations of Lucene's low-level data types.
@@ -99,6 +100,32 @@
   }
 
   /**
+   * Read all the group varints, including the tail vints. We need a long[] because this is what
+   * postings are using; all longs are actually required to be integers.
+   *
+   * @param dst the array to read ints into.
+   * @param limit the number of int values to read.
+   * @lucene.experimental
+   */
+  public final void readGroupVInts(long[] dst, int limit) throws IOException {
+    int i;
+    for (i = 0; i <= limit - 4; i += 4) {
+      readGroupVInt(dst, i);
+    }
+    for (; i < limit; ++i) {
+      dst[i] = readVInt();
+    }
+  }
+
+  /**
+   * Override if you have an efficient implementation. In general this is when the input supports
+   * random access.
+   */
+  protected void readGroupVInt(long[] dst, int offset) throws IOException {
+    GroupVIntUtil.readGroupVInt(this, dst, offset);
+  }
+
+  /**
    * Reads an int stored in variable-length format. Reads between one and five bytes. Smaller values
    * take fewer bytes. Negative numbers are supported, but should be avoided.
    *
diff --git a/lucene/core/src/java/org/apache/lucene/store/DataOutput.java b/lucene/core/src/java/org/apache/lucene/store/DataOutput.java
index be5fbf76b4..047cfa1 100644
--- a/lucene/core/src/java/org/apache/lucene/store/DataOutput.java
+++ b/lucene/core/src/java/org/apache/lucene/store/DataOutput.java
@@ -21,6 +21,7 @@
 import java.util.Set;
 import org.apache.lucene.util.BitUtil;
 import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.BytesRefBuilder;
 
 /**
  * Abstract base class for performing write operations of Lucene's low-level data types.
@@ -29,6 +30,7 @@
  * internal state like file position).
  */
 public abstract class DataOutput {
+  private final BytesRefBuilder groupVIntBytes = new BytesRefBuilder();
 
   /**
    * Writes a single byte.
@@ -322,4 +324,43 @@
       writeString(value);
     }
   }
+
+  /**
+   * Encode integers using group-varint. It uses {@link DataOutput#writeVInt VInt} to encode tail
+   * values that are not enough for a group. We need a long[] because this is what postings are
+   * using; all longs are actually required to be integers.
+   *
+   * @param values the values to write
+   * @param limit the number of values to write.
+   * @lucene.experimental
+   */
+  public void writeGroupVInts(long[] values, int limit) throws IOException {
+    int off = 0;
+
+    // encode each group
+    while ((limit - off) >= 4) {
+      byte flag = 0;
+      groupVIntBytes.setLength(1);
+      flag |= (encodeGroupValue(Math.toIntExact(values[off++])) - 1) << 6;
+      flag |= (encodeGroupValue(Math.toIntExact(values[off++])) - 1) << 4;
+      flag |= (encodeGroupValue(Math.toIntExact(values[off++])) - 1) << 2;
+      flag |= (encodeGroupValue(Math.toIntExact(values[off++])) - 1);
+      groupVIntBytes.setByteAt(0, flag);
+      writeBytes(groupVIntBytes.bytes(), groupVIntBytes.length());
+    }
+
+    // tail vints
+    for (; off < limit; off++) {
+      writeVInt(Math.toIntExact(values[off]));
+    }
+  }
+
+  private int encodeGroupValue(int v) {
+    int lastOff = groupVIntBytes.length();
+    do {
+      groupVIntBytes.append((byte) (v & 0xFF));
+      v >>>= 8;
+    } while (v != 0);
+    return groupVIntBytes.length() - lastOff;
+  }
 }
diff --git a/lucene/core/src/java/org/apache/lucene/util/GroupVIntUtil.java b/lucene/core/src/java/org/apache/lucene/util/GroupVIntUtil.java
new file mode 100644
index 0000000..f98b971
--- /dev/null
+++ b/lucene/core/src/java/org/apache/lucene/util/GroupVIntUtil.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.lucene.util;
+
+import java.io.IOException;
+import org.apache.lucene.store.DataInput;
+
+/**
+ * This class contains utility methods and constants for group varint.
+ *
+ * @lucene.internal
+ */
+public final class GroupVIntUtil {
+  // the maximum length of a single group-varint is 4 integers + 1 byte flag.
+  public static final int MAX_LENGTH_PER_GROUP = 17;
+  private static final int[] MASKS = new int[] {0xFF, 0xFFFF, 0xFFFFFF, 0xFFFFFFFF};
+
+  /**
+   * Default implementation to read a single group. For optimal performance, you should use {@link
+   * DataInput#readGroupVInts(long[], int)} instead.
+   *
+   * @param dst the array to read ints into.
+   * @param offset the offset in the array to start storing ints.
+   */
+  public static void readGroupVInt(DataInput in, long[] dst, int offset) throws IOException {
+    final int flag = in.readByte() & 0xFF;
+
+    final int n1Minus1 = flag >> 6;
+    final int n2Minus1 = (flag >> 4) & 0x03;
+    final int n3Minus1 = (flag >> 2) & 0x03;
+    final int n4Minus1 = flag & 0x03;
+
+    dst[offset] = readLongInGroup(in, n1Minus1);
+    dst[offset + 1] = readLongInGroup(in, n2Minus1);
+    dst[offset + 2] = readLongInGroup(in, n3Minus1);
+    dst[offset + 3] = readLongInGroup(in, n4Minus1);
+  }
+
+  private static long readLongInGroup(DataInput in, int numBytesMinus1) throws IOException {
+    switch (numBytesMinus1) {
+      case 0:
+        return in.readByte() & 0xFFL;
+      case 1:
+        return in.readShort() & 0xFFFFL;
+      case 2:
+        return (in.readShort() & 0xFFFFL) | ((in.readByte() & 0xFFL) << 16);
+      default:
+        return in.readInt() & 0xFFFFFFFFL;
+    }
+  }
+
+  /**
+   * Provides an abstraction for reading int values, so that decoding logic can be reused across
+   * different DataInput implementations.
+   */
+  @FunctionalInterface
+  public static interface IntReader {
+    int read(long v);
+  }
+
+  /**
+   * Faster implementation to read a single group. It reads values directly from the buffer when
+   * enough bytes remain that the reads cannot cross its boundary.
+   *
+   * @param in the input to use to read data.
+   * @param remaining the number of bytes still readable from the current block/segment.
+   * @param reader the function used to read an int at a given position.
+   * @param pos the start position to read from the reader.
+   * @param dst the array to read ints into.
+   * @param offset the offset in the array to start storing ints.
+   * @return the number of bytes read excluding the flag. This indicates the number of positions
+   *     the caller should advance by; it is 0 or a positive number less than {@link
+   *     #MAX_LENGTH_PER_GROUP}
+   */
+  public static int readGroupVInt(
+      DataInput in, long remaining, IntReader reader, long pos, long[] dst, int offset)
+      throws IOException {
+    if (remaining < MAX_LENGTH_PER_GROUP) {
+      readGroupVInt(in, dst, offset);
+      return 0;
+    }
+    final int flag = in.readByte() & 0xFF;
+    final long posStart = ++pos; // exclude the flag bytes, the position has updated via readByte().
+    final int n1Minus1 = flag >> 6;
+    final int n2Minus1 = (flag >> 4) & 0x03;
+    final int n3Minus1 = (flag >> 2) & 0x03;
+    final int n4Minus1 = flag & 0x03;
+
+    // This code path has fewer conditionals and tends to be significantly faster in benchmarks
+    dst[offset] = reader.read(pos) & MASKS[n1Minus1];
+    pos += 1 + n1Minus1;
+    dst[offset + 1] = reader.read(pos) & MASKS[n2Minus1];
+    pos += 1 + n2Minus1;
+    dst[offset + 2] = reader.read(pos) & MASKS[n3Minus1];
+    pos += 1 + n3Minus1;
+    dst[offset + 3] = reader.read(pos) & MASKS[n4Minus1];
+    pos += 1 + n4Minus1;
+    return (int) (pos - posStart);
+  }
+}
diff --git a/lucene/core/src/java19/org/apache/lucene/store/MemorySegmentIndexInput.java b/lucene/core/src/java19/org/apache/lucene/store/MemorySegmentIndexInput.java
index 44ecff5..72c7d69 100644
--- a/lucene/core/src/java19/org/apache/lucene/store/MemorySegmentIndexInput.java
+++ b/lucene/core/src/java19/org/apache/lucene/store/MemorySegmentIndexInput.java
@@ -25,6 +25,7 @@
 import java.util.Arrays;
 import java.util.Objects;
 import org.apache.lucene.util.ArrayUtil;
+import org.apache.lucene.util.GroupVIntUtil;
 
 /**
  * Base IndexInput implementation that uses an array of MemorySegments to represent a file.
@@ -306,6 +307,23 @@
   }
 
   @Override
+  protected void readGroupVInt(long[] dst, int offset) throws IOException {
+    try {
+      final int len =
+          GroupVIntUtil.readGroupVInt(
+              this,
+              curSegment.byteSize() - curPosition,
+              p -> curSegment.get(LAYOUT_LE_INT, p),
+              curPosition,
+              dst,
+              offset);
+      curPosition += len;
+    } catch (NullPointerException | IllegalStateException e) {
+      throw alreadyClosed(e);
+    }
+  }
+
+  @Override
   public void readBytes(long pos, byte[] b, int offset, int len) throws IOException {
     try {
       int si = (int) (pos >> chunkSizePower);
diff --git a/lucene/core/src/java20/org/apache/lucene/store/MemorySegmentIndexInput.java b/lucene/core/src/java20/org/apache/lucene/store/MemorySegmentIndexInput.java
index 0f2fdcc..c8eec75 100644
--- a/lucene/core/src/java20/org/apache/lucene/store/MemorySegmentIndexInput.java
+++ b/lucene/core/src/java20/org/apache/lucene/store/MemorySegmentIndexInput.java
@@ -25,6 +25,7 @@
 import java.util.Arrays;
 import java.util.Objects;
 import org.apache.lucene.util.ArrayUtil;
+import org.apache.lucene.util.GroupVIntUtil;
 
 /**
  * Base IndexInput implementation that uses an array of MemorySegments to represent a file.
@@ -304,6 +305,23 @@
   }
 
   @Override
+  protected void readGroupVInt(long[] dst, int offset) throws IOException {
+    try {
+      final int len =
+          GroupVIntUtil.readGroupVInt(
+              this,
+              curSegment.byteSize() - curPosition,
+              p -> curSegment.get(LAYOUT_LE_INT, p),
+              curPosition,
+              dst,
+              offset);
+      curPosition += len;
+    } catch (NullPointerException | IllegalStateException e) {
+      throw alreadyClosed(e);
+    }
+  }
+
+  @Override
   public void readBytes(long pos, byte[] b, int offset, int len) throws IOException {
     try {
       int si = (int) (pos >> chunkSizePower);
diff --git a/lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInput.java b/lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInput.java
index 0f2fdcc..c8eec75 100644
--- a/lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInput.java
+++ b/lucene/core/src/java21/org/apache/lucene/store/MemorySegmentIndexInput.java
@@ -25,6 +25,7 @@
 import java.util.Arrays;
 import java.util.Objects;
 import org.apache.lucene.util.ArrayUtil;
+import org.apache.lucene.util.GroupVIntUtil;
 
 /**
  * Base IndexInput implementation that uses an array of MemorySegments to represent a file.
@@ -304,6 +305,23 @@
   }
 
   @Override
+  protected void readGroupVInt(long[] dst, int offset) throws IOException {
+    try {
+      final int len =
+          GroupVIntUtil.readGroupVInt(
+              this,
+              curSegment.byteSize() - curPosition,
+              p -> curSegment.get(LAYOUT_LE_INT, p),
+              curPosition,
+              dst,
+              offset);
+      curPosition += len;
+    } catch (NullPointerException | IllegalStateException e) {
+      throw alreadyClosed(e);
+    }
+  }
+
+  @Override
   public void readBytes(long pos, byte[] b, int offset, int len) throws IOException {
     try {
       int si = (int) (pos >> chunkSizePower);
diff --git a/lucene/core/src/test/org/apache/lucene/codecs/lucene99/TestGroupVInt.java b/lucene/core/src/test/org/apache/lucene/codecs/lucene99/TestGroupVInt.java
deleted file mode 100644
index 6c81930..0000000
--- a/lucene/core/src/test/org/apache/lucene/codecs/lucene99/TestGroupVInt.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.lucene.codecs.lucene99;
-
-import com.carrotsearch.randomizedtesting.generators.RandomNumbers;
-import java.io.IOException;
-import org.apache.lucene.store.ByteArrayDataInput;
-import org.apache.lucene.store.ByteArrayDataOutput;
-import org.apache.lucene.tests.util.LuceneTestCase;
-import org.apache.lucene.tests.util.TestUtil;
-import org.apache.lucene.util.ArrayUtil;
-import org.apache.lucene.util.packed.PackedInts;
-
-public class TestGroupVInt extends LuceneTestCase {
-
-  public void testEncodeDecode() throws IOException {
-    long[] values = new long[ForUtil.BLOCK_SIZE];
-    long[] restored = new long[ForUtil.BLOCK_SIZE];
-    final int iterations = atLeast(100);
-
-    final GroupVIntWriter w = new GroupVIntWriter();
-    byte[] encoded = new byte[(int) (Integer.BYTES * ForUtil.BLOCK_SIZE * 1.25)];
-
-    for (int i = 0; i < iterations; i++) {
-      final int bpv = TestUtil.nextInt(random(), 1, 31);
-      final int numValues = TestUtil.nextInt(random(), 1, ForUtil.BLOCK_SIZE);
-
-      // encode
-      for (int j = 0; j < numValues; j++) {
-        values[j] = RandomNumbers.randomIntBetween(random(), 0, (int) PackedInts.maxValue(bpv));
-      }
-      w.writeValues(new ByteArrayDataOutput(encoded), values, numValues);
-
-      // decode
-      GroupVIntReader.readValues(new ByteArrayDataInput(encoded), restored, numValues);
-      assertArrayEquals(
-          ArrayUtil.copyOfSubArray(values, 0, numValues),
-          ArrayUtil.copyOfSubArray(restored, 0, numValues));
-    }
-  }
-}
diff --git a/lucene/test-framework/src/java/org/apache/lucene/tests/store/BaseChunkedDirectoryTestCase.java b/lucene/test-framework/src/java/org/apache/lucene/tests/store/BaseChunkedDirectoryTestCase.java
index a2e2d51..dd956c6 100644
--- a/lucene/test-framework/src/java/org/apache/lucene/tests/store/BaseChunkedDirectoryTestCase.java
+++ b/lucene/test-framework/src/java/org/apache/lucene/tests/store/BaseChunkedDirectoryTestCase.java
@@ -50,10 +50,19 @@
   /** Creates a new directory with the specified max chunk size */
   protected abstract Directory getDirectory(Path path, int maxChunkSize) throws IOException;
 
+  public void testGroupVIntMultiBlocks() throws IOException {
+    final int chunkSize = random().nextInt(64, 512); // small, so the data tends to span chunks
+    try (Directory chunkedDir = getDirectory(createTempDir(), chunkSize)) {
+      doTestGroupVInt(chunkedDir, 10, 1, 31, 1024);
+    }
+  }
+
   public void testCloneClose() throws Exception {
     Directory dir = getDirectory(createTempDir("testCloneClose"));
     IndexOutput io = dir.createOutput("bytes", newIOContext(random()));
+    final long[] values = new long[] {0, 7, 11, 9};
     io.writeVInt(5);
+    io.writeGroupVInts(values, values.length);
     io.close();
     IndexInput one = dir.openInput("bytes", IOContext.DEFAULT);
     IndexInput two = one.clone();
@@ -65,6 +74,11 @@
         () -> {
           two.readVInt();
         });
+    expectThrows(
+        AlreadyClosedException.class,
+        () -> {
+          two.readGroupVInts(values, values.length);
+        });
     assertEquals(5, three.readVInt());
     one.close();
     three.close();
@@ -74,11 +88,13 @@
   public void testCloneSliceClose() throws Exception {
     Directory dir = getDirectory(createTempDir("testCloneSliceClose"));
     IndexOutput io = dir.createOutput("bytes", newIOContext(random()));
+    final long[] values = new long[] {0, 7, 11, 9};
     io.writeInt(1);
     io.writeInt(2);
+    io.writeGroupVInts(values, values.length); // will write 5 bytes
     io.close();
     IndexInput slicer = dir.openInput("bytes", newIOContext(random()));
-    IndexInput one = slicer.slice("first int", 0, 4);
+    IndexInput one = slicer.slice("first int", 0, 4 + 5);
     IndexInput two = slicer.slice("second int", 4, 4);
     one.close();
     expectThrows(
@@ -86,6 +102,11 @@
         () -> {
           one.readInt();
         });
+    expectThrows(
+        AlreadyClosedException.class,
+        () -> {
+          one.readGroupVInts(values, values.length);
+        });
     assertEquals(2, two.readInt());
     // reopen a new slice "another":
     IndexInput another = slicer.slice("first int", 0, 4);
diff --git a/lucene/test-framework/src/java/org/apache/lucene/tests/store/BaseDirectoryTestCase.java b/lucene/test-framework/src/java/org/apache/lucene/tests/store/BaseDirectoryTestCase.java
index b98c22d..24d8db0 100644
--- a/lucene/test-framework/src/java/org/apache/lucene/tests/store/BaseDirectoryTestCase.java
+++ b/lucene/test-framework/src/java/org/apache/lucene/tests/store/BaseDirectoryTestCase.java
@@ -18,6 +18,7 @@
 
 import com.carrotsearch.randomizedtesting.RandomizedTest;
 import com.carrotsearch.randomizedtesting.generators.RandomBytes;
+import com.carrotsearch.randomizedtesting.generators.RandomNumbers;
 import com.carrotsearch.randomizedtesting.generators.RandomPicks;
 import java.io.EOFException;
 import java.io.FileNotFoundException;
@@ -58,6 +59,7 @@
 import org.apache.lucene.tests.util.TestUtil;
 import org.apache.lucene.util.ArrayUtil;
 import org.apache.lucene.util.IOUtils;
+import org.apache.lucene.util.packed.PackedInts;
 import org.junit.Assert;
 
 /** Base class for {@link Directory} implementations. */
@@ -1438,4 +1440,76 @@
       assertArrayEquals(expected, actual);
     }
   }
+
+  /** Round-trips a byte, a short, an int, a group-varint block and a long through a file. */
+  public void testDataTypes() throws IOException {
+    final long[] values = new long[] {43, 12345, 123456, 1234567890};
+    try (Directory dir = getDirectory(createTempDir("testDataTypes"))) {
+      try (IndexOutput out = dir.createOutput("test", IOContext.DEFAULT)) {
+        out.writeByte((byte) 43);
+        out.writeShort((short) 12345);
+        out.writeInt(1234567890);
+        out.writeGroupVInts(values, values.length);
+        out.writeLong(1234567890123456789L);
+      }
+      long[] restored = new long[values.length];
+      try (IndexInput in = dir.openInput("test", IOContext.DEFAULT)) {
+        assertEquals(43, in.readByte());
+        assertEquals(12345, in.readShort());
+        assertEquals(1234567890, in.readInt());
+        in.readGroupVInts(restored, restored.length);
+        assertArrayEquals(values, restored);
+        assertEquals(1234567890123456789L, in.readLong());
+      }
+    }
+  }
+
+  public void testGroupVInt() throws IOException {
+    try (Directory directory = getDirectory(createTempDir("testGroupVInt"))) {
+      // small input: tests the fallback to the default implementation of readGroupVInt
+      doTestGroupVInt(directory, 5, 1, 6, 8);
+
+      // use more iterations to cover all bpv (1 to 31)
+      doTestGroupVInt(directory, atLeast(100), 1, 31, 128);
+
+      // Reads spanning multiple blocks (ByteBuffersDataInput, MMapDirectory) are covered by
+      // BaseChunkedDirectoryTestCase#testGroupVIntMultiBlocks.
+    }
+  }
+
+  /** Checks that group-varint blocks decode to the same values as a parallel vInt stream. */
+  protected void doTestGroupVInt(
+      Directory dir, int iterations, int minBpv, int maxBpv, int maxNumValues) throws IOException {
+    long[] values = new long[maxNumValues];
+    int[] numValuesArray = new int[iterations];
+
+    // encode; try-with-resources closes both outputs even when a write throws
+    try (IndexOutput groupVIntOut = dir.createOutput("group-varint", IOContext.DEFAULT);
+        IndexOutput vIntOut = dir.createOutput("vint", IOContext.DEFAULT)) {
+      for (int iter = 0; iter < iterations; iter++) {
+        final int bpv = TestUtil.nextInt(random(), minBpv, maxBpv);
+        numValuesArray[iter] = TestUtil.nextInt(random(), 1, maxNumValues);
+        for (int j = 0; j < numValuesArray[iter]; j++) {
+          values[j] = RandomNumbers.randomIntBetween(random(), 0, (int) PackedInts.maxValue(bpv));
+          vIntOut.writeVInt((int) values[j]);
+        }
+        groupVIntOut.writeGroupVInts(values, numValuesArray[iter]);
+      }
+    }
+
+    // decode; the inputs are closed even when an assertion fails
+    try (IndexInput groupVIntIn = dir.openInput("group-varint", IOContext.DEFAULT);
+        IndexInput vIntIn = dir.openInput("vint", IOContext.DEFAULT)) {
+      for (int iter = 0; iter < iterations; iter++) {
+        groupVIntIn.readGroupVInts(values, numValuesArray[iter]);
+        for (int j = 0; j < numValuesArray[iter]; j++) {
+          assertEquals(vIntIn.readVInt(), values[j]);
+        }
+      }
+    }
+
+    // remove both files so that this helper can run again on the same directory
+    dir.deleteFile("group-varint");
+    dir.deleteFile("vint");
+  }
 }