Parquet: Fix vectorized reads for negative decimals (#1736)

diff --git a/api/src/test/java/org/apache/iceberg/util/RandomUtil.java b/api/src/test/java/org/apache/iceberg/util/RandomUtil.java
index a9254cf..f01db61 100644
--- a/api/src/test/java/org/apache/iceberg/util/RandomUtil.java
+++ b/api/src/test/java/org/apache/iceberg/util/RandomUtil.java
@@ -31,6 +31,10 @@
   private RandomUtil() {
   }
 
+  private static boolean negate(int num) {
+    return num % 2 == 1;
+  }
+
   @SuppressWarnings("RandomModInteger")
   public static Object generatePrimitive(Type.PrimitiveType primitive,
                                          Random random) {
@@ -49,7 +53,7 @@
           case 3:
             return 0;
           default:
-            return random.nextInt();
+            return negate(choice) ? -1 * random.nextInt() : random.nextInt();
         }
 
       case LONG:
@@ -61,7 +65,7 @@
           case 3:
             return 0L;
           default:
-            return random.nextLong();
+            return negate(choice) ? -1L * random.nextLong() : random.nextLong();
         }
 
       case FLOAT:
@@ -83,7 +87,7 @@
           case 8:
             return Float.NaN;
           default:
-            return random.nextFloat();
+            return negate(choice) ? -1.0F * random.nextFloat() : random.nextFloat();
         }
 
       case DOUBLE:
@@ -105,7 +109,7 @@
           case 8:
             return Double.NaN;
           default:
-            return random.nextDouble();
+            return negate(choice) ? -1.0D * random.nextDouble() : random.nextDouble();
         }
 
       case DATE:
@@ -140,7 +144,8 @@
       case DECIMAL:
         Types.DecimalType type = (Types.DecimalType) primitive;
         BigInteger unscaled = randomUnscaled(type.precision(), random);
-        return new BigDecimal(unscaled, type.scale());
+        BigDecimal bigDecimal = new BigDecimal(unscaled, type.scale());
+        return negate(choice) ? bigDecimal.negate() : bigDecimal;
 
       default:
         throw new IllegalArgumentException(
@@ -155,11 +160,11 @@
         return true; // doesn't really matter for booleans since they are not dictionary encoded
       case INTEGER:
       case DATE:
-        return value;
+        return negate(value) ? -1 * value : value;
       case FLOAT:
-        return (float) value;
+        return negate(value) ? -1.0F * (float) value : (float) value;
       case DOUBLE:
-        return (double) value;
+        return negate(value) ? -1.0D * (double) value : (double) value;
       case LONG:
       case TIME:
       case TIMESTAMP:
@@ -177,7 +182,8 @@
       case DECIMAL:
         Types.DecimalType type = (Types.DecimalType) primitive;
         BigInteger unscaled = new BigInteger(String.valueOf(value + 1));
-        return new BigDecimal(unscaled, type.scale());
+        BigDecimal bd = new BigDecimal(unscaled, type.scale());
+        return negate(value) ? bd.negate() : bd;
       default:
         throw new IllegalArgumentException(
             "Cannot generate random value for unknown type: " + primitive);
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
index bb571e0..4eb8091 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java
@@ -92,7 +92,8 @@
 
   private enum ReadType {
     FIXED_LENGTH_DECIMAL,
-    INT_LONG_BACKED_DECIMAL,
+    INT_BACKED_DECIMAL,
+    LONG_BACKED_DECIMAL,
     VARCHAR,
     VARBINARY,
     FIXED_WIDTH_BINARY,
@@ -130,8 +131,11 @@
           case FIXED_LENGTH_DECIMAL:
             vectorizedColumnIterator.nextBatchFixedLengthDecimal(vec, typeWidth, nullabilityHolder);
             break;
-          case INT_LONG_BACKED_DECIMAL:
-            vectorizedColumnIterator.nextBatchIntLongBackedDecimal(vec, typeWidth, nullabilityHolder);
+          case INT_BACKED_DECIMAL:
+            vectorizedColumnIterator.nextBatchIntBackedDecimal(vec, nullabilityHolder);
+            break;
+          case LONG_BACKED_DECIMAL:
+            vectorizedColumnIterator.nextBatchLongBackedDecimal(vec, nullabilityHolder);
             break;
           case VARBINARY:
             vectorizedColumnIterator.nextBatchVarWidthType(vec, nullabilityHolder);
@@ -237,11 +241,11 @@
                 this.typeWidth = primitive.getTypeLength();
                 break;
               case INT64:
-                this.readType = ReadType.INT_LONG_BACKED_DECIMAL;
+                this.readType = ReadType.LONG_BACKED_DECIMAL;
                 this.typeWidth = (int) BigIntVector.TYPE_WIDTH;
                 break;
               case INT32:
-                this.readType = ReadType.INT_LONG_BACKED_DECIMAL;
+                this.readType = ReadType.INT_BACKED_DECIMAL;
                 this.typeWidth = (int) IntVector.TYPE_WIDTH;
                 break;
               default:
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java
index cb9d278..d963c45 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java
@@ -129,16 +129,30 @@
     }
   }
 
