PARQUET-1580: Page-level CRC checksum verification for DataPageV1 (#647)
* Page-level checksums for DataPageV1
* Got rid of redundant constant
* Use more direct way of obtaining defaults
* Revised implementation, updated tests, addressed review comments
* Revert auto whitespace trimming
* Variable rename for consistency
* Revert whitespace changes
* Revert more whitespace changes
* Addressed code review comments
* Enable writing out checksums by default
* Added benchmarks
* Addressed review comments
* Addressed test failures
* Added run script for checksum benchmarks
* Addressed code review comments
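
Usage sketch (illustrative only, not part of the patch; the file path, class name, schema and field name below are made up): the builder options added by this change are expected to be used roughly as follows, with withPageWriteChecksumEnabled on the write path and usePageChecksumVerification on the read path.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.example.data.Group;
    import org.apache.parquet.example.data.simple.SimpleGroupFactory;
    import org.apache.parquet.hadoop.ParquetReader;
    import org.apache.parquet.hadoop.ParquetWriter;
    import org.apache.parquet.hadoop.example.ExampleParquetWriter;
    import org.apache.parquet.hadoop.example.GroupReadSupport;
    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.MessageTypeParser;

    public class PageChecksumUsageSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path file = new Path("target/checksum-example.parquet");
        MessageType schema = MessageTypeParser.parseMessageType("message m { required int64 id; }");

        // Write pages with CRC32 checksums (writing them is on by default, shown explicitly here).
        try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(file)
            .withConf(conf)
            .withType(schema)
            .withPageWriteChecksumEnabled(true)
            .build()) {
          SimpleGroupFactory factory = new SimpleGroupFactory(schema);
          for (long i = 0; i < 1000; i++) {
            writer.write(factory.newGroup().append("id", i));
          }
        }

        // Read the file back with checksum verification enabled (verification is off by default).
        try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file)
            .withConf(conf)
            .usePageChecksumVerification(true)
            .build()) {
          Group group;
          while ((group = reader.read()) != null) {
            // A corrupted page would surface here as a ParquetDecodingException.
          }
        }
      }
    }
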
diff --git a/parquet-benchmarks/run_checksums.sh b/parquet-benchmarks/run_checksums.sh
new file mode 100755
index 0000000..e798488
--- /dev/null
+++ b/parquet-benchmarks/run_checksums.sh
@@ -0,0 +1,28 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+
+SCRIPT_PATH=$( cd "$(dirname "$0")" ; pwd -P )
+
+echo "Page level CRC checksum benchmarks"
+echo "Running write benchmarks"
+java -jar ${SCRIPT_PATH}/target/parquet-benchmarks.jar p*PageChecksumWriteBenchmarks -bm ss "$@"
+echo "Running read benchmarks"
+java -jar ${SCRIPT_PATH}/target/parquet-benchmarks.jar p*PageChecksumReadBenchmarks -bm ss "$@"
diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BenchmarkFiles.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BenchmarkFiles.java
index d9ef4fd..f039403 100644
--- a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BenchmarkFiles.java
+++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BenchmarkFiles.java
@@ -37,4 +37,26 @@
// public final Path parquetFile_1M_LZO = new Path("target/tests/ParquetBenchmarks/PARQUET-1M-LZO");
public static final Path file_1M_SNAPPY = new Path(TARGET_DIR + "/PARQUET-1M-SNAPPY");
public static final Path file_1M_GZIP = new Path(TARGET_DIR + "/PARQUET-1M-GZIP");
+
+ // Page checksum files
+ public static final Path file_100K_CHECKSUMS_UNCOMPRESSED = new Path(TARGET_DIR + "/PARQUET-100K-CHECKSUMS-UNCOMPRESSED");
+ public static final Path file_100K_NOCHECKSUMS_UNCOMPRESSED = new Path(TARGET_DIR + "/PARQUET-100K-NOCHECKSUMS-UNCOMPRESSED");
+ public static final Path file_1M_CHECKSUMS_UNCOMPRESSED = new Path(TARGET_DIR + "/PARQUET-1M-CHECKSUMS-UNCOMPRESSED");
+ public static final Path file_1M_NOCHECKSUMS_UNCOMPRESSED = new Path(TARGET_DIR + "/PARQUET-1M-NOCHECKSUMS-UNCOMPRESSED");
+ public static final Path file_10M_CHECKSUMS_UNCOMPRESSED = new Path(TARGET_DIR + "/PARQUET-10M-CHECKSUMS-UNCOMPRESSED");
+ public static final Path file_10M_NOCHECKSUMS_UNCOMPRESSED = new Path(TARGET_DIR + "/PARQUET-10M-NOCHECKSUMS-UNCOMPRESSED");
+
+ public static final Path file_100K_CHECKSUMS_GZIP = new Path(TARGET_DIR + "/PARQUET-100K-CHECKSUMS-GZIP");
+ public static final Path file_100K_NOCHECKSUMS_GZIP = new Path(TARGET_DIR + "/PARQUET-100K-NOCHECKSUMS-GZIP");
+ public static final Path file_1M_CHECKSUMS_GZIP = new Path(TARGET_DIR + "/PARQUET-1M-CHECKSUMS-GZIP");
+ public static final Path file_1M_NOCHECKSUMS_GZIP = new Path(TARGET_DIR + "/PARQUET-1M-NOCHECKSUMS-GZIP");
+ public static final Path file_10M_CHECKSUMS_GZIP = new Path(TARGET_DIR + "/PARQUET-10M-CHECKSUMS-GZIP");
+ public static final Path file_10M_NOCHECKSUMS_GZIP = new Path(TARGET_DIR + "/PARQUET-10M-NOCHECKSUMS-GZIP");
+
+ public static final Path file_100K_CHECKSUMS_SNAPPY = new Path(TARGET_DIR + "/PARQUET-100K-CHECKSUMS-SNAPPY");
+ public static final Path file_100K_NOCHECKSUMS_SNAPPY = new Path(TARGET_DIR + "/PARQUET-100K-NOCHECKSUMS-SNAPPY");
+ public static final Path file_1M_CHECKSUMS_SNAPPY = new Path(TARGET_DIR + "/PARQUET-1M-CHECKSUMS-SNAPPY");
+ public static final Path file_1M_NOCHECKSUMS_SNAPPY = new Path(TARGET_DIR + "/PARQUET-1M-NOCHECKSUMS-SNAPPY");
+ public static final Path file_10M_CHECKSUMS_SNAPPY = new Path(TARGET_DIR + "/PARQUET-10M-CHECKSUMS-SNAPPY");
+ public static final Path file_10M_NOCHECKSUMS_SNAPPY = new Path(TARGET_DIR + "/PARQUET-10M-NOCHECKSUMS-SNAPPY");
}
diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PageChecksumDataGenerator.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PageChecksumDataGenerator.java
new file mode 100644
index 0000000..6c62cc6
--- /dev/null
+++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PageChecksumDataGenerator.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.benchmarks;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.example.data.GroupFactory;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+
+import static java.util.UUID.randomUUID;
+import static org.apache.parquet.benchmarks.BenchmarkConstants.*;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.*;
+
+import java.io.IOException;
+import java.util.Random;
+
+import static org.apache.parquet.benchmarks.BenchmarkUtils.deleteIfExists;
+import static org.apache.parquet.benchmarks.BenchmarkUtils.exists;
+import static org.apache.parquet.hadoop.metadata.CompressionCodecName.*;
+
+public class PageChecksumDataGenerator {
+
+ private final MessageType SCHEMA = MessageTypeParser.parseMessageType(
+ "message m {" +
+ " required int64 long_field;" +
+ " required binary binary_field;" +
+ " required group group {" +
+ " repeated int32 int_field;" +
+ " }" +
+ "}");
+
+ public void generateData(Path outFile, int nRows, boolean writeChecksums,
+ CompressionCodecName compression) throws IOException {
+ if (exists(configuration, outFile)) {
+ System.out.println("File already exists " + outFile);
+ return;
+ }
+
+ ParquetWriter<Group> writer = ExampleParquetWriter.builder(outFile)
+ .withConf(configuration)
+ .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+ .withCompressionCodec(compression)
+ .withDictionaryEncoding(true)
+ .withType(SCHEMA)
+ .withPageWriteChecksumEnabled(writeChecksums)
+ .build();
+
+ GroupFactory groupFactory = new SimpleGroupFactory(SCHEMA);
+ Random rand = new Random(42);
+ for (int i = 0; i < nRows; i++) {
+ Group group = groupFactory.newGroup();
+ group
+ .append("long_field", (long) i)
+ .append("binary_field", randomUUID().toString())
+ .addGroup("group")
+ // Force dictionary encoding by performing modulo
+ .append("int_field", rand.nextInt() % 100)
+ .append("int_field", rand.nextInt() % 100)
+ .append("int_field", rand.nextInt() % 100)
+ .append("int_field", rand.nextInt() % 100);
+ writer.write(group);
+ }
+
+ writer.close();
+ }
+
+ public void generateAll() {
+ try {
+ // No need to generate the non-checksum versions, as the files generated here are only used in
+ // the read benchmarks
+ generateData(file_100K_CHECKSUMS_UNCOMPRESSED, 100 * ONE_K, true, UNCOMPRESSED);
+ generateData(file_100K_CHECKSUMS_GZIP, 100 * ONE_K, true, GZIP);
+ generateData(file_100K_CHECKSUMS_SNAPPY, 100 * ONE_K, true, SNAPPY);
+ generateData(file_1M_CHECKSUMS_UNCOMPRESSED, ONE_MILLION, true, UNCOMPRESSED);
+ generateData(file_1M_CHECKSUMS_GZIP, ONE_MILLION, true, GZIP);
+ generateData(file_1M_CHECKSUMS_SNAPPY, ONE_MILLION, true, SNAPPY);
+ generateData(file_10M_CHECKSUMS_UNCOMPRESSED, 10 * ONE_MILLION, true, UNCOMPRESSED);
+ generateData(file_10M_CHECKSUMS_GZIP, 10 * ONE_MILLION, true, GZIP);
+ generateData(file_10M_CHECKSUMS_SNAPPY, 10 * ONE_MILLION, true, SNAPPY);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void cleanup() {
+ deleteIfExists(configuration, file_100K_NOCHECKSUMS_UNCOMPRESSED);
+ deleteIfExists(configuration, file_100K_CHECKSUMS_UNCOMPRESSED);
+ deleteIfExists(configuration, file_100K_NOCHECKSUMS_GZIP);
+ deleteIfExists(configuration, file_100K_CHECKSUMS_GZIP);
+ deleteIfExists(configuration, file_100K_NOCHECKSUMS_SNAPPY);
+ deleteIfExists(configuration, file_100K_CHECKSUMS_SNAPPY);
+ deleteIfExists(configuration, file_1M_NOCHECKSUMS_UNCOMPRESSED);
+ deleteIfExists(configuration, file_1M_CHECKSUMS_UNCOMPRESSED);
+ deleteIfExists(configuration, file_1M_NOCHECKSUMS_GZIP);
+ deleteIfExists(configuration, file_1M_CHECKSUMS_GZIP);
+ deleteIfExists(configuration, file_1M_NOCHECKSUMS_SNAPPY);
+ deleteIfExists(configuration, file_1M_CHECKSUMS_SNAPPY);
+ deleteIfExists(configuration, file_10M_NOCHECKSUMS_UNCOMPRESSED);
+ deleteIfExists(configuration, file_10M_CHECKSUMS_UNCOMPRESSED);
+ deleteIfExists(configuration, file_10M_NOCHECKSUMS_GZIP);
+ deleteIfExists(configuration, file_10M_CHECKSUMS_GZIP);
+ deleteIfExists(configuration, file_10M_NOCHECKSUMS_SNAPPY);
+ deleteIfExists(configuration, file_10M_CHECKSUMS_SNAPPY);
+ }
+}
diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PageChecksumReadBenchmarks.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PageChecksumReadBenchmarks.java
new file mode 100644
index 0000000..db23eeb
--- /dev/null
+++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PageChecksumReadBenchmarks.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.benchmarks;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.infra.Blackhole;
+
+import static org.apache.parquet.benchmarks.BenchmarkConstants.ONE_K;
+import static org.apache.parquet.benchmarks.BenchmarkConstants.ONE_MILLION;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.configuration;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.file_100K_CHECKSUMS_UNCOMPRESSED;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.file_100K_CHECKSUMS_GZIP;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.file_100K_CHECKSUMS_SNAPPY;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.file_1M_CHECKSUMS_UNCOMPRESSED;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.file_1M_CHECKSUMS_GZIP;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.file_1M_CHECKSUMS_SNAPPY;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.file_10M_CHECKSUMS_UNCOMPRESSED;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.file_10M_CHECKSUMS_GZIP;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.file_10M_CHECKSUMS_SNAPPY;
+
+import java.io.IOException;
+
+@State(Scope.Thread)
+public class PageChecksumReadBenchmarks {
+
+ private PageChecksumDataGenerator pageChecksumDataGenerator = new PageChecksumDataGenerator();
+
+ @Setup(Level.Trial)
+ public void setup() {
+ pageChecksumDataGenerator.generateAll();
+ }
+
+ @TearDown(Level.Trial)
+ public void cleanup() {
+ pageChecksumDataGenerator.cleanup();
+ }
+
+ private void readFile(Path file, int nRows, boolean verifyChecksums, Blackhole blackhole)
+ throws IOException {
+ try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file)
+ .withConf(configuration)
+ .usePageChecksumVerification(verifyChecksums)
+ .build()) {
+ for (int i = 0; i < nRows; i++) {
+ Group group = reader.read();
+ blackhole.consume(group.getLong("long_field", 0));
+ blackhole.consume(group.getBinary("binary_field", 0));
+ Group subgroup = group.getGroup("group", 0);
+ blackhole.consume(subgroup.getInteger("int_field", 0));
+ blackhole.consume(subgroup.getInteger("int_field", 1));
+ blackhole.consume(subgroup.getInteger("int_field", 2));
+ blackhole.consume(subgroup.getInteger("int_field", 3));
+ }
+ }
+ }
+
+ // 100k rows, uncompressed, GZIP, Snappy
+
+ @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ public void read100KRowsUncompressedWithoutVerification(Blackhole blackhole) throws IOException {
+ readFile(file_100K_CHECKSUMS_UNCOMPRESSED, 100 * ONE_K, false, blackhole);
+ }
+
+ @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ public void read100KRowsUncompressedWithVerification(Blackhole blackhole) throws IOException {
+ readFile(file_100K_CHECKSUMS_UNCOMPRESSED, 100 * ONE_K, true, blackhole);
+ }
+
+ @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ public void read100KRowsGzipWithoutVerification(Blackhole blackhole) throws IOException {
+ readFile(file_100K_CHECKSUMS_GZIP, 100 * ONE_K, false, blackhole);
+ }
+
+ @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ public void read100KRowsGzipWithVerification(Blackhole blackhole) throws IOException {
+ readFile(file_100K_CHECKSUMS_GZIP, 100 * ONE_K, true, blackhole);
+ }
+
+ @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ public void read100KRowsSnappyWithoutVerification(Blackhole blackhole) throws IOException {
+ readFile(file_100K_CHECKSUMS_SNAPPY, 100 * ONE_K, false, blackhole);
+ }
+
+ @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ public void read100KRowsSnappyWithVerification(Blackhole blackhole) throws IOException {
+ readFile(file_100K_CHECKSUMS_SNAPPY, 100 * ONE_K, true, blackhole);
+ }
+
+ // 1M rows, uncompressed, GZIP, Snappy
+
+ @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ public void read1MRowsUncompressedWithoutVerification(Blackhole blackhole) throws IOException {
+ readFile(file_1M_CHECKSUMS_UNCOMPRESSED, ONE_MILLION, false, blackhole);
+ }
+
+ @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ public void read1MRowsUncompressedWithVerification(Blackhole blackhole) throws IOException {
+ readFile(file_1M_CHECKSUMS_UNCOMPRESSED, ONE_MILLION, true, blackhole);
+ }
+
+ @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ public void read1MRowsGzipWithoutVerification(Blackhole blackhole) throws IOException {
+ readFile(file_1M_CHECKSUMS_GZIP, ONE_MILLION, false, blackhole);
+ }
+
+ @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ public void read1MRowsGzipWithVerification(Blackhole blackhole) throws IOException {
+ readFile(file_1M_CHECKSUMS_GZIP, ONE_MILLION, true, blackhole);
+ }
+
+ @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ public void read1MRowsSnappyWithoutVerification(Blackhole blackhole) throws IOException {
+ readFile(file_1M_CHECKSUMS_SNAPPY, ONE_MILLION, false, blackhole);
+ }
+
+ @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ public void read1MRowsSnappyWithVerification(Blackhole blackhole) throws IOException {
+ readFile(file_1M_CHECKSUMS_SNAPPY, ONE_MILLION, true, blackhole);
+ }
+
+ // 10M rows, uncompressed, GZIP, Snappy
+
+ @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ public void read10MRowsUncompressedWithoutVerification(Blackhole blackhole) throws IOException {
+ readFile(file_10M_CHECKSUMS_UNCOMPRESSED, 10 * ONE_MILLION, false, blackhole);
+ }
+
+ @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ public void read10MRowsUncompressedWithVerification(Blackhole blackhole) throws IOException {
+ readFile(file_10M_CHECKSUMS_UNCOMPRESSED, 10 * ONE_MILLION, true, blackhole);
+ }
+
+ @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ public void read10MRowsGzipWithoutVerification(Blackhole blackhole) throws IOException {
+ readFile(file_10M_CHECKSUMS_GZIP, 10 * ONE_MILLION, false, blackhole);
+ }
+
+ @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ public void read10MRowsGzipWithVerification(Blackhole blackhole) throws IOException {
+ readFile(file_10M_CHECKSUMS_GZIP, 10 * ONE_MILLION, true, blackhole);
+ }
+
+ @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ public void read10MRowsSnappyWithoutVerification(Blackhole blackhole) throws IOException {
+ readFile(file_10M_CHECKSUMS_SNAPPY, 10 * ONE_MILLION, false, blackhole);
+ }
+
+ @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ public void read10MRowsSnappyWithVerification(Blackhole blackhole) throws IOException {
+ readFile(file_10M_CHECKSUMS_SNAPPY, 10 * ONE_MILLION, true, blackhole);
+ }
+
+}
diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PageChecksumWriteBenchmarks.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PageChecksumWriteBenchmarks.java
new file mode 100644
index 0000000..c743dde
--- /dev/null
+++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PageChecksumWriteBenchmarks.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.benchmarks;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+
+import static org.apache.parquet.benchmarks.BenchmarkConstants.ONE_K;
+import static org.apache.parquet.benchmarks.BenchmarkConstants.ONE_MILLION;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.file_100K_CHECKSUMS_UNCOMPRESSED;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.file_100K_NOCHECKSUMS_UNCOMPRESSED;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.file_100K_CHECKSUMS_GZIP;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.file_100K_NOCHECKSUMS_GZIP;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.file_100K_CHECKSUMS_SNAPPY;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.file_100K_NOCHECKSUMS_SNAPPY;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.file_1M_CHECKSUMS_UNCOMPRESSED;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.file_1M_NOCHECKSUMS_UNCOMPRESSED;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.file_1M_CHECKSUMS_GZIP;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.file_1M_NOCHECKSUMS_GZIP;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.file_1M_CHECKSUMS_SNAPPY;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.file_1M_NOCHECKSUMS_SNAPPY;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.file_10M_CHECKSUMS_UNCOMPRESSED;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.file_10M_NOCHECKSUMS_UNCOMPRESSED;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.file_10M_CHECKSUMS_GZIP;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.file_10M_NOCHECKSUMS_GZIP;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.file_10M_CHECKSUMS_SNAPPY;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.file_10M_NOCHECKSUMS_SNAPPY;
+
+import java.io.IOException;
+
+import static org.apache.parquet.hadoop.metadata.CompressionCodecName.*;
+
+@State(Scope.Thread)
+public class PageChecksumWriteBenchmarks {
+
+ private PageChecksumDataGenerator pageChecksumDataGenerator = new PageChecksumDataGenerator();
+
+ @Setup(Level.Iteration)
+ public void cleanup() {
+ pageChecksumDataGenerator.cleanup();
+ }
+
+ // 100k rows, uncompressed, GZIP, Snappy
+
+ @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ public void write100KRowsUncompressedWithoutChecksums() throws IOException {
+ pageChecksumDataGenerator.generateData(file_100K_NOCHECKSUMS_UNCOMPRESSED, 100 * ONE_K, false, UNCOMPRESSED);
+ }
+
+ @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ public void write100KRowsUncompressedWithChecksums() throws IOException {
+ pageChecksumDataGenerator.generateData(file_100K_CHECKSUMS_UNCOMPRESSED, 100 * ONE_K, true, UNCOMPRESSED);
+ }
+
+ @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ public void write100KRowsGzipWithoutChecksums() throws IOException {
+ pageChecksumDataGenerator.generateData(file_100K_NOCHECKSUMS_GZIP, 100 * ONE_K, false, GZIP);
+ }
+
+ @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ public void write100KRowsGzipWithChecksums() throws IOException {
+ pageChecksumDataGenerator.generateData(file_100K_CHECKSUMS_GZIP, 100 * ONE_K, true, GZIP);
+ }
+
+ @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ public void write100KRowsSnappyWithoutChecksums() throws IOException {
+ pageChecksumDataGenerator.generateData(file_100K_NOCHECKSUMS_SNAPPY, 100 * ONE_K, false, SNAPPY);
+ }
+
+ @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ public void write100KRowsSnappyWithChecksums() throws IOException {
+ pageChecksumDataGenerator.generateData(file_100K_CHECKSUMS_SNAPPY, 100 * ONE_K, true, SNAPPY);
+ }
+
+ // 1M rows, uncompressed, GZIP, Snappy
+
+ @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ public void write1MRowsUncompressedWithoutChecksums() throws IOException {
+ pageChecksumDataGenerator.generateData(file_1M_NOCHECKSUMS_UNCOMPRESSED, ONE_MILLION, false, UNCOMPRESSED);
+ }
+
+ @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ public void write1MRowsUncompressedWithChecksums() throws IOException {
+ pageChecksumDataGenerator.generateData(file_1M_CHECKSUMS_UNCOMPRESSED, ONE_MILLION, true, UNCOMPRESSED);
+ }
+
+ @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ public void write1MRowsGzipWithoutChecksums() throws IOException {
+ pageChecksumDataGenerator.generateData(file_1M_NOCHECKSUMS_GZIP, ONE_MILLION, false, GZIP);
+ }
+
+ @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ public void write1MRowsGzipWithChecksums() throws IOException {
+ pageChecksumDataGenerator.generateData(file_1M_CHECKSUMS_GZIP, ONE_MILLION, true, GZIP);
+ }
+
+ @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ public void write1MRowsSnappyWithoutChecksums() throws IOException {
+ pageChecksumDataGenerator.generateData(file_1M_NOCHECKSUMS_SNAPPY, ONE_MILLION, false, SNAPPY);
+ }
+
+ @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ public void write1MRowsSnappyWithChecksums() throws IOException {
+ pageChecksumDataGenerator.generateData(file_1M_CHECKSUMS_SNAPPY, ONE_MILLION, true, SNAPPY);
+ }
+
+ // 10M rows, uncompressed, GZIP, Snappy
+
+ @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ public void write10MRowsUncompressedWithoutChecksums() throws IOException {
+ pageChecksumDataGenerator.generateData(file_10M_NOCHECKSUMS_UNCOMPRESSED, 10 * ONE_MILLION, false, UNCOMPRESSED);
+ }
+
+ @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ public void write10MRowsUncompressedWithChecksums() throws IOException {
+ pageChecksumDataGenerator.generateData(file_10M_CHECKSUMS_UNCOMPRESSED, 10 * ONE_MILLION, true, UNCOMPRESSED);
+ }
+
+ @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ public void write10MRowsGzipWithoutChecksums() throws IOException {
+ pageChecksumDataGenerator.generateData(file_10M_NOCHECKSUMS_GZIP, 10 * ONE_MILLION, false, GZIP);
+ }
+
+ @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ public void write10MRowsGzipWithChecksums() throws IOException {
+ pageChecksumDataGenerator.generateData(file_10M_CHECKSUMS_GZIP, 10 * ONE_MILLION, true, GZIP);
+ }
+
+ @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ public void write10MRowsSnappyWithoutChecksums() throws IOException {
+ pageChecksumDataGenerator.generateData(file_10M_NOCHECKSUMS_SNAPPY, 10 * ONE_MILLION, false, SNAPPY);
+ }
+
+ @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+ public void write10MRowsSnappyWithChecksums() throws IOException {
+ pageChecksumDataGenerator.generateData(file_10M_CHECKSUMS_SNAPPY, 10 * ONE_MILLION, true, SNAPPY);
+ }
+
+}
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
index 41e482c..7492b54 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
@@ -50,6 +50,8 @@
public static final int DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64;
public static final int DEFAULT_PAGE_ROW_COUNT_LIMIT = 20_000;
+ public static final boolean DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED = true;
+
public static final ValuesWriterFactory DEFAULT_VALUES_WRITER_FACTORY = new DefaultValuesWriterFactory();
private static final int MIN_SLAB_SIZE = 64;
@@ -87,10 +89,12 @@
private final ValuesWriterFactory valuesWriterFactory;
private final int columnIndexTruncateLength;
private final int pageRowCountLimit;
+ private final boolean pageWriteChecksumEnabled;
private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPageSize, boolean enableDict, int minRowCountForPageSizeCheck,
int maxRowCountForPageSizeCheck, boolean estimateNextSizeCheck, ByteBufferAllocator allocator,
- ValuesWriterFactory writerFactory, int columnIndexMinMaxTruncateLength, int pageRowCountLimit) {
+ ValuesWriterFactory writerFactory, int columnIndexMinMaxTruncateLength, int pageRowCountLimit,
+ boolean pageWriteChecksumEnabled) {
this.pageSizeThreshold = pageSize;
this.initialSlabSize = CapacityByteArrayOutputStream
.initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10);
@@ -105,6 +109,7 @@
this.valuesWriterFactory = writerFactory;
this.columnIndexTruncateLength = columnIndexMinMaxTruncateLength;
this.pageRowCountLimit = pageRowCountLimit;
+ this.pageWriteChecksumEnabled = pageWriteChecksumEnabled;
}
public ValuesWriter newRepetitionLevelWriter(ColumnDescriptor path) {
@@ -201,6 +206,10 @@
return pageRowCountLimit;
}
+ public boolean getPageWriteChecksumEnabled() {
+ return pageWriteChecksumEnabled;
+ }
+
public static Builder builder() {
return new Builder();
}
@@ -221,6 +230,7 @@
private ValuesWriterFactory valuesWriterFactory = DEFAULT_VALUES_WRITER_FACTORY;
private int columnIndexTruncateLength = DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT;
+ private boolean pageWriteChecksumEnabled = DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED;
private Builder() {
}
@@ -236,6 +246,7 @@
this.valuesWriterFactory = toCopy.valuesWriterFactory;
this.allocator = toCopy.allocator;
this.pageRowCountLimit = toCopy.pageRowCountLimit;
+ this.pageWriteChecksumEnabled = toCopy.pageWriteChecksumEnabled;
}
/**
@@ -330,11 +341,17 @@
return this;
}
+ public Builder withPageWriteChecksumEnabled(boolean val) {
+ this.pageWriteChecksumEnabled = val;
+ return this;
+ }
+
public ParquetProperties build() {
ParquetProperties properties =
new ParquetProperties(writerVersion, pageSize, dictPageSize,
enableDict, minRowCountForPageSizeCheck, maxRowCountForPageSizeCheck,
- estimateNextSizeCheck, allocator, valuesWriterFactory, columnIndexTruncateLength, pageRowCountLimit);
+ estimateNextSizeCheck, allocator, valuesWriterFactory, columnIndexTruncateLength,
+ pageRowCountLimit, pageWriteChecksumEnabled);
// we pass a constructed but uninitialized factory to ParquetProperties above as currently
// creation of ValuesWriters is invoked from within ParquetProperties. In the future
// we'd like to decouple that and won't need to pass an object to properties and then pass the
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/Page.java b/parquet-column/src/main/java/org/apache/parquet/column/page/Page.java
index 606f9f7..0489449 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/page/Page.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/page/Page.java
@@ -18,6 +18,8 @@
*/
package org.apache.parquet.column.page;
+import java.util.OptionalInt;
+
/**
* one page in a chunk
*/
@@ -43,4 +45,18 @@
return uncompressedSize;
}
+ // Note: the following field is only used for testing purposes and is NOT used in checksum
+ // verification. The crc value here will merely be a copy of the actual crc field read in
+ // ParquetFileReader.Chunk.readAllPages()
+ private OptionalInt crc = OptionalInt.empty();
+
+ // Visible for testing
+ public void setCrc(int crc) {
+ this.crc = OptionalInt.of(crc);
+ }
+
+ // Visible for testing
+ public OptionalInt getCrc() {
+ return crc;
+ }
}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java
index 4f5c78a..13ab80b 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java
@@ -30,9 +30,10 @@
import static org.apache.parquet.hadoop.ParquetInputFormat.COLUMN_INDEX_FILTERING_ENABLED;
import static org.apache.parquet.hadoop.ParquetInputFormat.DICTIONARY_FILTERING_ENABLED;
+import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
+import static org.apache.parquet.hadoop.ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED;
import static org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED;
import static org.apache.parquet.hadoop.ParquetInputFormat.STATS_FILTERING_ENABLED;
-import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
import static org.apache.parquet.hadoop.UnmaterializableRecordCounter.BAD_RECORD_THRESHOLD_CONF_KEY;
public class HadoopReadOptions extends ParquetReadOptions {
@@ -45,6 +46,7 @@
boolean useDictionaryFilter,
boolean useRecordFilter,
boolean useColumnIndexFilter,
+ boolean usePageChecksumVerification,
FilterCompat.Filter recordFilter,
MetadataFilter metadataFilter,
CompressionCodecFactory codecFactory,
@@ -54,7 +56,8 @@
Configuration conf) {
super(
useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, useColumnIndexFilter,
- recordFilter, metadataFilter, codecFactory, allocator, maxAllocationSize, properties
+ usePageChecksumVerification, recordFilter, metadataFilter, codecFactory, allocator, maxAllocationSize,
+ properties
);
this.conf = conf;
}
@@ -86,6 +89,8 @@
useStatsFilter(conf.getBoolean(STATS_FILTERING_ENABLED, true));
useRecordFilter(conf.getBoolean(RECORD_FILTERING_ENABLED, true));
useColumnIndexFilter(conf.getBoolean(COLUMN_INDEX_FILTERING_ENABLED, true));
+ usePageChecksumVerification(conf.getBoolean(PAGE_VERIFY_CHECKSUM_ENABLED,
+ usePageChecksumVerification));
withCodecFactory(HadoopCodecs.newFactory(conf, 0));
withRecordFilter(getFilter(conf));
withMaxAllocationInBytes(conf.getInt(ALLOCATION_SIZE, 8388608));
@@ -98,9 +103,9 @@
@Override
public ParquetReadOptions build() {
return new HadoopReadOptions(
- useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, useColumnIndexFilter,
- recordFilter, metadataFilter, codecFactory, allocator, maxAllocationSize, properties,
- conf);
+ useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter,
+ useColumnIndexFilter, usePageChecksumVerification, recordFilter, metadataFilter,
+ codecFactory, allocator, maxAllocationSize, properties, conf);
}
}
}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
index 846d3bd..f059023 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
@@ -40,12 +40,14 @@
private static final boolean DICTIONARY_FILTERING_ENABLED_DEFAULT = true;
private static final boolean COLUMN_INDEX_FILTERING_ENABLED_DEFAULT = true;
private static final int ALLOCATION_SIZE_DEFAULT = 8388608; // 8MB
+ private static final boolean PAGE_VERIFY_CHECKSUM_ENABLED_DEFAULT = false;
private final boolean useSignedStringMinMax;
private final boolean useStatsFilter;
private final boolean useDictionaryFilter;
private final boolean useRecordFilter;
private final boolean useColumnIndexFilter;
+ private final boolean usePageChecksumVerification;
private final FilterCompat.Filter recordFilter;
private final ParquetMetadataConverter.MetadataFilter metadataFilter;
private final CompressionCodecFactory codecFactory;
@@ -58,6 +60,7 @@
boolean useDictionaryFilter,
boolean useRecordFilter,
boolean useColumnIndexFilter,
+ boolean usePageChecksumVerification,
FilterCompat.Filter recordFilter,
ParquetMetadataConverter.MetadataFilter metadataFilter,
CompressionCodecFactory codecFactory,
@@ -69,6 +72,7 @@
this.useDictionaryFilter = useDictionaryFilter;
this.useRecordFilter = useRecordFilter;
this.useColumnIndexFilter = useColumnIndexFilter;
+ this.usePageChecksumVerification = usePageChecksumVerification;
this.recordFilter = recordFilter;
this.metadataFilter = metadataFilter;
this.codecFactory = codecFactory;
@@ -97,6 +101,10 @@
return useColumnIndexFilter;
}
+ public boolean usePageChecksumVerification() {
+ return usePageChecksumVerification;
+ }
+
public FilterCompat.Filter getRecordFilter() {
return recordFilter;
}
@@ -143,6 +151,7 @@
protected boolean useDictionaryFilter = DICTIONARY_FILTERING_ENABLED_DEFAULT;
protected boolean useRecordFilter = RECORD_FILTERING_ENABLED_DEFAULT;
protected boolean useColumnIndexFilter = COLUMN_INDEX_FILTERING_ENABLED_DEFAULT;
+ protected boolean usePageChecksumVerification = PAGE_VERIFY_CHECKSUM_ENABLED_DEFAULT;
protected FilterCompat.Filter recordFilter = null;
protected ParquetMetadataConverter.MetadataFilter metadataFilter = NO_FILTER;
// the page size parameter isn't used when only using the codec factory to get decompressors
@@ -200,6 +209,16 @@
return useColumnIndexFilter(true);
}
+
+ public Builder usePageChecksumVerification(boolean usePageChecksumVerification) {
+ this.usePageChecksumVerification = usePageChecksumVerification;
+ return this;
+ }
+
+ public Builder usePageChecksumVerification() {
+ return usePageChecksumVerification(true);
+ }
+
public Builder withRecordFilter(FilterCompat.Filter rowGroupFilter) {
this.recordFilter = rowGroupFilter;
return this;
@@ -235,6 +254,11 @@
return this;
}
+ public Builder withPageChecksumVerification(boolean val) {
+ this.usePageChecksumVerification = val;
+ return this;
+ }
+
public Builder set(String key, String value) {
properties.put(key, value);
return this;
@@ -249,6 +273,7 @@
withMetadataFilter(options.metadataFilter);
withCodecFactory(options.codecFactory);
withAllocator(options.allocator);
+ withPageChecksumVerification(options.usePageChecksumVerification);
for (Map.Entry<String, String> keyValue : options.properties.entrySet()) {
set(keyValue.getKey(), keyValue.getValue());
}
@@ -257,8 +282,9 @@
public ParquetReadOptions build() {
return new ParquetReadOptions(
- useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, useColumnIndexFilter,
- recordFilter, metadataFilter, codecFactory, allocator, maxAllocationSize, properties);
+ useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter,
+ useColumnIndexFilter, usePageChecksumVerification, recordFilter, metadataFilter,
+ codecFactory, allocator, maxAllocationSize, properties);
}
}
}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
index fb0ca7b..deeda65 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
@@ -1365,13 +1365,29 @@
}
private PageHeader newDataPageHeader(
+ int uncompressedSize, int compressedSize,
+ int valueCount,
+ org.apache.parquet.column.Encoding rlEncoding,
+ org.apache.parquet.column.Encoding dlEncoding,
+ org.apache.parquet.column.Encoding valuesEncoding) {
+ PageHeader pageHeader = new PageHeader(PageType.DATA_PAGE, uncompressedSize, compressedSize);
+ pageHeader.setData_page_header(new DataPageHeader(
+ valueCount,
+ getEncoding(valuesEncoding),
+ getEncoding(dlEncoding),
+ getEncoding(rlEncoding)));
+ return pageHeader;
+ }
+
+ private PageHeader newDataPageHeader(
int uncompressedSize, int compressedSize,
int valueCount,
org.apache.parquet.column.Encoding rlEncoding,
org.apache.parquet.column.Encoding dlEncoding,
- org.apache.parquet.column.Encoding valuesEncoding) {
+ org.apache.parquet.column.Encoding valuesEncoding,
+ int crc) {
PageHeader pageHeader = new PageHeader(PageType.DATA_PAGE, uncompressedSize, compressedSize);
- // TODO: pageHeader.crc = ...;
+ pageHeader.setCrc(crc);
pageHeader.setData_page_header(new DataPageHeader(
valueCount,
getEncoding(valuesEncoding),
@@ -1398,19 +1414,37 @@
}
public void writeDataPageV1Header(
+ int uncompressedSize,
+ int compressedSize,
+ int valueCount,
+ org.apache.parquet.column.Encoding rlEncoding,
+ org.apache.parquet.column.Encoding dlEncoding,
+ org.apache.parquet.column.Encoding valuesEncoding,
+ OutputStream to) throws IOException {
+ writePageHeader(newDataPageHeader(uncompressedSize,
+ compressedSize,
+ valueCount,
+ rlEncoding,
+ dlEncoding,
+ valuesEncoding), to);
+ }
+
+ public void writeDataPageV1Header(
int uncompressedSize,
int compressedSize,
int valueCount,
org.apache.parquet.column.Encoding rlEncoding,
org.apache.parquet.column.Encoding dlEncoding,
org.apache.parquet.column.Encoding valuesEncoding,
+ int crc,
OutputStream to) throws IOException {
writePageHeader(newDataPageHeader(uncompressedSize,
compressedSize,
valueCount,
rlEncoding,
dlEncoding,
- valuesEncoding), to);
+ valuesEncoding,
+ crc), to);
}
public void writeDataPageV2Header(
@@ -1443,13 +1477,22 @@
}
public void writeDictionaryPageHeader(
- int uncompressedSize, int compressedSize, int valueCount,
- org.apache.parquet.column.Encoding valuesEncoding, OutputStream to) throws IOException {
+ int uncompressedSize, int compressedSize, int valueCount,
+ org.apache.parquet.column.Encoding valuesEncoding, OutputStream to) throws IOException {
PageHeader pageHeader = new PageHeader(PageType.DICTIONARY_PAGE, uncompressedSize, compressedSize);
pageHeader.setDictionary_page_header(new DictionaryPageHeader(valueCount, getEncoding(valuesEncoding)));
writePageHeader(pageHeader, to);
}
+ public void writeDictionaryPageHeader(
+ int uncompressedSize, int compressedSize, int valueCount,
+ org.apache.parquet.column.Encoding valuesEncoding, int crc, OutputStream to) throws IOException {
+ PageHeader pageHeader = new PageHeader(PageType.DICTIONARY_PAGE, uncompressedSize, compressedSize);
+ pageHeader.setCrc(crc);
+ pageHeader.setDictionary_page_header(new DictionaryPageHeader(valueCount, getEncoding(valuesEncoding)));
+ writePageHeader(pageHeader, to);
+ }
+
private static BoundaryOrder toParquetBoundaryOrder(
org.apache.parquet.internal.column.columnindex.BoundaryOrder boundaryOrder) {
switch (boundaryOrder) {
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
index 0ca9fe3..2e646e7 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
@@ -99,8 +99,9 @@
public DataPage visit(DataPageV1 dataPageV1) {
try {
BytesInput decompressed = decompressor.decompress(dataPageV1.getBytes(), dataPageV1.getUncompressedSize());
+ final DataPageV1 decompressedPage;
if (offsetIndex == null) {
- return new DataPageV1(
+ decompressedPage = new DataPageV1(
decompressed,
dataPageV1.getValueCount(),
dataPageV1.getUncompressedSize(),
@@ -110,7 +111,7 @@
dataPageV1.getValueEncoding());
} else {
long firstRowIndex = offsetIndex.getFirstRowIndex(currentPageIndex);
- return new DataPageV1(
+ decompressedPage = new DataPageV1(
decompressed,
dataPageV1.getValueCount(),
dataPageV1.getUncompressedSize(),
@@ -121,6 +122,10 @@
dataPageV1.getDlEncoding(),
dataPageV1.getValueEncoding());
}
+ if (dataPageV1.getCrc().isPresent()) {
+ decompressedPage.setCrc(dataPageV1.getCrc().getAsInt());
+ }
+ return decompressedPage;
} catch (IOException e) {
throw new ParquetDecodingException("could not decompress page", e);
}
@@ -185,10 +190,14 @@
return null;
}
try {
- return new DictionaryPage(
- decompressor.decompress(compressedDictionaryPage.getBytes(), compressedDictionaryPage.getUncompressedSize()),
- compressedDictionaryPage.getDictionarySize(),
- compressedDictionaryPage.getEncoding());
+ DictionaryPage decompressedPage = new DictionaryPage(
+ decompressor.decompress(compressedDictionaryPage.getBytes(), compressedDictionaryPage.getUncompressedSize()),
+ compressedDictionaryPage.getDictionarySize(),
+ compressedDictionaryPage.getEncoding());
+ if (compressedDictionaryPage.getCrc().isPresent()) {
+ decompressedPage.setCrc(compressedDictionaryPage.getCrc().getAsInt());
+ }
+ return decompressedPage;
} catch (IOException e) {
throw new ParquetDecodingException("Could not decompress dictionary page", e);
}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
index f85d374..72f26fc 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
@@ -26,11 +26,13 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.zip.CRC32;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.bytes.ConcatenatingByteArrayCollector;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.page.DictionaryPage;
import org.apache.parquet.column.page.PageWriteStore;
import org.apache.parquet.column.page.PageWriter;
@@ -74,16 +76,22 @@
private Statistics totalStatistics;
private final ByteBufferAllocator allocator;
+ private final CRC32 crc;
+ private final boolean pageWriteChecksumEnabled;
+
private ColumnChunkPageWriter(ColumnDescriptor path,
BytesCompressor compressor,
ByteBufferAllocator allocator,
- int columnIndexTruncateLength) {
+ int columnIndexTruncateLength,
+ boolean pageWriteChecksumEnabled) {
this.path = path;
this.compressor = compressor;
this.allocator = allocator;
this.buf = new ConcatenatingByteArrayCollector();
this.columnIndexBuilder = ColumnIndexBuilder.getBuilder(path.getPrimitiveType(), columnIndexTruncateLength);
this.offsetIndexBuilder = OffsetIndexBuilder.getBuilder();
+ this.pageWriteChecksumEnabled = pageWriteChecksumEnabled;
+ this.crc = pageWriteChecksumEnabled ? new CRC32() : null;
}
@Override
@@ -119,7 +127,20 @@
+ compressedSize);
}
tempOutputStream.reset();
- parquetMetadataConverter.writeDataPageV1Header(
+ if (pageWriteChecksumEnabled) {
+ crc.reset();
+ crc.update(compressedBytes.toByteArray());
+ parquetMetadataConverter.writeDataPageV1Header(
+ (int)uncompressedSize,
+ (int)compressedSize,
+ valueCount,
+ rlEncoding,
+ dlEncoding,
+ valuesEncoding,
+ (int) crc.getValue(),
+ tempOutputStream);
+ } else {
+ parquetMetadataConverter.writeDataPageV1Header(
(int)uncompressedSize,
(int)compressedSize,
valueCount,
@@ -127,6 +148,7 @@
dlEncoding,
valuesEncoding,
tempOutputStream);
+ }
this.uncompressedLength += uncompressedSize;
this.compressedLength += compressedSize;
this.totalValueCount += valueCount;
@@ -273,10 +295,16 @@
private final MessageType schema;
public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, ByteBufferAllocator allocator,
- int columnIndexTruncateLength) {
+ int columnIndexTruncateLength) {
+ this(compressor, schema, allocator, columnIndexTruncateLength,
+ ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED);
+ }
+
+ public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, ByteBufferAllocator allocator,
+ int columnIndexTruncateLength, boolean pageWriteChecksumEnabled) {
this.schema = schema;
for (ColumnDescriptor path : schema.getColumns()) {
- writers.put(path, new ColumnChunkPageWriter(path, compressor, allocator, columnIndexTruncateLength));
+ writers.put(path, new ColumnChunkPageWriter(path, compressor, allocator, columnIndexTruncateLength, pageWriteChecksumEnabled));
}
}
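
For reference, the write-side checksum computed above is a plain java.util.zip.CRC32 over the compressed page bytes, narrowed to a signed int because that is the width of the crc field in the Thrift page header. A minimal sketch of that step (class and method names are hypothetical, not part of the patch):

    import java.util.zip.CRC32;

    final class PageCrcSketch {
      /** CRC32 over the compressed page bytes, as written into the page header. */
      static int pageCrc(byte[] compressedPageBytes) {
        CRC32 crc = new CRC32();
        crc.update(compressedPageBytes);
        // getValue() returns the unsigned CRC as a long; the narrowing cast matches the
        // signed 32-bit crc field of the Thrift PageHeader.
        return (int) crc.getValue();
      }
    }
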
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
index d8af379..c3da323 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
@@ -102,7 +102,7 @@
private void initStore() {
pageStore = new ColumnChunkPageWriteStore(compressor, schema, props.getAllocator(),
- props.getColumnIndexTruncateLength());
+ props.getColumnIndexTruncateLength(), props.getPageWriteChecksumEnabled());
columnStore = props.newColumnWriteStore(schema, pageStore);
MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema);
this.recordConsumer = columnIO.getRecordWriter(columnStore);
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 8e205f6..4acd4c4 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -50,6 +50,7 @@
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import java.util.zip.CRC32;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -113,6 +114,8 @@
private final ParquetMetadataConverter converter;
+ private final CRC32 crc;
+
/**
* for files provided, check if there's a summary file.
* If a summary file is found it is used otherwise the file footer is used.
@@ -659,6 +662,7 @@
for (ColumnDescriptor col : columns) {
paths.put(ColumnPath.get(col.getPath()), col);
}
+ this.crc = options.usePageChecksumVerification() ? new CRC32() : null;
}
/**
@@ -695,6 +699,7 @@
for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) {
paths.put(ColumnPath.get(col.getPath()), col);
}
+ this.crc = options.usePageChecksumVerification() ? new CRC32() : null;
}
public ParquetFileReader(InputFile file, ParquetReadOptions options) throws IOException {
@@ -717,6 +722,7 @@
for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) {
paths.put(ColumnPath.get(col.getPath()), col);
}
+ this.crc = options.usePageChecksumVerification() ? new CRC32() : null;
}
private static <T> List<T> listWithNulls(int size) {
@@ -1164,6 +1170,18 @@
}
/**
+ * Calculate the checksum of the given bytes and throw a ParquetDecodingException if it does not
+ * match the provided reference crc
+ */
+ private void verifyCrc(int referenceCrc, byte[] bytes, String exceptionMsg) {
+ crc.reset();
+ crc.update(bytes);
+ if (crc.getValue() != ((long) referenceCrc & 0xffffffffL)) {
+ throw new ParquetDecodingException(exceptionMsg);
+ }
+ }
+
+ /**
* Read all of the pages in a given column chunk.
* @return the list of pages
*/
@@ -1178,36 +1196,54 @@
PageHeader pageHeader = readPageHeader();
int uncompressedPageSize = pageHeader.getUncompressed_page_size();
int compressedPageSize = pageHeader.getCompressed_page_size();
+ final BytesInput pageBytes;
switch (pageHeader.type) {
case DICTIONARY_PAGE:
// there is only one dictionary page per column chunk
if (dictionaryPage != null) {
throw new ParquetDecodingException("more than one dictionary page in column " + descriptor.col);
}
+ pageBytes = this.readAsBytesInput(compressedPageSize);
+ if (options.usePageChecksumVerification() && pageHeader.isSetCrc()) {
+ verifyCrc(pageHeader.getCrc(), pageBytes.toByteArray(),
+ "could not verify dictionary page integrity, CRC checksum verification failed");
+ }
DictionaryPageHeader dicHeader = pageHeader.getDictionary_page_header();
dictionaryPage =
new DictionaryPage(
- this.readAsBytesInput(compressedPageSize),
+ pageBytes,
uncompressedPageSize,
dicHeader.getNum_values(),
converter.getEncoding(dicHeader.getEncoding())
);
+ // Copy crc to new page, used for testing
+ if (pageHeader.isSetCrc()) {
+ dictionaryPage.setCrc(pageHeader.getCrc());
+ }
break;
case DATA_PAGE:
DataPageHeader dataHeaderV1 = pageHeader.getData_page_header();
- pagesInChunk.add(
- new DataPageV1(
- this.readAsBytesInput(compressedPageSize),
- dataHeaderV1.getNum_values(),
- uncompressedPageSize,
- converter.fromParquetStatistics(
- getFileMetaData().getCreatedBy(),
- dataHeaderV1.getStatistics(),
- type),
- converter.getEncoding(dataHeaderV1.getRepetition_level_encoding()),
- converter.getEncoding(dataHeaderV1.getDefinition_level_encoding()),
- converter.getEncoding(dataHeaderV1.getEncoding())
- ));
+ pageBytes = this.readAsBytesInput(compressedPageSize);
+ if (options.usePageChecksumVerification() && pageHeader.isSetCrc()) {
+ verifyCrc(pageHeader.getCrc(), pageBytes.toByteArray(),
+ "could not verify page integrity, CRC checksum verification failed");
+ }
+ DataPageV1 dataPageV1 = new DataPageV1(
+ pageBytes,
+ dataHeaderV1.getNum_values(),
+ uncompressedPageSize,
+ converter.fromParquetStatistics(
+ getFileMetaData().getCreatedBy(),
+ dataHeaderV1.getStatistics(),
+ type),
+ converter.getEncoding(dataHeaderV1.getRepetition_level_encoding()),
+ converter.getEncoding(dataHeaderV1.getDefinition_level_encoding()),
+ converter.getEncoding(dataHeaderV1.getEncoding()));
+ // Copy crc to new page, used for testing
+ if (pageHeader.isSetCrc()) {
+ dataPageV1.setCrc(pageHeader.getCrc());
+ }
+ pagesInChunk.add(dataPageV1);
valuesCountReadSoFar += dataHeaderV1.getNum_values();
++dataPageCountReadSoFar;
break;
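
The read side mirrors the writer: verifyCrc above recomputes the CRC32 over the page bytes and widens the stored signed int back to an unsigned 32-bit value before comparing. A minimal sketch of that comparison (class and method names are hypothetical, not part of the patch):

    import java.util.zip.CRC32;
    import org.apache.parquet.io.ParquetDecodingException;

    final class PageCrcVerifySketch {
      /** Recompute the page CRC and compare it against the header value as an unsigned int. */
      static void verify(int referenceCrc, byte[] pageBytes) {
        CRC32 crc = new CRC32();
        crc.update(pageBytes);
        if (crc.getValue() != (referenceCrc & 0xffffffffL)) {
          throw new ParquetDecodingException("CRC checksum verification failed");
        }
      }
    }
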
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index c875702..50cd31e 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -33,6 +33,7 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.zip.CRC32;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -141,6 +142,9 @@
// set when end is called
private ParquetMetadata footer = null;
+ private final CRC32 crc;
+ private boolean pageWriteChecksumEnabled;
+
/**
* Captures the order in which methods should be called
*/
@@ -200,7 +204,7 @@
*/
@Deprecated
public ParquetFileWriter(Configuration configuration, MessageType schema,
- Path file) throws IOException {
+ Path file) throws IOException {
this(HadoopOutputFile.fromPath(file, configuration),
schema, Mode.CREATE, DEFAULT_BLOCK_SIZE, MAX_PADDING_SIZE_DEFAULT);
}
@@ -253,7 +257,8 @@
long rowGroupSize, int maxPaddingSize)
throws IOException {
this(file, schema, mode, rowGroupSize, maxPaddingSize,
- ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH);
+ ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH,
+ ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED);
}
/**
* @param file OutputFile to create or overwrite
@@ -262,10 +267,12 @@
* @param rowGroupSize the row group size
* @param maxPaddingSize the maximum padding
* @param columnIndexTruncateLength the length which the min/max values in column indexes tried to be truncated to
+ * @param pageWriteChecksumEnabled whether to write out page level checksums
* @throws IOException if the file can not be created
*/
public ParquetFileWriter(OutputFile file, MessageType schema, Mode mode,
- long rowGroupSize, int maxPaddingSize, int columnIndexTruncateLength)
+ long rowGroupSize, int maxPaddingSize, int columnIndexTruncateLength,
+ boolean pageWriteChecksumEnabled)
throws IOException {
TypeUtil.checkValidWriteSchema(schema);
@@ -287,6 +294,8 @@
this.encodingStatsBuilder = new EncodingStats.Builder();
this.columnIndexTruncateLength = columnIndexTruncateLength;
+ this.pageWriteChecksumEnabled = pageWriteChecksumEnabled;
+ this.crc = pageWriteChecksumEnabled ? new CRC32() : null;
}
/**
@@ -311,6 +320,8 @@
this.encodingStatsBuilder = new EncodingStats.Builder();
// no truncation is needed for testing
this.columnIndexTruncateLength = Integer.MAX_VALUE;
+ this.pageWriteChecksumEnabled = ParquetOutputFormat.getPageWriteChecksumEnabled(configuration);
+ this.crc = pageWriteChecksumEnabled ? new CRC32() : null;
}
/**
* start the file
@@ -380,12 +391,24 @@
currentChunkDictionaryPageOffset = out.getPos();
int uncompressedSize = dictionaryPage.getUncompressedSize();
int compressedPageSize = (int)dictionaryPage.getBytes().size(); // TODO: fix casts
- metadataConverter.writeDictionaryPageHeader(
+ if (pageWriteChecksumEnabled) {
+ crc.reset();
+ crc.update(dictionaryPage.getBytes().toByteArray());
+ metadataConverter.writeDictionaryPageHeader(
+ uncompressedSize,
+ compressedPageSize,
+ dictionaryPage.getDictionarySize(),
+ dictionaryPage.getEncoding(),
+ (int) crc.getValue(),
+ out);
+ } else {
+ metadataConverter.writeDictionaryPageHeader(
uncompressedSize,
compressedPageSize,
dictionaryPage.getDictionarySize(),
dictionaryPage.getEncoding(),
out);
+ }
long headerSize = out.getPos() - currentChunkDictionaryPageOffset;
this.uncompressedLength += uncompressedSize + headerSize;
this.compressedLength += compressedPageSize + headerSize;
@@ -505,13 +528,26 @@
}
LOG.debug("{}: write data page: {} values", beforeHeader, valueCount);
int compressedPageSize = (int) bytes.size();
- metadataConverter.writeDataPageV1Header(
+ if (pageWriteChecksumEnabled) {
+ crc.reset();
+ crc.update(bytes.toByteArray());
+ metadataConverter.writeDataPageV1Header(
+ uncompressedPageSize, compressedPageSize,
+ valueCount,
+ rlEncoding,
+ dlEncoding,
+ valuesEncoding,
+ (int) crc.getValue(),
+ out);
+ } else {
+ metadataConverter.writeDataPageV1Header(
uncompressedPageSize, compressedPageSize,
valueCount,
rlEncoding,
dlEncoding,
valuesEncoding,
out);
+ }
long headerSize = out.getPos() - beforeHeader;
this.uncompressedLength += uncompressedPageSize + headerSize;
this.compressedLength += compressedPageSize + headerSize;
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
index b8fce2f..7eab611 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
@@ -135,6 +135,11 @@
public static final String COLUMN_INDEX_FILTERING_ENABLED = "parquet.filter.columnindex.enabled";
/**
+ * key to configure whether page level checksum verification is enabled
+ */
+ public static final String PAGE_VERIFY_CHECKSUM_ENABLED = "parquet.page.verify-checksum.enabled";
+
+ /**
* key to turn on or off task side metadata loading (default true)
* if true then metadata is read on the task side and some tasks may finish immediately.
* if false metadata is read on the client which is slower if there is a lot of metadata but tasks will only be spawn if there is work to do.
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
index cd25b23..afcbbff 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
@@ -145,6 +145,7 @@
public static final String ESTIMATE_PAGE_SIZE_CHECK = "parquet.page.size.check.estimate";
public static final String COLUMN_INDEX_TRUNCATE_LENGTH = "parquet.columnindex.truncate.length";
public static final String PAGE_ROW_COUNT_LIMIT = "parquet.page.row.count.limit";
+ public static final String PAGE_WRITE_CHECKSUM_ENABLED = "parquet.page.write-checksum.enabled";
public static JobSummaryLevel getJobSummaryLevel(Configuration conf) {
String level = conf.get(JOB_SUMMARY_LEVEL);
@@ -338,6 +339,18 @@
return conf.getInt(PAGE_ROW_COUNT_LIMIT, ParquetProperties.DEFAULT_PAGE_ROW_COUNT_LIMIT);
}
+ public static void setPageWriteChecksumEnabled(JobContext jobContext, boolean val) {
+ setPageWriteChecksumEnabled(getConfiguration(jobContext), val);
+ }
+
+ public static void setPageWriteChecksumEnabled(Configuration conf, boolean val) {
+ conf.setBoolean(PAGE_WRITE_CHECKSUM_ENABLED, val);
+ }
+
+ public static boolean getPageWriteChecksumEnabled(Configuration conf) {
+ return conf.getBoolean(PAGE_WRITE_CHECKSUM_ENABLED, ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED);
+ }
+
private WriteSupport<T> writeSupport;
private ParquetOutputCommitter committer;
@@ -409,6 +422,7 @@
.withMaxRowCountForPageSizeCheck(getMaxRowCountForPageSizeCheck(conf))
.withColumnIndexTruncateLength(getColumnIndexTruncateLength(conf))
.withPageRowCountLimit(getPageRowCountLimit(conf))
+ .withPageWriteChecksumEnabled(getPageWriteChecksumEnabled(conf))
.build();
long blockSize = getLongBlockSize(conf);
@@ -428,11 +442,13 @@
LOG.info("Max row count for page size check is: {}", props.getMaxRowCountForPageSizeCheck());
LOG.info("Truncate length for column indexes is: {}", props.getColumnIndexTruncateLength());
LOG.info("Page row count limit to {}", props.getPageRowCountLimit());
+ LOG.info("Writing page checksums is: {}", props.getPageWriteChecksumEnabled() ? "on" : "off");
}
WriteContext init = writeSupport.init(conf);
ParquetFileWriter w = new ParquetFileWriter(HadoopOutputFile.fromPath(file, conf),
- init.getSchema(), mode, blockSize, maxPaddingSize, props.getColumnIndexTruncateLength());
+ init.getSchema(), mode, blockSize, maxPaddingSize, props.getColumnIndexTruncateLength(),
+ props.getPageWriteChecksumEnabled());
w.start();
float maxLoad = conf.getFloat(ParquetOutputFormat.MEMORY_POOL_RATIO,
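A hedged usage sketch of the two configuration keys involved here (the helper class is hypothetical; the key strings are the ones defined in ParquetOutputFormat and ParquetInputFormat in this patch, and writing is enabled by default per the commit notes):

    import org.apache.hadoop.conf.Configuration;

    // Hypothetical helper showing how a job could toggle page-level checksums via configuration.
    public final class ChecksumConfSketch {
      public static Configuration withChecksums() {
        Configuration conf = new Configuration();
        conf.setBoolean("parquet.page.write-checksum.enabled", true);   // write page CRCs
        conf.setBoolean("parquet.page.verify-checksum.enabled", true);  // verify page CRCs on read
        return conf;
      }
    }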
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
index de20808..28e1967 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
@@ -280,6 +280,16 @@
return this;
}
+ public Builder<T> usePageChecksumVerification(boolean usePageChecksumVerification) {
+ optionsBuilder.usePageChecksumVerification(usePageChecksumVerification);
+ return this;
+ }
+
+ public Builder<T> usePageChecksumVerification() {
+ optionsBuilder.usePageChecksumVerification();
+ return this;
+ }
+
public Builder<T> withFileRange(long start, long end) {
optionsBuilder.withRange(start, end);
return this;
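A short, hedged example of the reader-side hook added above (GroupReadSupport and the Group record type are stand-ins for whatever read support a caller already uses; only usePageChecksumVerification() comes from this patch):

    import java.io.IOException;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.example.data.Group;
    import org.apache.parquet.hadoop.ParquetReader;
    import org.apache.parquet.hadoop.example.GroupReadSupport;

    public final class ChecksumReadSketch {
      public static void readVerified(Path path) throws IOException {
        try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), path)
            .usePageChecksumVerification()  // corrupt pages surface as ParquetDecodingException
            .build()) {
          Group record;
          while ((record = reader.read()) != null) {
            // process record
          }
        }
      }
    }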
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index 1ed5e32..7fb7186 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -278,7 +278,8 @@
MessageType schema = writeContext.getSchema();
ParquetFileWriter fileWriter = new ParquetFileWriter(
- file, schema, mode, rowGroupSize, maxPaddingSize, encodingProps.getColumnIndexTruncateLength());
+ file, schema, mode, rowGroupSize, maxPaddingSize,
+ encodingProps.getColumnIndexTruncateLength(), encodingProps.getPageWriteChecksumEnabled());
fileWriter.start();
this.codecFactory = new CodecFactory(conf, encodingProps.getPageSizeThreshold());
@@ -516,6 +517,27 @@
}
/**
+ * Enables writing page level checksums for the constructed writer.
+ *
+ * @return this builder for method chaining.
+ */
+ public SELF enablePageWriteChecksum() {
+ encodingPropsBuilder.withPageWriteChecksumEnabled(true);
+ return self();
+ }
+
+ /**
+ * Enables or disables writing page level checksums for the constructed writer.
+ *
+ * @param enablePageWriteChecksum whether page checksums should be written out
+ * @return this builder for method chaining.
+ */
+ public SELF withPageWriteChecksumEnabled(boolean enablePageWriteChecksum) {
+ encodingPropsBuilder.withPageWriteChecksumEnabled(enablePageWriteChecksum);
+ return self();
+ }
+
+ /**
* Set a property that will be available to the read path. For writers that use a Hadoop
* configuration, this is the recommended way to add configuration values.
*
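The writer-side counterpart, again as a hedged sketch (ExampleParquetWriter and the schema parameter are stand-ins; withPageWriteChecksumEnabled and enablePageWriteChecksum are the methods added in the hunk above):

    import java.io.IOException;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.example.data.Group;
    import org.apache.parquet.hadoop.ParquetWriter;
    import org.apache.parquet.hadoop.example.ExampleParquetWriter;
    import org.apache.parquet.schema.MessageType;

    public final class ChecksumWriteSketch {
      public static ParquetWriter<Group> openWriter(Path path, MessageType schema) throws IOException {
        return ExampleParquetWriter.builder(path)
            .withType(schema)
            .withPageWriteChecksumEnabled(true)  // same effect as enablePageWriteChecksum()
            .build();
      }
    }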
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
index c353ee3..88c8d83 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
@@ -46,6 +46,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.ParquetProperties;
import org.junit.Before;
import org.junit.Test;
import org.mockito.InOrder;
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDataPageV1Checksums.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDataPageV1Checksums.java
new file mode 100644
index 0000000..61a9d63
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDataPageV1Checksums.java
@@ -0,0 +1,563 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.hadoop;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Random;
+import java.util.zip.CRC32;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.HeapByteBufferAllocator;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.Page;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.page.PageWriter;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.GroupFactory;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.codec.SnappyCompressor;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.OutputFile;
+import org.apache.parquet.io.PositionOutputStream;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.io.SeekableInputStream;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+import org.apache.parquet.schema.Types;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests that page level checksums are correctly written and that checksum verification works as
+ * expected
+ */
+public class TestDataPageV1Checksums {
+ @Rule
+ public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+ private static final Statistics<?> EMPTY_STATS_INT32 = Statistics.getBuilderForReading(
+ Types.required(INT32).named("a")).build();
+
+ private CRC32 crc = new CRC32();
+
+ // Sample data, two columns 'a' and 'b' (both int32)
+
+ private static final int PAGE_SIZE = 1024 * 1024; // 1MB
+
+ private static final MessageType schemaSimple = MessageTypeParser.parseMessageType(
+ "message m {" +
+ " required int32 a;" +
+ " required int32 b;" +
+ "}");
+ private static final ColumnDescriptor colADesc = schemaSimple.getColumns().get(0);
+ private static final ColumnDescriptor colBDesc = schemaSimple.getColumns().get(1);
+ private static final byte[] colAPage1Bytes = new byte[PAGE_SIZE];
+ private static final byte[] colAPage2Bytes = new byte[PAGE_SIZE];
+ private static final byte[] colBPage1Bytes = new byte[PAGE_SIZE];
+ private static final byte[] colBPage2Bytes = new byte[PAGE_SIZE];
+ private static final int numRecordsLargeFile = (2 * PAGE_SIZE) / Integer.BYTES;
+
+ /** Write out a sample Parquet file using ColumnChunkPageWriteStore directly and return the path to the file */
+ private Path writeSimpleParquetFile(Configuration conf, CompressionCodecName compression)
+ throws IOException {
+ File file = tempFolder.newFile();
+ file.delete();
+ Path path = new Path(file.toURI());
+
+ for (int i = 0; i < PAGE_SIZE; i++) {
+ colAPage1Bytes[i] = (byte) i;
+ colAPage2Bytes[i] = (byte) -i;
+ colBPage1Bytes[i] = (byte) (i + 100);
+ colBPage2Bytes[i] = (byte) (i - 100);
+ }
+
+ ParquetFileWriter writer = new ParquetFileWriter(conf, schemaSimple, path,
+ ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.MAX_PADDING_SIZE_DEFAULT);
+
+ writer.start();
+ writer.startBlock(numRecordsLargeFile);
+
+ CodecFactory codecFactory = new CodecFactory(conf, PAGE_SIZE);
+ CodecFactory.BytesCompressor compressor = codecFactory.getCompressor(compression);
+
+ ColumnChunkPageWriteStore writeStore = new ColumnChunkPageWriteStore(
+ compressor, schemaSimple, new HeapByteBufferAllocator(),
+ Integer.MAX_VALUE, ParquetOutputFormat.getPageWriteChecksumEnabled(conf));
+
+ PageWriter pageWriter = writeStore.getPageWriter(colADesc);
+ pageWriter.writePage(BytesInput.from(colAPage1Bytes), numRecordsLargeFile / 2,
+ numRecordsLargeFile / 2, EMPTY_STATS_INT32, Encoding.RLE, Encoding.RLE, Encoding.PLAIN);
+ pageWriter.writePage(BytesInput.from(colAPage2Bytes), numRecordsLargeFile / 2,
+ numRecordsLargeFile / 2, EMPTY_STATS_INT32, Encoding.RLE, Encoding.RLE, Encoding.PLAIN);
+
+ pageWriter = writeStore.getPageWriter(colBDesc);
+ pageWriter.writePage(BytesInput.from(colBPage1Bytes), numRecordsLargeFile / 2,
+ numRecordsLargeFile / 2, EMPTY_STATS_INT32, Encoding.RLE, Encoding.RLE, Encoding.PLAIN);
+ pageWriter.writePage(BytesInput.from(colBPage2Bytes), numRecordsLargeFile / 2,
+ numRecordsLargeFile / 2, EMPTY_STATS_INT32, Encoding.RLE, Encoding.RLE, Encoding.PLAIN);
+
+ writeStore.flushToFileWriter(writer);
+
+ writer.endBlock();
+ writer.end(new HashMap<>());
+
+ codecFactory.release();
+
+ return path;
+ }
+
+ // Sample data, nested schema with nulls
+
+ private static final MessageType schemaNestedWithNulls = MessageTypeParser.parseMessageType(
+ "message m {" +
+ " optional group c {" +
+ " required int64 id;" +
+ " required group d {" +
+ " repeated int32 val;" +
+ " }" +
+ " }" +
+ "}");
+ private static final ColumnDescriptor colCIdDesc = schemaNestedWithNulls.getColumns().get(0);
+ private static final ColumnDescriptor colDValDesc = schemaNestedWithNulls.getColumns().get(1);
+
+ private static final double nullRatio = 0.3;
+ private static final int numRecordsNestedWithNullsFile = 2000;
+
+ private Path writeNestedWithNullsSampleParquetFile(Configuration conf,
+ boolean dictionaryEncoding,
+ CompressionCodecName compression)
+ throws IOException {
+ File file = tempFolder.newFile();
+ file.delete();
+ Path path = new Path(file.toURI());
+
+ try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path)
+ .withConf(conf)
+ .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+ .withCompressionCodec(compression)
+ .withDictionaryEncoding(dictionaryEncoding)
+ .withType(schemaNestedWithNulls)
+ .withPageWriteChecksumEnabled(ParquetOutputFormat.getPageWriteChecksumEnabled(conf))
+ .build()) {
+ GroupFactory groupFactory = new SimpleGroupFactory(schemaNestedWithNulls);
+ Random rand = new Random(42);
+
+ for (int i = 0; i < numRecordsNestedWithNullsFile; i++) {
+ Group group = groupFactory.newGroup();
+ if (rand.nextDouble() > nullRatio) {
+ // With equal probability, write out either 1 or 3 values in group d. Take the values
+ // modulo 10 so they can be dictionary encoded when dictionary encoding is requested.
+ if (rand.nextDouble() > 0.5) {
+ group.addGroup("c").append("id", (long) i).addGroup("d")
+ .append("val", rand.nextInt() % 10);
+ } else {
+ group.addGroup("c").append("id", (long) i).addGroup("d")
+ .append("val", rand.nextInt() % 10)
+ .append("val", rand.nextInt() % 10)
+ .append("val", rand.nextInt() % 10);
+ }
+ }
+ writer.write(group);
+ }
+ }
+
+ return path;
+ }
+
+ /**
+ * Enable writing out page level crc checksums and disable verification in the read path, but
+ * check that the written crc checksums are correct. Tests that we write out correct crc
+ * checksums without relying on the read path verification.
+ */
+ @Test
+ public void testWriteOnVerifyOff() throws IOException {
+ Configuration conf = new Configuration();
+ conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, true);
+ conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, false);
+
+ Path path = writeSimpleParquetFile(conf, CompressionCodecName.UNCOMPRESSED);
+
+ try (ParquetFileReader reader = getParquetFileReader(path, conf,
+ Arrays.asList(colADesc, colBDesc))) {
+ PageReadStore pageReadStore = reader.readNextRowGroup();
+
+ DataPageV1 colAPage1 = readNextPage(colADesc, pageReadStore);
+ assertCrcSetAndCorrect(colAPage1, colAPage1Bytes);
+ assertCorrectContent(colAPage1.getBytes().toByteArray(), colAPage1Bytes);
+
+ DataPageV1 colAPage2 = readNextPage(colADesc, pageReadStore);
+ assertCrcSetAndCorrect(colAPage2, colAPage2Bytes);
+ assertCorrectContent(colAPage2.getBytes().toByteArray(), colAPage2Bytes);
+
+ DataPageV1 colBPage1 = readNextPage(colBDesc, pageReadStore);
+ assertCrcSetAndCorrect(colBPage1, colBPage1Bytes);
+ assertCorrectContent(colBPage1.getBytes().toByteArray(), colBPage1Bytes);
+
+ DataPageV1 colBPage2 = readNextPage(colBDesc, pageReadStore);
+ assertCrcSetAndCorrect(colBPage2, colBPage2Bytes);
+ assertCorrectContent(colBPage2.getBytes().toByteArray(), colBPage2Bytes);
+ }
+ }
+
+ /** Test that we do not write out checksums if the feature is turned off */
+ @Test
+ public void testWriteOffVerifyOff() throws IOException {
+ Configuration conf = new Configuration();
+ conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, false);
+ conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, false);
+
+ Path path = writeSimpleParquetFile(conf, CompressionCodecName.UNCOMPRESSED);
+
+ try (ParquetFileReader reader = getParquetFileReader(path, conf,
+ Arrays.asList(colADesc, colBDesc))) {
+ PageReadStore pageReadStore = reader.readNextRowGroup();
+
+ assertCrcNotSet(readNextPage(colADesc, pageReadStore));
+ assertCrcNotSet(readNextPage(colADesc, pageReadStore));
+ assertCrcNotSet(readNextPage(colBDesc, pageReadStore));
+ assertCrcNotSet(readNextPage(colBDesc, pageReadStore));
+ }
+ }
+
+ /**
+ * Do not write out page level crc checksums, but enable verification on the read path. Tests
+ * that the read still succeeds and does not throw an exception.
+ */
+ @Test
+ public void testWriteOffVerifyOn() throws IOException {
+ Configuration conf = new Configuration();
+ conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, false);
+ conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, true);
+
+ Path path = writeSimpleParquetFile(conf, CompressionCodecName.UNCOMPRESSED);
+
+ try (ParquetFileReader reader = getParquetFileReader(path, conf,
+ Arrays.asList(colADesc, colBDesc))) {
+ PageReadStore pageReadStore = reader.readNextRowGroup();
+
+ assertCorrectContent(readNextPage(colADesc, pageReadStore).getBytes().toByteArray(),
+ colAPage1Bytes);
+ assertCorrectContent(readNextPage(colADesc, pageReadStore).getBytes().toByteArray(),
+ colAPage2Bytes);
+ assertCorrectContent(readNextPage(colBDesc, pageReadStore).getBytes().toByteArray(),
+ colBPage1Bytes);
+ assertCorrectContent(readNextPage(colBDesc, pageReadStore).getBytes().toByteArray(),
+ colBPage2Bytes);
+ }
+ }
+
+ /**
+ * Write out checksums and verify them on the read path. Tests that crc is set and that we can
+ * read back what we wrote if checksums are enabled on both the write and read path.
+ */
+ @Test
+ public void testWriteOnVerifyOn() throws IOException {
+ Configuration conf = new Configuration();
+ conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, true);
+ conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, true);
+
+ Path path = writeSimpleParquetFile(conf, CompressionCodecName.UNCOMPRESSED);
+
+ try (ParquetFileReader reader = getParquetFileReader(path, conf,
+ Arrays.asList(colADesc, colBDesc))) {
+ PageReadStore pageReadStore = reader.readNextRowGroup();
+
+ DataPageV1 colAPage1 = readNextPage(colADesc, pageReadStore);
+ assertCrcSetAndCorrect(colAPage1, colAPage1Bytes);
+ assertCorrectContent(colAPage1.getBytes().toByteArray(), colAPage1Bytes);
+
+ DataPageV1 colAPage2 = readNextPage(colADesc, pageReadStore);
+ assertCrcSetAndCorrect(colAPage2, colAPage2Bytes);
+ assertCorrectContent(colAPage2.getBytes().toByteArray(), colAPage2Bytes);
+
+ DataPageV1 colBPage1 = readNextPage(colBDesc, pageReadStore);
+ assertCrcSetAndCorrect(colBPage1, colBPage1Bytes);
+ assertCorrectContent(colBPage1.getBytes().toByteArray(), colBPage1Bytes);
+
+ DataPageV1 colBPage2 = readNextPage(colBDesc, pageReadStore);
+ assertCrcSetAndCorrect(colBPage2, colBPage2Bytes);
+ assertCorrectContent(colBPage2.getBytes().toByteArray(), colBPage2Bytes);
+ }
+ }
+
+ /**
+ * Test whether corruption in the page content is detected by checksum verification
+ */
+ @Test
+ public void testCorruptedPage() throws IOException {
+ Configuration conf = new Configuration();
+ conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, true);
+
+ Path path = writeSimpleParquetFile(conf, CompressionCodecName.UNCOMPRESSED);
+
+ InputFile inputFile = HadoopInputFile.fromPath(path, conf);
+ try (SeekableInputStream inputStream = inputFile.newStream()) {
+ int fileLen = (int) inputFile.getLength();
+ byte[] fileBytes = new byte[fileLen];
+ inputStream.readFully(fileBytes);
+ inputStream.close();
+
+ // There are 4 pages in total (2 per column). We corrupt the first page of the first column
+ // and the second page of the second column by altering a byte roughly in the middle of
+ // each of those pages.
+ fileBytes[fileLen / 8]++;
+ fileBytes[fileLen / 8 + ((fileLen / 4) * 3)]++;
+
+ OutputFile outputFile = HadoopOutputFile.fromPath(path, conf);
+ try (PositionOutputStream outputStream = outputFile.createOrOverwrite(1024 * 1024)) {
+ outputStream.write(fileBytes);
+ outputStream.close();
+
+ // First we disable checksum verification; the corruption will go undetected as it is in the
+ // data section of the page.
+ conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, false);
+ try (ParquetFileReader reader = getParquetFileReader(path, conf,
+ Arrays.asList(colADesc, colBDesc))) {
+ PageReadStore pageReadStore = reader.readNextRowGroup();
+
+ DataPageV1 colAPage1 = readNextPage(colADesc, pageReadStore);
+ assertFalse("Data in page was not corrupted",
+ Arrays.equals(colAPage1.getBytes().toByteArray(), colAPage1Bytes));
+ readNextPage(colADesc, pageReadStore);
+ readNextPage(colBDesc, pageReadStore);
+ DataPageV1 colBPage2 = readNextPage(colBDesc, pageReadStore);
+ assertFalse("Data in page was not corrupted",
+ Arrays.equals(colBPage2.getBytes().toByteArray(), colBPage2Bytes));
+ }
+
+ // Now we enable checksum verification; the corruption should be detected.
+ conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, true);
+ try (ParquetFileReader reader =
+ getParquetFileReader(path, conf, Arrays.asList(colADesc, colBDesc))) {
+ // We expect an exception on the first encountered corrupt page (in readAllPages)
+ assertVerificationFailed(reader);
+ }
+ }
+ }
+ }
+
+ /**
+ * Tests that the checksum is calculated using the compressed version of the data and that
+ * checksum verification succeeds
+ */
+ @Test
+ public void testCompression() throws IOException {
+ Configuration conf = new Configuration();
+ conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, true);
+ conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, true);
+
+ Path path = writeSimpleParquetFile(conf, CompressionCodecName.SNAPPY);
+
+ try (ParquetFileReader reader = getParquetFileReader(path, conf,
+ Arrays.asList(colADesc, colBDesc))) {
+ PageReadStore pageReadStore = reader.readNextRowGroup();
+
+ DataPageV1 colAPage1 = readNextPage(colADesc, pageReadStore);
+ assertCrcSetAndCorrect(colAPage1, snappy(colAPage1Bytes));
+ assertCorrectContent(colAPage1.getBytes().toByteArray(), colAPage1Bytes);
+
+ DataPageV1 colAPage2 = readNextPage(colADesc, pageReadStore);
+ assertCrcSetAndCorrect(colAPage2, snappy(colAPage2Bytes));
+ assertCorrectContent(colAPage2.getBytes().toByteArray(), colAPage2Bytes);
+
+ DataPageV1 colBPage1 = readNextPage(colBDesc, pageReadStore);
+ assertCrcSetAndCorrect(colBPage1, snappy(colBPage1Bytes));
+ assertCorrectContent(colBPage1.getBytes().toByteArray(), colBPage1Bytes);
+
+ DataPageV1 colBPage2 = readNextPage(colBDesc, pageReadStore);
+ assertCrcSetAndCorrect(colBPage2, snappy(colBPage2Bytes));
+ assertCorrectContent(colBPage2.getBytes().toByteArray(), colBPage2Bytes);
+ }
+ }
+
+ /**
+ * Tests that we adhere to the checksum calculation specification, namely that the crc is
+ * calculated over the compressed concatenation of the repetition levels, definition levels and
+ * the actual data. This is done by generating sample data with a nested schema containing nulls,
+ * which produces non-trivial repetition and definition levels.
+ */
+ @Test
+ public void testNestedWithNulls() throws IOException {
+ Configuration conf = new Configuration();
+
+ // Write out a sample file via the non-checksum code path and extract the raw page bytes to
+ // calculate the reference crc from
+ conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, false);
+ conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, false);
+ Path refPath = writeNestedWithNullsSampleParquetFile(conf, false, CompressionCodecName.SNAPPY);
+
+ try (ParquetFileReader refReader = getParquetFileReader(refPath, conf,
+ Arrays.asList(colCIdDesc, colDValDesc))) {
+ PageReadStore refPageReadStore = refReader.readNextRowGroup();
+ byte[] colCIdPageBytes = readNextPage(colCIdDesc, refPageReadStore).getBytes().toByteArray();
+ byte[] colDValPageBytes = readNextPage(colDValDesc, refPageReadStore).getBytes().toByteArray();
+
+ // Write out sample file with checksums
+ conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, true);
+ conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, true);
+ Path path = writeNestedWithNullsSampleParquetFile(conf, false, CompressionCodecName.SNAPPY);
+
+ try (ParquetFileReader reader = getParquetFileReader(path, conf,
+ Arrays.asList(colCIdDesc, colDValDesc))) {
+ PageReadStore pageReadStore = reader.readNextRowGroup();
+
+ DataPageV1 colCIdPage = readNextPage(colCIdDesc, pageReadStore);
+ assertCrcSetAndCorrect(colCIdPage, snappy(colCIdPageBytes));
+ assertCorrectContent(colCIdPage.getBytes().toByteArray(), colCIdPageBytes);
+
+ DataPageV1 colDValPage = readNextPage(colDValDesc, pageReadStore);
+ assertCrcSetAndCorrect(colDValPage, snappy(colDValPageBytes));
+ assertCorrectContent(colDValPage.getBytes().toByteArray(), colDValPageBytes);
+ }
+ }
+ }
+
+ @Test
+ public void testDictionaryEncoding() throws IOException {
+ Configuration conf = new Configuration();
+
+ // Write out a dictionary encoded sample file via the non-checksum code path and extract the
+ // raw page bytes to calculate the reference crc from
+ conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, false);
+ conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, false);
+ Path refPath = writeNestedWithNullsSampleParquetFile(conf, true, CompressionCodecName.SNAPPY);
+
+ try (ParquetFileReader refReader =
+ getParquetFileReader(refPath, conf, Collections.singletonList(colDValDesc))) {
+ PageReadStore refPageReadStore = refReader.readNextRowGroup();
+ // Read (decompressed) dictionary page
+ byte[] dictPageBytes = readDictPage(colDValDesc, refPageReadStore).getBytes().toByteArray();
+ byte[] colDValPageBytes = readNextPage(colDValDesc, refPageReadStore).getBytes().toByteArray();
+
+ // Write out sample file with checksums
+ conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, true);
+ conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, true);
+ Path path = writeNestedWithNullsSampleParquetFile(conf, true, CompressionCodecName.SNAPPY);
+
+ try (ParquetFileReader reader =
+ getParquetFileReader(path, conf, Collections.singletonList(colDValDesc))) {
+ PageReadStore pageReadStore = reader.readNextRowGroup();
+
+ DictionaryPage dictPage = readDictPage(colDValDesc, pageReadStore);
+ assertCrcSetAndCorrect(dictPage, snappy(dictPageBytes));
+ assertCorrectContent(dictPage.getBytes().toByteArray(), dictPageBytes);
+
+ DataPageV1 colDValPage = readNextPage(colDValDesc, pageReadStore);
+ assertCrcSetAndCorrect(colDValPage, snappy(colDValPageBytes));
+ assertCorrectContent(colDValPage.getBytes().toByteArray(), colDValPageBytes);
+ }
+ }
+ }
+
+ /** Compress using snappy */
+ private byte[] snappy(byte[] bytes) throws IOException {
+ SnappyCompressor compressor = new SnappyCompressor();
+ compressor.reset();
+ compressor.setInput(bytes, 0, bytes.length);
+ compressor.finish();
+ byte[] buffer = new byte[bytes.length * 2];
+ int compressedSize = compressor.compress(buffer, 0, buffer.length);
+ return Arrays.copyOfRange(buffer, 0, compressedSize);
+ }
+
+ /** Construct ParquetFileReader for input file and columns */
+ private ParquetFileReader getParquetFileReader(Path path, Configuration conf,
+ List<ColumnDescriptor> columns)
+ throws IOException {
+ ParquetMetadata footer = ParquetFileReader.readFooter(conf, path);
+ return new ParquetFileReader(conf, footer.getFileMetaData(), path,
+ footer.getBlocks(), columns);
+ }
+
+ /** Read the dictionary page for the column */
+ private DictionaryPage readDictPage(ColumnDescriptor colDesc, PageReadStore pageReadStore) {
+ return pageReadStore.getPageReader(colDesc).readDictionaryPage();
+ }
+
+ /** Read the next page for a column */
+ private DataPageV1 readNextPage(ColumnDescriptor colDesc, PageReadStore pageReadStore) {
+ return (DataPageV1) pageReadStore.getPageReader(colDesc).readPage();
+ }
+
+ /**
+ * Compare the extracted (decompressed) bytes to the reference bytes
+ */
+ private void assertCorrectContent(byte[] pageBytes, byte[] referenceBytes) {
+ assertArrayEquals("Read page content was different from expected page content", referenceBytes,
+ pageBytes);
+ }
+
+ /**
+ * Verify that the crc is set in a page, calculate the reference crc from the reference bytes,
+ * and check that the two crcs are identical.
+ */
+ private void assertCrcSetAndCorrect(Page page, byte[] referenceBytes) {
+ assertTrue("Checksum was not set in page", page.getCrc().isPresent());
+ int crcFromPage = page.getCrc().getAsInt();
+ crc.reset();
+ crc.update(referenceBytes);
+ assertEquals("Checksum found in page did not match calculated reference checksum",
+ crc.getValue(), (long) crcFromPage & 0xffffffffL);
+ }
+
+ /** Verify that the crc is not set */
+ private void assertCrcNotSet(Page page) {
+ assertFalse("Checksum was set in page", page.getCrc().isPresent());
+ }
+
+ /**
+ * Read the next row group and expect it to fail with a checksum verification exception; verify
+ * that the thrown exception is a ParquetDecodingException reporting a failed CRC check.
+ */
+ private void assertVerificationFailed(ParquetFileReader reader) {
+ try {
+ reader.readNextRowGroup();
+ fail("Expected checksum verification exception to be thrown");
+ } catch (Exception e) {
+ assertTrue("Thrown exception is of incorrect type", e instanceof ParquetDecodingException);
+ assertTrue("Did not catch checksum verification ParquetDecodingException",
+ e.getMessage().contains("CRC checksum verification failed"));
+ }
+ }
+}
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
index 917ad57..8763cac 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
@@ -225,6 +225,8 @@
Path path = new Path(testFile.toURI());
Configuration conf = new Configuration();
+ // Disable writing out checksums as the hardcoded byte offsets in the assertions below assume no checksums are written
+ conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, false);
// uses the test constructor
ParquetFileWriter w = new ParquetFileWriter(conf, SCHEMA, path, 120, 60);
@@ -330,6 +332,8 @@
Path path = new Path(testFile.toURI());
Configuration conf = new Configuration();
+ // Disable writing out checksums as the hardcoded byte offsets in the assertions below assume no checksums are written
+ conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, false);
// uses the test constructor
ParquetFileWriter w = new ParquetFileWriter(conf, SCHEMA, path, 100, 50);
diff --git a/parquet-tools/src/main/java/org/apache/parquet/tools/command/DumpCommand.java b/parquet-tools/src/main/java/org/apache/parquet/tools/command/DumpCommand.java
index 27043b9..eaf6e8e 100644
--- a/parquet-tools/src/main/java/org/apache/parquet/tools/command/DumpCommand.java
+++ b/parquet-tools/src/main/java/org/apache/parquet/tools/command/DumpCommand.java
@@ -29,6 +29,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.zip.CRC32;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
@@ -73,6 +74,8 @@
public static final int BLOCK_BUFFER_SIZE = 64 * 1024;
public static final String[] USAGE = new String[] { "<input>", "where <input> is the parquet file to print to stdout" };
+ private static CRC32 crc = new CRC32();
+
public static final Options OPTIONS;
static {
OPTIONS = new Options();
@@ -242,6 +245,12 @@
}
}
+ private static boolean verifyCrc(int referenceCrc, byte[] bytes) {
+ crc.reset();
+ crc.update(bytes);
+ return crc.getValue() == ((long) referenceCrc & 0xffffffffL);
+ }
+
public static void dump(final PrettyPrintWriter out, PageReadStore store, ColumnDescriptor column) throws IOException {
PageReader reader = store.getPageReader(column);
@@ -274,6 +283,15 @@
} else {
out.format(" ST:[none]");
}
+ if (pageV1.getCrc().isPresent()) {
+ try {
+ out.format(" CRC:%s", verifyCrc(pageV1.getCrc().getAsInt(), pageV1.getBytes().toByteArray()) ? "[verified]" : "[PAGE CORRUPT]");
+ } catch (IOException e) {
+ out.format(" CRC:[error getting page bytes]");
+ }
+ } else {
+ out.format(" CRC:[none]");
+ }
return null;
}
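One detail worth spelling out for the verifyCrc helper above: CRC32.getValue() yields the unsigned 32-bit checksum in a long, while the page header field is a signed 32-bit int, so both the stored cast and the comparison rely on masking. A tiny illustrative sketch (the constant is an arbitrary example, not taken from the patch):

    public final class CrcMaskSketch {
      public static void main(String[] args) {
        long unsignedCrc = 0xCAFEBABEL;            // example CRC with the high bit set
        int storedInHeader = (int) unsignedCrc;    // negative as a signed int
        long recovered = storedInHeader & 0xffffffffL;
        assert recovered == unsignedCrc;           // masking round-trips the value losslessly
      }
    }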