PARQUET-1633: Fix integer overflow (#902)

Unit test:
- Updated ParquetWriter to support setting row group size in long
- Removed Xmx settings in the pom to allow more memory for the tests

Co-authored-by: Gabor Szadovszky <gabor@apache.org>
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 1fa9c1f..3a68e01 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -944,7 +944,7 @@
           currentParts = new ConsecutivePartList(startingPos);
           allParts.add(currentParts);
         }
-        currentParts.addChunk(new ChunkDescriptor(columnDescriptor, mc, startingPos, (int)mc.getTotalSize()));
+        currentParts.addChunk(new ChunkDescriptor(columnDescriptor, mc, startingPos, mc.getTotalSize()));
       }
     }
     // actually read all the chunks
@@ -1066,7 +1066,7 @@
             allParts.add(currentParts);
           }
           ChunkDescriptor chunkDescriptor = new ChunkDescriptor(columnDescriptor, mc, startingPos,
-              Math.toIntExact(range.getLength()));
+              range.getLength());
           currentParts.addChunk(chunkDescriptor);
           builder.setOffsetIndex(chunkDescriptor, filteredOffsetIndex);
         }
@@ -1691,7 +1691,7 @@
     private final ColumnDescriptor col;
     private final ColumnChunkMetaData metadata;
     private final long fileOffset;
-    private final int size;
+    private final long size;
 
     /**
      * @param col column this chunk is part of
@@ -1703,7 +1703,7 @@
         ColumnDescriptor col,
         ColumnChunkMetaData metadata,
         long fileOffset,
-        int size) {
+        long size) {
       super();
       this.col = col;
       this.metadata = metadata;
@@ -1735,7 +1735,7 @@
   private class ConsecutivePartList {
 
     private final long offset;
-    private int length;
+    private long length;
     private final List<ChunkDescriptor> chunks = new ArrayList<>();
 
     /**
@@ -1763,8 +1763,8 @@
     public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOException {
       f.seek(offset);
 
-      int fullAllocations = length / options.getMaxAllocationSize();
-      int lastAllocationSize = length % options.getMaxAllocationSize();
+      int fullAllocations = Math.toIntExact(length / options.getMaxAllocationSize());
+      int lastAllocationSize = Math.toIntExact(length % options.getMaxAllocationSize());
 
       int numAllocations = fullAllocations + (lastAllocationSize > 0 ? 1 : 0);
       List<ByteBuffer> buffers = new ArrayList<>(numAllocations);
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index 696fec3..b9953a5 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -269,7 +269,7 @@
       ParquetFileWriter.Mode mode,
       WriteSupport<T> writeSupport,
       CompressionCodecName compressionCodecName,
-      int rowGroupSize,
+      long rowGroupSize,
       boolean validating,
       Configuration conf,
       int maxPaddingSize,
@@ -355,7 +355,7 @@
     private Configuration conf = new Configuration();
     private ParquetFileWriter.Mode mode;
     private CompressionCodecName codecName = DEFAULT_COMPRESSION_CODEC_NAME;
-    private int rowGroupSize = DEFAULT_BLOCK_SIZE;
+    private long rowGroupSize = DEFAULT_BLOCK_SIZE;
     private int maxPaddingSize = MAX_PADDING_SIZE_DEFAULT;
     private boolean enableValidation = DEFAULT_IS_VALIDATING_ENABLED;
     private ParquetProperties.Builder encodingPropsBuilder =
@@ -432,8 +432,20 @@
      *
      * @param rowGroupSize an integer size in bytes
      * @return this builder for method chaining.
+     * @deprecated Use {@link #withRowGroupSize(long)} instead
      */