-  public void nextBatchIntLongBackedDecimal(
+  public void nextBatchIntBackedDecimal(
       FieldVector fieldVector,
-      int typeWidth,
       NullabilityHolder nullabilityHolder) {
     int rowsReadSoFar = 0;
     while (rowsReadSoFar < batchSize && hasNext()) {
       advance();
       int rowsInThisBatch =
-          vectorizedPageIterator.nextBatchIntLongBackedDecimal(fieldVector, batchSize - rowsReadSoFar,
-              rowsReadSoFar, typeWidth, nullabilityHolder);
+          vectorizedPageIterator.nextBatchIntBackedDecimal(fieldVector, batchSize - rowsReadSoFar,
+              rowsReadSoFar, nullabilityHolder);
+      rowsReadSoFar += rowsInThisBatch;
+      this.triplesRead += rowsInThisBatch;
+      fieldVector.setValueCount(rowsReadSoFar);
+    }
+  }
+
+  public void nextBatchLongBackedDecimal(
+          FieldVector fieldVector,
+          NullabilityHolder nullabilityHolder) {
+    int rowsReadSoFar = 0;
+    while (rowsReadSoFar < batchSize && hasNext()) {
+      advance();
+      int rowsInThisBatch =
+              vectorizedPageIterator.nextBatchLongBackedDecimal(fieldVector, batchSize - rowsReadSoFar,
+                      rowsReadSoFar, nullabilityHolder);
       rowsReadSoFar += rowsInThisBatch;
       this.triplesRead += rowsInThisBatch;
       fieldVector.setValueCount(rowsReadSoFar);
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java
index 52e389e..74d9e15 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedDictionaryEncodedParquetValuesReader.java
@@ -286,8 +286,8 @@
         case RLE:
           for (int i = 0; i < num; i++) {
             byte[] decimalBytes = dict.decodeToBinary(currentValue).getBytesUnsafe();
-            byte[] vectorBytes = new byte[DecimalVector.TYPE_WIDTH];
-            System.arraycopy(decimalBytes, 0, vectorBytes, DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
+            byte[] vectorBytes = new byte[typeWidth];
+            System.arraycopy(decimalBytes, 0, vectorBytes, 0, typeWidth);
             ((DecimalVector) vector).setBigEndian(idx, vectorBytes);
             nullabilityHolder.setNotNull(idx);
             idx++;
@@ -296,8 +296,8 @@
         case PACKED:
           for (int i = 0; i < num; i++) {
             byte[] decimalBytes = dict.decodeToBinary(packedValuesBuffer[packedValuesBufferIdx++]).getBytesUnsafe();
-            byte[] vectorBytes = new byte[DecimalVector.TYPE_WIDTH];
-            System.arraycopy(decimalBytes, 0, vectorBytes, DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
+            byte[] vectorBytes = new byte[typeWidth];
+            System.arraycopy(decimalBytes, 0, vectorBytes, 0, typeWidth);
             ((DecimalVector) vector).setBigEndian(idx, vectorBytes);
             nullabilityHolder.setNotNull(idx);
             idx++;
@@ -343,7 +343,7 @@
     }
   }
 
-  void readBatchOfDictionaryEncodedIntLongBackedDecimals(FieldVector vector, int typeWidth, int startOffset,
+  void readBatchOfDictionaryEncodedIntBackedDecimals(FieldVector vector, int startOffset,
                                                          int numValuesToRead, Dictionary dict,
                                                          NullabilityHolder nullabilityHolder) {
     int left = numValuesToRead;
@@ -358,7 +358,7 @@
           for (int i = 0; i < num; i++) {
             ((DecimalVector) vector).set(
                 idx,
-                typeWidth == Integer.BYTES ? dict.decodeToInt(currentValue) : dict.decodeToLong(currentValue));
+                dict.decodeToInt(currentValue));
             nullabilityHolder.setNotNull(idx);
             idx++;
           }
@@ -366,10 +366,41 @@
         case PACKED:
           for (int i = 0; i < num; i++) {
             ((DecimalVector) vector).set(
-                idx,
-                typeWidth == Integer.BYTES ?
-                    dict.decodeToInt(packedValuesBuffer[packedValuesBufferIdx++])
-                    : dict.decodeToLong(packedValuesBuffer[packedValuesBufferIdx++]));
+                idx, dict.decodeToInt(packedValuesBuffer[packedValuesBufferIdx++]));
+            nullabilityHolder.setNotNull(idx);
+            idx++;
+          }
+          break;
+      }
+      left -= num;
+      currentCount -= num;
+    }
+  }
+
+  void readBatchOfDictionaryEncodedLongBackedDecimals(FieldVector vector, int startOffset,
+                                                     int numValuesToRead, Dictionary dict,
+                                                     NullabilityHolder nullabilityHolder) {
+    int left = numValuesToRead;
+    int idx = startOffset;
+    while (left > 0) {
+      if (this.currentCount == 0) {
+        this.readNextGroup();
+      }
+      int num = Math.min(left, this.currentCount);
+      switch (mode) {
+        case RLE:
+          for (int i = 0; i < num; i++) {
+            ((DecimalVector) vector).set(
+                    idx,
+                    dict.decodeToLong(currentValue));
+            nullabilityHolder.setNotNull(idx);
+            idx++;
+          }
+          break;
+        case PACKED:
+          for (int i = 0; i < num; i++) {
+            ((DecimalVector) vector).set(
+                    idx, dict.decodeToLong(packedValuesBuffer[packedValuesBufferIdx++]));
             nullabilityHolder.setNotNull(idx);
             idx++;
           }
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java
index 2aa6f2c..9876962 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedPageIterator.java
@@ -274,28 +274,26 @@
    * Method for reading a batch of decimals backed by INT32 and INT64 parquet data types. Since Arrow stores all
    * decimals in 16 bytes, byte arrays are appropriately padded before being written to Arrow data buffers.
    */
-  public int nextBatchIntLongBackedDecimal(
+  public int nextBatchIntBackedDecimal(
       final FieldVector vector, final int expectedBatchSize, final int numValsInVector,
-      final int typeWidth, NullabilityHolder nullabilityHolder) {
+      NullabilityHolder nullabilityHolder) {
     final int actualBatchSize = getActualBatchSize(expectedBatchSize);
     if (actualBatchSize <= 0) {
       return 0;
     }
     if (dictionaryDecodeMode == DictionaryDecodeMode.EAGER) {
       vectorizedDefinitionLevelReader
-          .readBatchOfDictionaryEncodedIntLongBackedDecimals(
+          .readBatchOfDictionaryEncodedIntBackedDecimals(
               vector,
               numValsInVector,
-              typeWidth,
               actualBatchSize,
               nullabilityHolder,
               dictionaryEncodedValuesReader,
               dictionary);
     } else {
-      vectorizedDefinitionLevelReader.readBatchOfIntLongBackedDecimals(
+      vectorizedDefinitionLevelReader.readBatchOfIntBackedDecimals(
           vector,
           numValsInVector,
-          typeWidth,
           actualBatchSize,
           nullabilityHolder,
           plainValuesReader);
@@ -305,6 +303,35 @@
     return actualBatchSize;
   }
 
+  public int nextBatchLongBackedDecimal(
+          final FieldVector vector, final int expectedBatchSize, final int numValsInVector,
+          NullabilityHolder nullabilityHolder) {
+    final int actualBatchSize = getActualBatchSize(expectedBatchSize);
+    if (actualBatchSize <= 0) {
+      return 0;
+    }
+    if (dictionaryDecodeMode == DictionaryDecodeMode.EAGER) {
+      vectorizedDefinitionLevelReader
+              .readBatchOfDictionaryEncodedLongBackedDecimals(
+                      vector,
+                      numValsInVector,
+                      actualBatchSize,
+                      nullabilityHolder,
+                      dictionaryEncodedValuesReader,
+                      dictionary);
+    } else {
+      vectorizedDefinitionLevelReader.readBatchOfLongBackedDecimals(
+              vector,
+              numValsInVector,
+              actualBatchSize,
+              nullabilityHolder,
+              plainValuesReader);
+    }
+    triplesRead += actualBatchSize;
+    this.hasNext = triplesRead < triplesCount;
+    return actualBatchSize;
+  }
+
   /**
    * Method for reading a batch of decimals backed by fixed length byte array parquet data type. Arrow stores all
    * decimals in 16 bytes. This method provides the necessary padding to the decimals read. Moreover, Arrow interprets
diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
index 7c21119..d330f09 100644
--- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
+++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedParquetDefinitionLevelReader.java
@@ -623,12 +623,12 @@
         this.readNextGroup();
       }
       int num = Math.min(left, this.currentCount);
-      byte[] byteArray = new byte[DecimalVector.TYPE_WIDTH];
+      byte[] byteArray = new byte[typeWidth];
       switch (mode) {
         case RLE:
           if (currentValue == maxDefLevel) {
             for (int i = 0; i < num; i++) {
-              valuesReader.getBuffer(typeWidth).get(byteArray, DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
+              valuesReader.getBuffer(typeWidth).get(byteArray, 0, typeWidth);
               ((DecimalVector) vector).setBigEndian(bufferIdx, byteArray);
               nullabilityHolder.setNotNull(bufferIdx);
               bufferIdx++;
@@ -641,7 +641,7 @@
         case PACKED:
           for (int i = 0; i < num; ++i) {
             if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
-              valuesReader.getBuffer(typeWidth).get(byteArray, DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
+              valuesReader.getBuffer(typeWidth).get(byteArray, 0, typeWidth);
               ((DecimalVector) vector).setBigEndian(bufferIdx, byteArray);
               nullabilityHolder.setNotNull(bufferIdx);
             } else {
@@ -685,8 +685,8 @@
           for (int i = 0; i < num; i++) {
             if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
               ByteBuffer decimalBytes = dict.decodeToBinary(dictionaryEncodedValuesReader.readInteger()).toByteBuffer();
-              byte[] vectorBytes = new byte[DecimalVector.TYPE_WIDTH];
-              System.arraycopy(decimalBytes, 0, vectorBytes, DecimalVector.TYPE_WIDTH - typeWidth, typeWidth);
+              byte[] vectorBytes = new byte[typeWidth];
+              System.arraycopy(decimalBytes, 0, vectorBytes, 0, typeWidth);
               ((DecimalVector) vector).setBigEndian(idx, vectorBytes);
               nullabilityHolder.setNotNull(idx);
             } else {
@@ -805,10 +805,9 @@
     }
   }
 
-  public void readBatchOfIntLongBackedDecimals(
+  public void readBatchOfIntBackedDecimals(
       final FieldVector vector, final int startOffset,
-      final int typeWidth, final int numValsToRead, NullabilityHolder nullabilityHolder,
-      ValuesAsBytesReader valuesReader) {
+      final int numValsToRead, NullabilityHolder nullabilityHolder, ValuesAsBytesReader valuesReader) {
     int bufferIdx = startOffset;
     int left = numValsToRead;
     while (left > 0) {
@@ -816,12 +815,12 @@
         this.readNextGroup();
       }
       int num = Math.min(left, this.currentCount);
-      byte[] byteArray = new byte[DecimalVector.TYPE_WIDTH];
+      byte[] byteArray = new byte[Integer.BYTES];
       switch (mode) {
         case RLE:
           if (currentValue == maxDefLevel) {
             for (int i = 0; i < num; i++) {
-              setIntLongBackedDecimal(vector, typeWidth, nullabilityHolder, valuesReader, bufferIdx, byteArray);
+              setIntBackedDecimal(vector, nullabilityHolder, valuesReader, bufferIdx, byteArray);
               bufferIdx++;
             }
           } else {
@@ -832,7 +831,7 @@
         case PACKED:
           for (int i = 0; i < num; ++i) {
             if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
-              setIntLongBackedDecimal(vector, typeWidth, nullabilityHolder, valuesReader, bufferIdx, byteArray);
+              setIntBackedDecimal(vector, nullabilityHolder, valuesReader, bufferIdx, byteArray);
             } else {
               setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
             }
@@ -845,20 +844,60 @@
     }
   }
 
-  private void setIntLongBackedDecimal(FieldVector vector, int typeWidth, NullabilityHolder nullabilityHolder,
-                                       ValuesAsBytesReader valuesReader, int bufferIdx, byte[] byteArray) {
-    valuesReader.getBuffer(typeWidth).get(byteArray, 0, typeWidth);
-    vector.getDataBuffer().setBytes(bufferIdx * DecimalVector.TYPE_WIDTH, byteArray);
-    nullabilityHolder.setNotNull(bufferIdx);
-    if (setArrowValidityVector) {
-      BitVectorHelper.setBit(vector.getValidityBuffer(), bufferIdx);
+  public void readBatchOfLongBackedDecimals(
+          final FieldVector vector, final int startOffset,
+          final int numValsToRead, NullabilityHolder nullabilityHolder, ValuesAsBytesReader valuesReader) {
+    int bufferIdx = startOffset;
+    int left = numValsToRead;
+    while (left > 0) {
+      if (this.currentCount == 0) {
+        this.readNextGroup();
+      }
+      int num = Math.min(left, this.currentCount);
+      byte[] byteArray = new byte[Long.BYTES];
+      switch (mode) {
+        case RLE:
+          if (currentValue == maxDefLevel) {
+            for (int i = 0; i < num; i++) {
+              setLongBackedDecimal(vector, nullabilityHolder, valuesReader, bufferIdx, byteArray);
+              bufferIdx++;
+            }
+          } else {
+            setNulls(nullabilityHolder, bufferIdx, num, vector.getValidityBuffer());
+            bufferIdx += num;
+          }
+          break;
+        case PACKED:
+          for (int i = 0; i < num; ++i) {
+            if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+              setLongBackedDecimal(vector, nullabilityHolder, valuesReader, bufferIdx, byteArray);
+            } else {
+              setNull(nullabilityHolder, bufferIdx, vector.getValidityBuffer());
+            }
+            bufferIdx++;
+          }
+          break;
+      }
+      left -= num;
+      currentCount -= num;
     }
   }
 
-  public void readBatchOfDictionaryEncodedIntLongBackedDecimals(
+  private void setIntBackedDecimal(FieldVector vector, NullabilityHolder nullabilityHolder,
+                                       ValuesAsBytesReader valuesReader, int bufferIdx, byte[] byteArray) {
+    ((DecimalVector) vector).set(bufferIdx, valuesReader.getBuffer(Integer.BYTES).getInt());
+    nullabilityHolder.setNotNull(bufferIdx);
+  }
+
+  private void setLongBackedDecimal(FieldVector vector, NullabilityHolder nullabilityHolder,
+                                   ValuesAsBytesReader valuesReader, int bufferIdx, byte[] byteArray) {
+    ((DecimalVector) vector).set(bufferIdx, valuesReader.getBuffer(Long.BYTES).getLong());
+    nullabilityHolder.setNotNull(bufferIdx);
+  }
+
+  public void readBatchOfDictionaryEncodedIntBackedDecimals(
       final FieldVector vector,
       final int startOffset,
-      final int typeWidth,
       final int numValsToRead,
       NullabilityHolder nullabilityHolder,
       VectorizedDictionaryEncodedParquetValuesReader dictionaryEncodedValuesReader,
@@ -873,7 +912,7 @@
       switch (mode) {
         case RLE:
           if (currentValue == maxDefLevel) {
-            dictionaryEncodedValuesReader.readBatchOfDictionaryEncodedIntLongBackedDecimals(vector, typeWidth, idx,
+            dictionaryEncodedValuesReader.readBatchOfDictionaryEncodedIntBackedDecimals(vector, idx,
                 num, dict, nullabilityHolder);
           } else {
             setNulls(nullabilityHolder, idx, num, vector.getValidityBuffer());
@@ -885,9 +924,49 @@
             if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
               ((DecimalVector) vector).set(
                   idx,
-                  typeWidth == Integer.BYTES ?
-                      dict.decodeToInt(dictionaryEncodedValuesReader.readInteger())
-                      : dict.decodeToLong(dictionaryEncodedValuesReader.readInteger()));
+                      dict.decodeToInt(dictionaryEncodedValuesReader.readInteger()));
+              nullabilityHolder.setNotNull(idx);
+            } else {
+              setNull(nullabilityHolder, idx, vector.getValidityBuffer());
+            }
+            idx++;
+          }
+          break;
+      }
+      left -= num;
+      currentCount -= num;
+    }
+  }
+
+  public void readBatchOfDictionaryEncodedLongBackedDecimals(
+          final FieldVector vector,
+          final int startOffset,
+          final int numValsToRead,
+          NullabilityHolder nullabilityHolder,
+          VectorizedDictionaryEncodedParquetValuesReader dictionaryEncodedValuesReader,
+          Dictionary dict) {
+    int idx = startOffset;
+    int left = numValsToRead;
+    while (left > 0) {
+      if (this.currentCount == 0) {
+        this.readNextGroup();
+      }
+      int num = Math.min(left, this.currentCount);
+      switch (mode) {
+        case RLE:
+          if (currentValue == maxDefLevel) {
+            dictionaryEncodedValuesReader.readBatchOfDictionaryEncodedLongBackedDecimals(vector, idx,
+                    num, dict, nullabilityHolder);
+          } else {
+            setNulls(nullabilityHolder, idx, num, vector.getValidityBuffer());
+          }
+          idx += num;
+          break;
+        case PACKED:
+          for (int i = 0; i < num; i++) {
+            if (packedValuesBuffer[packedValuesBufferIdx++] == maxDefLevel) {
+              ((DecimalVector) vector).set(
+                      idx, dict.decodeToLong(dictionaryEncodedValuesReader.readInteger()));
               nullabilityHolder.setNotNull(idx);
             } else {
               setNull(nullabilityHolder, idx, vector.getValidityBuffer());