DRILL-8458: Use correct size of definition level bytes slice when reading Parquet v2 data page (#2838)
diff --git a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java
index 773a861..7834eaa 100644
--- a/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java
+++ b/exec/java-exec/src/main/java/org/apache/parquet/hadoop/ColumnChunkIncReadStore.java
@@ -217,13 +217,13 @@
int pageBufOffset = 0;
ByteBuffer bb = (ByteBuffer) pageBuf.position(pageBufOffset);
BytesInput repLevelBytes = BytesInput.from(
- (ByteBuffer) bb.slice().limit(pageBufOffset + repLevelSize)
+ (ByteBuffer) bb.slice().limit(repLevelSize)
);
pageBufOffset += repLevelSize;
bb = (ByteBuffer) pageBuf.position(pageBufOffset);
final BytesInput defLevelBytes = BytesInput.from(
- (ByteBuffer) bb.slice().limit(pageBufOffset + defLevelSize)
+ (ByteBuffer) bb.slice().limit(defLevelSize)
);
pageBufOffset += defLevelSize;
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetSimpleTestFileGenerator.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetSimpleTestFileGenerator.java
index 232aec9..efd1b4f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetSimpleTestFileGenerator.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/ParquetSimpleTestFileGenerator.java
@@ -206,11 +206,17 @@
" } \n" +
" } \n" +
"} \n";
+ public static String repeatedIntSchemaMsg =
+ "message ParquetRepeated { \n" +
+ " required int32 rowKey; \n" +
+ " repeated int32 repeatedInt ( INTEGER(32,true) ) ; \n" +
+ "} \n";
public static MessageType simpleSchema = MessageTypeParser.parseMessageType(simpleSchemaMsg);
public static MessageType complexSchema = MessageTypeParser.parseMessageType(complexSchemaMsg);
public static MessageType simpleNullableSchema = MessageTypeParser.parseMessageType(simpleNullableSchemaMsg);
public static MessageType complexNullableSchema = MessageTypeParser.parseMessageType(complexNullableSchemaMsg);
+ public static MessageType repeatedIntSchema = MessageTypeParser.parseMessageType(repeatedIntSchemaMsg);
public static Path initFile(String fileName) {
@@ -218,6 +224,14 @@
}
public static ParquetWriter<Group> initWriter(MessageType schema, String fileName, boolean dictEncoding) throws IOException {
+ return initWriter(schema, fileName, ParquetProperties.WriterVersion.PARQUET_1_0, dictEncoding);
+ }
+
+ public static ParquetWriter<Group> initWriter(
+ MessageType schema,
+ String fileName,
+ ParquetProperties.WriterVersion version,
+ boolean dictEncoding) throws IOException {
GroupWriteSupport.setSchema(schema, conf);
@@ -228,7 +242,7 @@
.withPageSize(1024)
.withDictionaryPageSize(512)
.withValidation(false)
- .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
+ .withWriterVersion(version)
.withConf(conf)
.build();
}
@@ -455,12 +469,32 @@
}
}
+ /**
+ * Writes {@code numRows} groups to {@code writer}. Every group gets a 1-based
+ * {@code rowKey} and the same three {@code repeatedInt} entries (666, 1492, 4711).
+ *
+ * @param groupFactory factory used to create one new group per row
+ * @param writer destination the groups are written to
+ * @param numRows number of groups to write
+ * @throws IOException propagated from {@code writer.write}
+ */
+ public static void writeRepeatedIntValues(
+ SimpleGroupFactory groupFactory,
+ ParquetWriter<Group> writer,
+ int numRows) throws IOException {
+
+ final int[] payload = {666, 1492, 4711};
+
+ for (int rowIdx = 0; rowIdx < numRows; rowIdx++) {
+ Group row = groupFactory.newGroup();
+ // Row keys start at 1, not 0.
+ row.append("rowKey", rowIdx + 1);
+ for (int p = 0; p < payload.length; p++) {
+ row.append("repeatedInt", payload[p]);
+ }
+ writer.write(row);
+ }
+ }
+
public static void main(String[] args) throws IOException {
SimpleGroupFactory sgf = new SimpleGroupFactory(simpleSchema);
GroupFactory gf = new SimpleGroupFactory(complexSchema);
SimpleGroupFactory sngf = new SimpleGroupFactory(simpleNullableSchema);
GroupFactory ngf = new SimpleGroupFactory(complexNullableSchema);
+ SimpleGroupFactory repeatedIntGroupFactory = new SimpleGroupFactory(repeatedIntSchema);
// Generate files with dictionary encoding enabled and disabled
ParquetWriter<Group> simpleWriter = initWriter(simpleSchema, "drill/parquet_test_file_simple", true);
@@ -471,6 +505,7 @@
ParquetWriter<Group> complexNoDictWriter = initWriter(complexSchema, "drill/parquet_test_file_complex_nodict", false);
ParquetWriter<Group> simpleNullableNoDictWriter = initWriter(simpleNullableSchema, "drill/parquet_test_file_simple_nullable_nodict", false);
ParquetWriter<Group> complexNullableNoDictWriter = initWriter(complexNullableSchema, "drill/parquet_test_file_complex_nullable_nodict", false);
+ ParquetWriter<Group> repeatedIntV2Writer = initWriter(repeatedIntSchema, "drill/parquet_v2_repeated_int.parquet", ParquetProperties.WriterVersion.PARQUET_2_0, true);
ParquetSimpleTestFileGenerator.writeSimpleValues(sgf, simpleWriter, false);
ParquetSimpleTestFileGenerator.writeSimpleValues(sngf, simpleNullableWriter, true);
@@ -480,6 +515,7 @@
ParquetSimpleTestFileGenerator.writeSimpleValues(sngf, simpleNullableNoDictWriter, true);
ParquetSimpleTestFileGenerator.writeComplexValues(gf, complexNoDictWriter, false);
ParquetSimpleTestFileGenerator.writeComplexValues(ngf, complexNullableNoDictWriter, true);
+ ParquetSimpleTestFileGenerator.writeRepeatedIntValues(repeatedIntGroupFactory, repeatedIntV2Writer, 100);
simpleWriter.close();
complexWriter.close();
@@ -489,6 +525,7 @@
complexNoDictWriter.close();
simpleNullableNoDictWriter.close();
complexNullableNoDictWriter.close();
+ repeatedIntV2Writer.close();
}
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetComplex.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetComplex.java
index e03af04..579f3ff 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetComplex.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/parquet/TestParquetComplex.java
@@ -901,4 +901,16 @@
.baselineValues(firstValue, null, secondValue)
.go();
}
+
+
+ /**
+ * DRILL-8458: selects the {@code repeatedInt} column from the Parquet v2 test
+ * resource and expects 100 records back.
+ */
+ @Test
+ public void testSelectRepeatedInt() throws Exception {
+ final String v2Resource = "cp.`parquet/parquet_v2_repeated_int.parquet`";
+ testBuilder()
+ .sqlQuery("select repeatedInt as r from %s", v2Resource)
+ .unOrdered()
+ .expectsNumRecords(100)
+ .go();
+ }
}
diff --git a/exec/java-exec/src/test/resources/parquet/parquet_v2_repeated_int.parquet b/exec/java-exec/src/test/resources/parquet/parquet_v2_repeated_int.parquet
new file mode 100644
index 0000000..91fed9b
--- /dev/null
+++ b/exec/java-exec/src/test/resources/parquet/parquet_v2_repeated_int.parquet
Binary files differ