PARQUET-1633: Fix integer overflow (#902)
Unit test:
- Updated ParquetWriter to support setting the row group size as a long
- Removed the Xmx settings from the pom to allow more memory for the tests
Co-authored-by: Gabor Szadovszky <gabor@apache.org>
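
The root cause: ChunkDescriptor stored column chunk sizes as int, so a chunk larger than 2 GiB either had its length silently truncated by a narrowing cast (eager read path) or hit an ArithmeticException from Math.toIntExact (filtered read path). A minimal standalone sketch, not part of the patch, of the two failure modes:

public class OverflowSketch {
  public static void main(String[] args) {
    long totalSize = 3L * 1024 * 1024 * 1024; // a 3 GiB column chunk

    // The eager read path used a narrowing cast, silently producing a
    // negative chunk length:
    int truncated = (int) totalSize;
    System.out.println(truncated); // prints -1073741824

    // The filtered read path used Math.toIntExact, which at least fails
    // fast instead of corrupting the length:
    try {
      Math.toIntExact(totalSize);
    } catch (ArithmeticException e) {
      System.out.println(e.getMessage()); // prints "integer overflow"
    }
  }
}
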
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 1fa9c1f..3a68e01 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -944,7 +944,7 @@
currentParts = new ConsecutivePartList(startingPos);
allParts.add(currentParts);
}
- currentParts.addChunk(new ChunkDescriptor(columnDescriptor, mc, startingPos, (int)mc.getTotalSize()));
+ currentParts.addChunk(new ChunkDescriptor(columnDescriptor, mc, startingPos, mc.getTotalSize()));
}
}
// actually read all the chunks
@@ -1066,7 +1066,7 @@
allParts.add(currentParts);
}
ChunkDescriptor chunkDescriptor = new ChunkDescriptor(columnDescriptor, mc, startingPos,
- Math.toIntExact(range.getLength()));
+ range.getLength());
currentParts.addChunk(chunkDescriptor);
builder.setOffsetIndex(chunkDescriptor, filteredOffsetIndex);
}
@@ -1691,7 +1691,7 @@
private final ColumnDescriptor col;
private final ColumnChunkMetaData metadata;
private final long fileOffset;
- private final int size;
+ private final long size;
/**
* @param col column this chunk is part of
@@ -1703,7 +1703,7 @@
ColumnDescriptor col,
ColumnChunkMetaData metadata,
long fileOffset,
- int size) {
+ long size) {
super();
this.col = col;
this.metadata = metadata;
@@ -1735,7 +1735,7 @@
private class ConsecutivePartList {
private final long offset;
- private int length;
+ private long length;
private final List<ChunkDescriptor> chunks = new ArrayList<>();
/**
@@ -1763,8 +1763,8 @@
public void readAll(SeekableInputStream f, ChunkListBuilder builder) throws IOException {
f.seek(offset);
- int fullAllocations = length / options.getMaxAllocationSize();
- int lastAllocationSize = length % options.getMaxAllocationSize();
+ int fullAllocations = Math.toIntExact(length / options.getMaxAllocationSize());
+ int lastAllocationSize = Math.toIntExact(length % options.getMaxAllocationSize());
int numAllocations = fullAllocations + (lastAllocationSize > 0 ? 1 : 0);
List<ByteBuffer> buffers = new ArrayList<>(numAllocations);
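
Since ConsecutivePartList.length is now a long, readAll converts the allocation count and the final allocation size back to int via Math.toIntExact when splitting the range into ByteBuffers. A standalone sketch of that arithmetic, assuming an 8 MiB max allocation size (which I believe matches parquet-mr's default read allocation size); the quotient only overflows an int for ranges beyond 16 PiB, where toIntExact fails fast:

public class AllocationSplitSketch {
  public static void main(String[] args) {
    long length = 3L * 1024 * 1024 * 1024 + 5; // 3 GiB + 5 bytes to read
    int maxAllocationSize = 8 * 1024 * 1024;   // 8 MiB per ByteBuffer

    // Quotient and remainder both fit in an int for any realistic range;
    // Math.toIntExact turns a pathological overflow into an immediate
    // ArithmeticException instead of a silently wrong buffer size.
    int fullAllocations = Math.toIntExact(length / maxAllocationSize);
    int lastAllocationSize = Math.toIntExact(length % maxAllocationSize);
    int numAllocations = fullAllocations + (lastAllocationSize > 0 ? 1 : 0);

    System.out.println(fullAllocations);    // 384 full 8 MiB buffers
    System.out.println(lastAllocationSize); // 5 bytes in the last buffer
    System.out.println(numAllocations);     // 385 allocations in total
  }
}
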
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index 696fec3..b9953a5 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -269,7 +269,7 @@
ParquetFileWriter.Mode mode,
WriteSupport<T> writeSupport,
CompressionCodecName compressionCodecName,
- int rowGroupSize,
+ long rowGroupSize,
boolean validating,
Configuration conf,
int maxPaddingSize,
@@ -355,7 +355,7 @@
private Configuration conf = new Configuration();
private ParquetFileWriter.Mode mode;
private CompressionCodecName codecName = DEFAULT_COMPRESSION_CODEC_NAME;
- private int rowGroupSize = DEFAULT_BLOCK_SIZE;
+ private long rowGroupSize = DEFAULT_BLOCK_SIZE;
private int maxPaddingSize = MAX_PADDING_SIZE_DEFAULT;
private boolean enableValidation = DEFAULT_IS_VALIDATING_ENABLED;
private ParquetProperties.Builder encodingPropsBuilder =
@@ -432,8 +432,20 @@
*
* @param rowGroupSize an integer size in bytes
* @return this builder for method chaining.
+ * @deprecated Use {@link #withRowGroupSize(long)} instead
*/
+ @Deprecated
public SELF withRowGroupSize(int rowGroupSize) {
+ return withRowGroupSize((long) rowGroupSize);
+ }
+
+ /**
+ * Set the Parquet format row group size used by the constructed writer.
+ *
+ * @param rowGroupSize a long size in bytes
+ * @return this builder for method chaining.
+ */
+ public SELF withRowGroupSize(long rowGroupSize) {
this.rowGroupSize = rowGroupSize;
return self();
}
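
Callers can now pass row group sizes above 2 GiB through the builder. A usage sketch, mirroring the test below (the class and helper method are illustrative, not part of the patch):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.hadoop.util.HadoopOutputFile;
import org.apache.parquet.schema.MessageType;

public class RowGroupSizeSketch {
  // Note the 4L: without the long suffix, 4 * 1024 * 1024 * 1024 is int
  // arithmetic that wraps to 0 at compile time and resolves to the
  // deprecated int overload instead.
  static ParquetWriter<Group> open(Path file, MessageType schema, Configuration conf) throws IOException {
    GroupWriteSupport.setSchema(schema, conf);
    return ExampleParquetWriter.builder(HadoopOutputFile.fromPath(file, conf))
        .withConf(conf)
        .withRowGroupSize(4L * 1024 * 1024 * 1024) // 4 GiB row group target
        .build();
  }
}
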
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestLargeColumnChunk.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestLargeColumnChunk.java
new file mode 100644
index 0000000..90015f5
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestLargeColumnChunk.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.hadoop;
+
+import static org.apache.parquet.filter2.predicate.FilterApi.binaryColumn;
+import static org.apache.parquet.filter2.predicate.FilterApi.eq;
+import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE;
+import static org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;
+import static org.apache.parquet.schema.Types.buildMessage;
+import static org.apache.parquet.schema.Types.required;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.GroupFactory;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.filter2.compat.FilterCompat;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
+import org.apache.parquet.io.api.Binary;
+import org.apache.parquet.schema.MessageType;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+/**
+ * Tests that parquet-mr handles potential int overflows correctly (when sizes are greater than
+ * Integer.MAX_VALUE). The tests require ~3GB of memory and are therefore likely to fail in the CI
+ * environment, so they are flagged to be ignored.
+ */
+@Ignore
+public class TestLargeColumnChunk {
+ private static final MessageType SCHEMA = buildMessage().addFields(
+ required(INT64).named("id"),
+ required(BINARY).named("data"))
+ .named("schema");
+ private static final int DATA_SIZE = 256;
+ // Ensure that the size of the column chunk would overflow an int
+ private static final int ROW_COUNT = Integer.MAX_VALUE / DATA_SIZE + 1000;
+ private static final long RANDOM_SEED = 42;
+ private static final int ID_INDEX = SCHEMA.getFieldIndex("id");
+ private static final int DATA_INDEX = SCHEMA.getFieldIndex("data");
+
+ private static final long ID_OF_FILTERED_DATA = ROW_COUNT / 2;
+ private static Binary VALUE_IN_DATA;
+ private static Binary VALUE_NOT_IN_DATA;
+ private static Path file;
+
+ @ClassRule
+ public static TemporaryFolder folder = new TemporaryFolder();
+
+ @BeforeClass
+ public static void createFile() throws IOException {
+ file = new Path(folder.newFile().getAbsolutePath());
+
+ GroupFactory factory = new SimpleGroupFactory(SCHEMA);
+ Random random = new Random(RANDOM_SEED);
+ Configuration conf = new Configuration();
+ GroupWriteSupport.setSchema(SCHEMA, conf);
+ try (ParquetWriter<Group> writer = ExampleParquetWriter
+ .builder(HadoopOutputFile.fromPath(file, conf))
+ .withWriteMode(OVERWRITE)
+ .withConf(conf)
+ .withCompressionCodec(UNCOMPRESSED)
+ .withRowGroupSize(4L * 1024 * 1024 * 1024) // 4 GiB to ensure all data goes to a single row group
+ .withBloomFilterEnabled(true)
+ .build()) {
+ for (long id = 0; id < ROW_COUNT; ++id) {
+ Group group = factory.newGroup();
+ group.add(ID_INDEX, id);
+ Binary data = nextBinary(random);
+ group.add(DATA_INDEX, data);
+ writer.write(group);
+ if (id == ID_OF_FILTERED_DATA) {
+ VALUE_IN_DATA = data;
+ }
+ }
+ }
+ VALUE_NOT_IN_DATA = nextBinary(random);
+ }
+
+ private static Binary nextBinary(Random random) {
+ byte[] bytes = new byte[DATA_SIZE];
+ random.nextBytes(bytes);
+ return Binary.fromConstantByteArray(bytes);
+ }
+
+ @Test
+ public void validateAllData() throws IOException {
+ Random random = new Random(RANDOM_SEED);
+ try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file).build()) {
+ for (long id = 0; id < ROW_COUNT; ++id) {
+ Group group = reader.read();
+ assertEquals(id, group.getLong(ID_INDEX, 0));
+ assertEquals(nextBinary(random), group.getBinary(DATA_INDEX, 0));
+ }
+ assertNull("No more record should be read", reader.read());
+ }
+ }
+
+ @Test
+ public void validateFiltering() throws IOException {
+ try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file)
+ .withFilter(FilterCompat.get(eq(binaryColumn("data"), VALUE_IN_DATA)))
+ .build()) {
+ Group group = reader.read();
+ assertEquals(ID_OF_FILTERED_DATA, group.getLong(ID_INDEX, 0));
+ assertEquals(VALUE_IN_DATA, group.getBinary(DATA_INDEX, 0));
+ assertNull("No more record should be read", reader.read());
+ }
+ try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file)
+ .withFilter(FilterCompat.get(eq(binaryColumn("data"), VALUE_NOT_IN_DATA)))
+ .build()) {
+ assertNull("No record should be read", reader.read());
+ }
+ }
+}
diff --git a/pom.xml b/pom.xml
index 738b527..090ae96 100644
--- a/pom.xml
+++ b/pom.xml
@@ -109,7 +109,6 @@
<commons-text.version>1.8</commons-text.version>
<!-- properties for the profiles -->
- <surefire.argLine>-Xmx512m</surefire.argLine>
<surefire.logLevel>INFO</surefire.logLevel>
</properties>
@@ -562,7 +561,7 @@
<id>ci-test</id>
<properties>
<surefire.logLevel>WARN</surefire.logLevel>
- <surefire.argLine>-Xmx512m -XX:MaxJavaStackTraceDepth=10</surefire.argLine>
+ <surefire.argLine>-XX:MaxJavaStackTraceDepth=10</surefire.argLine>
</properties>
</profile>