+    @Deprecated
     public SELF withRowGroupSize(int rowGroupSize) {
+      return withRowGroupSize((long) rowGroupSize);
+    }
+
+    /**
+     * Set the Parquet format row group size used by the constructed writer.
+     *
+     * @param rowGroupSize an integer size in bytes
+     * @return this builder for method chaining.
+     */
+    public SELF withRowGroupSize(long rowGroupSize) {
       this.rowGroupSize = rowGroupSize;
       return self();
     }
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestLargeColumnChunk.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestLargeColumnChunk.java
new file mode 100644
index 0000000..90015f5
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestLargeColumnChunk.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.hadoop;
+
+import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.eq;
+import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
+import static org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+import static org.apache.parquet.schema.Types.buildMessage;
+import static org.apache.parquet.schema.Types.required;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.GroupFactory;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.MessageType;
+import org.junit.*;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * This test is to test parquet-mr working with potential int overflows (when the sizes are greater than
+ * Integer.MAX_VALUE). The test requires ~3GB memory so it is likely to fail in the CI environment, so these
+ * tests are flagged to be ignored.
+ */
+@Ignore
+public class TestLargeColumnChunk {
+  private static final MessageType SCHEMA = buildMessage().addFields(
+      required(INT64).named("id"),
+      required(BINARY).named("data"))
+      .named("schema");
+  private static final int DATA_SIZE = 256;
+  // Ensure that the size of the column chunk would overflow an int
+  private static final int ROW_COUNT = Integer.MAX_VALUE / DATA_SIZE + 1000;
+  private static final long RANDOM_SEED = 42;
+  private static final int ID_INDEX = SCHEMA.getFieldIndex("id");
+  private static final int DATA_INDEX = SCHEMA.getFieldIndex("data");
+
+  private static final long ID_OF_FILTERED_DATA = ROW_COUNT / 2;
+  private static Binary VALUE_IN_DATA;
+  private static Binary VALUE_NOT_IN_DATA;
+  private static Path file;
+
+  @ClassRule
+  public static TemporaryFolder folder = new TemporaryFolder();
+
+  @BeforeClass
+  public static void createFile() throws IOException {
+    file = new Path(folder.newFile().getAbsolutePath());
+
+    GroupFactory factory = new SimpleGroupFactory(SCHEMA);
+    Random random = new Random(RANDOM_SEED);
+    Configuration conf = new Configuration();
+    GroupWriteSupport.setSchema(SCHEMA, conf);
+    try (ParquetWriter<Group> writer = ExampleParquetWriter
+        .builder(HadoopOutputFile.fromPath(file, conf))
+        .withWriteMode(OVERWRITE)
+        .withConf(conf)
+        .withCompressionCodec(UNCOMPRESSED)
+        .withRowGroupSize(4L * 1024 * 1024 * 1024) // 4G to ensure all data goes to one row group
+        .withBloomFilterEnabled(true)
+        .build()) {
+      for (long id = 0; id < ROW_COUNT; ++id) {
+        Group group = factory.newGroup();
+        group.add(ID_INDEX, id);
+        Binary data = nextBinary(random);
+        group.add(DATA_INDEX, data);
+        writer.write(group);
+        if (id == ID_OF_FILTERED_DATA) {
+          VALUE_IN_DATA = data;
+        }
+      }
+    }
+    VALUE_NOT_IN_DATA = nextBinary(random);
+  }
+
+  private static Binary nextBinary(Random random) {
+    byte[] bytes = new byte[DATA_SIZE];
+    random.nextBytes(bytes);
+    return Binary.fromConstantByteArray(bytes);
+  }
+
+  @Test
+  public void validateAllData() throws IOException {
+    Random random = new Random(RANDOM_SEED);
+    try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file).build()) {
+      for (long id = 0; id < ROW_COUNT; ++id) {
+        Group group = reader.read();
+        assertEquals(id, group.getLong(ID_INDEX, 0));
+        assertEquals(nextBinary(random), group.getBinary(DATA_INDEX, 0));
+      }
+      assertNull("No more record should be read", reader.read());
+    }
+  }
+
+  @Test
+  public void validateFiltering() throws IOException {
+    try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file)
+        .withFilter(FilterCompat.get(eq(binaryColumn("data"), VALUE_IN_DATA)))
+        .build()) {
+      Group group = reader.read();
+      assertEquals(ID_OF_FILTERED_DATA, group.getLong(ID_INDEX, 0));
+      assertEquals(VALUE_IN_DATA, group.getBinary(DATA_INDEX, 0));
+      assertNull("No more record should be read", reader.read());
+    }
+    try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file)
+        .withFilter(FilterCompat.get(eq(binaryColumn("data"), VALUE_NOT_IN_DATA)))
+        .build()) {
+      assertNull("No record should be read", reader.read());
+    }
+  }
+}
diff --git a/pom.xml b/pom.xml
index 738b527..090ae96 100644
--- a/pom.xml
+++ b/pom.xml
@@ -109,7 +109,6 @@
     <commons-text.version>1.8</commons-text.version>
 
     <!-- properties for the profiles -->
-    <surefire.argLine>-Xmx512m</surefire.argLine>
     <surefire.logLevel>INFO</surefire.logLevel>
   </properties>
 
@@ -562,7 +561,7 @@
       <id>ci-test</id>
       <properties>
         <surefire.logLevel>WARN</surefire.logLevel>
-        <surefire.argLine>-Xmx512m -XX:MaxJavaStackTraceDepth=10</surefire.argLine>
+        <surefire.argLine>-XX:MaxJavaStackTraceDepth=10</surefire.argLine>
       </properties>
     </profile>