PARQUET-1580: Page-level CRC checksum verification for DataPageV1 (#647)

* Page-level checksums for DataPageV1

* Got rid of redundant constant

* Use more direct way of obtaining defaults

* Revised implementation, updated tests, addressed review comments

* Revert auto whitespace trimming

* Variable rename for consistency

* Revert whitespace changes

* Revert more whitespace changes

* Addressed code review comments

* Enable writing out checksums by default

* Added benchmarks

* Addressed review comments

* Addressed test failures

* Added run script for checksum benchmarks

* Addressed code review comments
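
For reviewers, a minimal usage sketch of the two knobs this patch adds (the class name and
output path below are illustrative, not part of the change): the writer emits page-level CRCs
via withPageWriteChecksumEnabled, and the reader opts into verification via
usePageChecksumVerification.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.example.data.Group;
    import org.apache.parquet.example.data.simple.SimpleGroupFactory;
    import org.apache.parquet.hadoop.ParquetReader;
    import org.apache.parquet.hadoop.ParquetWriter;
    import org.apache.parquet.hadoop.example.ExampleParquetWriter;
    import org.apache.parquet.hadoop.example.GroupReadSupport;
    import org.apache.parquet.schema.MessageType;
    import org.apache.parquet.schema.MessageTypeParser;

    public class PageChecksumUsageSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path file = new Path("target/checksum-example.parquet"); // illustrative path
        MessageType schema = MessageTypeParser.parseMessageType("message m { required int64 id; }");

        // Page checksums are written by default after this change; the option just makes it explicit.
        try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(file)
            .withConf(conf)
            .withType(schema)
            .withPageWriteChecksumEnabled(true)
            .build()) {
          writer.write(new SimpleGroupFactory(schema).newGroup().append("id", 1L));
        }

        // Checksum verification on read remains opt-in.
        try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file)
            .withConf(conf)
            .usePageChecksumVerification(true)
            .build()) {
          for (Group g = reader.read(); g != null; g = reader.read()) {
            System.out.println(g.getLong("id", 0));
          }
        }
      }
    }

The defaults reflect that split: DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED is true in ParquetProperties,
while PAGE_VERIFY_CHECKSUM_ENABLED_DEFAULT is false in ParquetReadOptions.
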
diff --git a/parquet-benchmarks/run_checksums.sh b/parquet-benchmarks/run_checksums.sh
new file mode 100755
index 0000000..e798488
--- /dev/null
+++ b/parquet-benchmarks/run_checksums.sh
@@ -0,0 +1,28 @@
+#!/usr/bin/env bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+SCRIPT_PATH=$( cd "$(dirname "$0")" ; pwd -P )
+
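+# Any extra command line arguments are forwarded to the JMH runner via "$@"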
+echo "Page level CRC checksum benchmarks"
+echo "Running write benchmarks"
+java -jar ${SCRIPT_PATH}/target/parquet-benchmarks.jar p*PageChecksumWriteBenchmarks -bm ss "$@"
+echo "Running read benchmarks"
+java -jar ${SCRIPT_PATH}/target/parquet-benchmarks.jar p*PageChecksumReadBenchmarks -bm ss "$@"
diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BenchmarkFiles.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BenchmarkFiles.java
index d9ef4fd..f039403 100644
--- a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BenchmarkFiles.java
+++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/BenchmarkFiles.java
@@ -37,4 +37,26 @@
 //  public final Path parquetFile_1M_LZO = new Path("target/tests/ParquetBenchmarks/PARQUET-1M-LZO");
   public static final Path file_1M_SNAPPY = new Path(TARGET_DIR + "/PARQUET-1M-SNAPPY");
   public static final Path file_1M_GZIP = new Path(TARGET_DIR + "/PARQUET-1M-GZIP");
+
+  // Page checksum files
+  public static final Path file_100K_CHECKSUMS_UNCOMPRESSED = new Path(TARGET_DIR + "/PARQUET-100K-CHECKSUMS-UNCOMPRESSED");
+  public static final Path file_100K_NOCHECKSUMS_UNCOMPRESSED = new Path(TARGET_DIR + "/PARQUET-100K-NOCHECKSUMS-UNCOMPRESSED");
+  public static final Path file_1M_CHECKSUMS_UNCOMPRESSED = new Path(TARGET_DIR + "/PARQUET-1M-CHECKSUMS-UNCOMPRESSED");
+  public static final Path file_1M_NOCHECKSUMS_UNCOMPRESSED = new Path(TARGET_DIR + "/PARQUET-1M-NOCHECKSUMS-UNCOMPRESSED");
+  public static final Path file_10M_CHECKSUMS_UNCOMPRESSED = new Path(TARGET_DIR + "/PARQUET-10M-CHECKSUMS-UNCOMPRESSED");
+  public static final Path file_10M_NOCHECKSUMS_UNCOMPRESSED = new Path(TARGET_DIR + "/PARQUET-10M-NOCHECKSUMS-UNCOMPRESSED");
+
+  public static final Path file_100K_CHECKSUMS_GZIP = new Path(TARGET_DIR + "/PARQUET-100K-CHECKSUMS-GZIP");
+  public static final Path file_100K_NOCHECKSUMS_GZIP = new Path(TARGET_DIR + "/PARQUET-100K-NOCHECKSUMS-GZIP");
+  public static final Path file_1M_CHECKSUMS_GZIP = new Path(TARGET_DIR + "/PARQUET-1M-CHECKSUMS-GZIP");
+  public static final Path file_1M_NOCHECKSUMS_GZIP = new Path(TARGET_DIR + "/PARQUET-1M-NOCHECKSUMS-GZIP");
+  public static final Path file_10M_CHECKSUMS_GZIP = new Path(TARGET_DIR + "/PARQUET-10M-CHECKSUMS-GZIP");
+  public static final Path file_10M_NOCHECKSUMS_GZIP = new Path(TARGET_DIR + "/PARQUET-10M-NOCHECKSUMS-GZIP");
+
+  public static final Path file_100K_CHECKSUMS_SNAPPY = new Path(TARGET_DIR + "/PARQUET-100K-CHECKSUMS-SNAPPY");
+  public static final Path file_100K_NOCHECKSUMS_SNAPPY = new Path(TARGET_DIR + "/PARQUET-100K-NOCHECKSUMS-SNAPPY");
+  public static final Path file_1M_CHECKSUMS_SNAPPY = new Path(TARGET_DIR + "/PARQUET-1M-CHECKSUMS-SNAPPY");
+  public static final Path file_1M_NOCHECKSUMS_SNAPPY = new Path(TARGET_DIR + "/PARQUET-1M-NOCHECKSUMS-SNAPPY");
+  public static final Path file_10M_CHECKSUMS_SNAPPY = new Path(TARGET_DIR + "/PARQUET-10M-CHECKSUMS-SNAPPY");
+  public static final Path file_10M_NOCHECKSUMS_SNAPPY = new Path(TARGET_DIR + "/PARQUET-10M-NOCHECKSUMS-SNAPPY");
 }
diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PageChecksumDataGenerator.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PageChecksumDataGenerator.java
new file mode 100644
index 0000000..6c62cc6
--- /dev/null
+++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PageChecksumDataGenerator.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.benchmarks;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.example.data.GroupFactory;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+
+import static java.util.UUID.randomUUID;
+import static org.apache.parquet.benchmarks.BenchmarkConstants.*;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.*;
+
+import java.io.IOException;
+import java.util.Random;
+
+import static org.apache.parquet.benchmarks.BenchmarkUtils.deleteIfExists;
+import static org.apache.parquet.benchmarks.BenchmarkUtils.exists;
+import static org.apache.parquet.hadoop.metadata.CompressionCodecName.*;
+
+public class PageChecksumDataGenerator {
+
+  private final MessageType SCHEMA = MessageTypeParser.parseMessageType(
+    "message m {" +
+      "  required int64 long_field;" +
+      "  required binary binary_field;" +
+      "  required group group {" +
+      "    repeated int32 int_field;" +
+      "  }" +
+      "}");
+
+  public void generateData(Path outFile, int nRows, boolean writeChecksums,
+                           CompressionCodecName compression) throws IOException {
+    if (exists(configuration, outFile)) {
+      System.out.println("File already exists " + outFile);
+      return;
+    }
+
+    ParquetWriter<Group> writer = ExampleParquetWriter.builder(outFile)
+      .withConf(configuration)
+      .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+      .withCompressionCodec(compression)
+      .withDictionaryEncoding(true)
+      .withType(SCHEMA)
+      .withPageWriteChecksumEnabled(writeChecksums)
+      .build();
+
+    GroupFactory groupFactory = new SimpleGroupFactory(SCHEMA);
+    Random rand = new Random(42);
+    for (int i = 0; i < nRows; i++) {
+      Group group = groupFactory.newGroup();
+      group
+        .append("long_field", (long) i)
+        .append("binary_field", randomUUID().toString())
+        .addGroup("group")
+        // Force dictionary encoding by performing modulo
+        .append("int_field", rand.nextInt() % 100)
+        .append("int_field", rand.nextInt() % 100)
+        .append("int_field", rand.nextInt() % 100)
+        .append("int_field", rand.nextInt() % 100);
+      writer.write(group);
+    }
+
+    writer.close();
+  }
+
+  public void generateAll() {
+    try {
+      // No need to generate the non-checksum versions, as the files generated here are only used in
+      // the read benchmarks
+      generateData(file_100K_CHECKSUMS_UNCOMPRESSED, 100 * ONE_K, true, UNCOMPRESSED);
+      generateData(file_100K_CHECKSUMS_GZIP, 100 * ONE_K, true, GZIP);
+      generateData(file_100K_CHECKSUMS_SNAPPY, 100 * ONE_K, true, SNAPPY);
+      generateData(file_1M_CHECKSUMS_UNCOMPRESSED, ONE_MILLION, true, UNCOMPRESSED);
+      generateData(file_1M_CHECKSUMS_GZIP, ONE_MILLION, true, GZIP);
+      generateData(file_1M_CHECKSUMS_SNAPPY, ONE_MILLION, true, SNAPPY);
+      generateData(file_10M_CHECKSUMS_UNCOMPRESSED, 10 * ONE_MILLION, true, UNCOMPRESSED);
+      generateData(file_10M_CHECKSUMS_GZIP, 10 * ONE_MILLION, true, GZIP);
+      generateData(file_10M_CHECKSUMS_SNAPPY, 10 * ONE_MILLION, true, SNAPPY);
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public void cleanup() {
+    deleteIfExists(configuration, file_100K_NOCHECKSUMS_UNCOMPRESSED);
+    deleteIfExists(configuration, file_100K_CHECKSUMS_UNCOMPRESSED);
+    deleteIfExists(configuration, file_100K_NOCHECKSUMS_GZIP);
+    deleteIfExists(configuration, file_100K_CHECKSUMS_GZIP);
+    deleteIfExists(configuration, file_100K_NOCHECKSUMS_SNAPPY);
+    deleteIfExists(configuration, file_100K_CHECKSUMS_SNAPPY);
+    deleteIfExists(configuration, file_1M_NOCHECKSUMS_UNCOMPRESSED);
+    deleteIfExists(configuration, file_1M_CHECKSUMS_UNCOMPRESSED);
+    deleteIfExists(configuration, file_1M_NOCHECKSUMS_GZIP);
+    deleteIfExists(configuration, file_1M_CHECKSUMS_GZIP);
+    deleteIfExists(configuration, file_1M_NOCHECKSUMS_SNAPPY);
+    deleteIfExists(configuration, file_1M_CHECKSUMS_SNAPPY);
+    deleteIfExists(configuration, file_10M_NOCHECKSUMS_UNCOMPRESSED);
+    deleteIfExists(configuration, file_10M_CHECKSUMS_UNCOMPRESSED);
+    deleteIfExists(configuration, file_10M_NOCHECKSUMS_GZIP);
+    deleteIfExists(configuration, file_10M_CHECKSUMS_GZIP);
+    deleteIfExists(configuration, file_10M_NOCHECKSUMS_SNAPPY);
+    deleteIfExists(configuration, file_10M_CHECKSUMS_SNAPPY);
+  }
+}
diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PageChecksumReadBenchmarks.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PageChecksumReadBenchmarks.java
new file mode 100644
index 0000000..db23eeb
--- /dev/null
+++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PageChecksumReadBenchmarks.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.benchmarks;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.infra.Blackhole;
+
+import static org.apache.parquet.benchmarks.BenchmarkConstants.ONE_K;
+import static org.apache.parquet.benchmarks.BenchmarkConstants.ONE_MILLION;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.configuration;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.file_100K_CHECKSUMS_UNCOMPRESSED;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.file_100K_CHECKSUMS_GZIP;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.file_100K_CHECKSUMS_SNAPPY;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.file_1M_CHECKSUMS_UNCOMPRESSED;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.file_1M_CHECKSUMS_GZIP;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.file_1M_CHECKSUMS_SNAPPY;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.file_10M_CHECKSUMS_UNCOMPRESSED;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.file_10M_CHECKSUMS_GZIP;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.file_10M_CHECKSUMS_SNAPPY;
+
+import java.io.IOException;
+
+@State(Scope.Thread)
+public class PageChecksumReadBenchmarks {
+
+  private PageChecksumDataGenerator pageChecksumDataGenerator = new PageChecksumDataGenerator();
+
+  @Setup(Level.Trial)
+  public void setup() {
+    pageChecksumDataGenerator.generateAll();
+  }
+
+  @TearDown(Level.Trial)
+  public void cleanup() {
+    pageChecksumDataGenerator.cleanup();
+  }
+
+  private void readFile(Path file, int nRows, boolean verifyChecksums, Blackhole blackhole)
+    throws IOException {
+    try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), file)
+        .withConf(configuration)
+        .usePageChecksumVerification(verifyChecksums)
+        .build()) {
+      for (int i = 0; i < nRows; i++) {
+        Group group = reader.read();
+        blackhole.consume(group.getLong("long_field", 0));
+        blackhole.consume(group.getBinary("binary_field", 0));
+        Group subgroup = group.getGroup("group", 0);
+        blackhole.consume(subgroup.getInteger("int_field", 0));
+        blackhole.consume(subgroup.getInteger("int_field", 1));
+        blackhole.consume(subgroup.getInteger("int_field", 2));
+        blackhole.consume(subgroup.getInteger("int_field", 3));
+      }
+    }
+  }
+
+  // 100k rows, uncompressed, GZIP, Snappy
+
+  @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+  public void read100KRowsUncompressedWithoutVerification(Blackhole blackhole) throws IOException {
+    readFile(file_100K_CHECKSUMS_UNCOMPRESSED, 100 * ONE_K, false, blackhole);
+  }
+
+  @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+  public void read100KRowsUncompressedWithVerification(Blackhole blackhole) throws IOException {
+    readFile(file_100K_CHECKSUMS_UNCOMPRESSED, 100 * ONE_K, true, blackhole);
+  }
+
+  @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+  public void read100KRowsGzipWithoutVerification(Blackhole blackhole) throws IOException {
+    readFile(file_100K_CHECKSUMS_GZIP, 100 * ONE_K, false, blackhole);
+  }
+
+  @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+  public void read100KRowsGzipWithVerification(Blackhole blackhole) throws IOException {
+    readFile(file_100K_CHECKSUMS_GZIP, 100 * ONE_K, true, blackhole);
+  }
+
+  @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+  public void read100KRowsSnappyWithoutVerification(Blackhole blackhole) throws IOException {
+    readFile(file_100K_CHECKSUMS_SNAPPY, 100 * ONE_K, false, blackhole);
+  }
+
+  @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+  public void read100KRowsSnappyWithVerification(Blackhole blackhole) throws IOException {
+    readFile(file_100K_CHECKSUMS_SNAPPY, 100 * ONE_K, true, blackhole);
+  }
+
+  // 1M rows, uncompressed, GZIP, Snappy
+
+  @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+  public void read1MRowsUncompressedWithoutVerification(Blackhole blackhole) throws IOException {
+    readFile(file_1M_CHECKSUMS_UNCOMPRESSED, ONE_MILLION, false, blackhole);
+  }
+
+  @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+  public void read1MRowsUncompressedWithVerification(Blackhole blackhole) throws IOException {
+    readFile(file_1M_CHECKSUMS_UNCOMPRESSED, ONE_MILLION, true, blackhole);
+  }
+
+  @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+  public void read1MRowsGzipWithoutVerification(Blackhole blackhole) throws IOException {
+    readFile(file_1M_CHECKSUMS_GZIP, ONE_MILLION, false, blackhole);
+  }
+
+  @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+  public void read1MRowsGzipWithVerification(Blackhole blackhole) throws IOException {
+    readFile(file_1M_CHECKSUMS_GZIP, ONE_MILLION, true, blackhole);
+  }
+
+  @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+  public void read1MRowsSnappyWithoutVerification(Blackhole blackhole) throws IOException {
+    readFile(file_1M_CHECKSUMS_SNAPPY, ONE_MILLION, false, blackhole);
+  }
+
+  @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+  public void read1MRowsSnappyWithVerification(Blackhole blackhole) throws IOException {
+    readFile(file_1M_CHECKSUMS_SNAPPY, ONE_MILLION, true, blackhole);
+  }
+
+  // 10M rows, uncompressed, GZIP, Snappy
+
+  @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+  public void read10MRowsUncompressedWithoutVerification(Blackhole blackhole) throws IOException {
+    readFile(file_10M_CHECKSUMS_UNCOMPRESSED, 10 * ONE_MILLION, false, blackhole);
+  }
+
+  @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+  public void read10MRowsUncompressedWithVerification(Blackhole blackhole) throws IOException {
+    readFile(file_10M_CHECKSUMS_UNCOMPRESSED, 10 * ONE_MILLION, true, blackhole);
+  }
+
+  @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+  public void read10MRowsGzipWithoutVerification(Blackhole blackhole) throws IOException {
+    readFile(file_10M_CHECKSUMS_GZIP, 10 * ONE_MILLION, false, blackhole);
+  }
+
+  @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+  public void read10MRowsGzipWithVerification(Blackhole blackhole) throws IOException {
+    readFile(file_10M_CHECKSUMS_GZIP, 10 * ONE_MILLION, true, blackhole);
+  }
+
+  @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+  public void read10MRowsSnappyWithoutVerification(Blackhole blackhole) throws IOException {
+    readFile(file_10M_CHECKSUMS_SNAPPY, 10 * ONE_MILLION, false, blackhole);
+  }
+
+  @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+  public void read10MRowsSnappyWithVerification(Blackhole blackhole) throws IOException {
+    readFile(file_10M_CHECKSUMS_SNAPPY, 10 * ONE_MILLION, true, blackhole);
+  }
+
+}
diff --git a/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PageChecksumWriteBenchmarks.java b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PageChecksumWriteBenchmarks.java
new file mode 100644
index 0000000..c743dde
--- /dev/null
+++ b/parquet-benchmarks/src/main/java/org/apache/parquet/benchmarks/PageChecksumWriteBenchmarks.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.benchmarks;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+
+import static org.apache.parquet.benchmarks.BenchmarkConstants.ONE_K;
+import static org.apache.parquet.benchmarks.BenchmarkConstants.ONE_MILLION;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.file_100K_CHECKSUMS_UNCOMPRESSED;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.file_100K_NOCHECKSUMS_UNCOMPRESSED;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.file_100K_CHECKSUMS_GZIP;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.file_100K_NOCHECKSUMS_GZIP;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.file_100K_CHECKSUMS_SNAPPY;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.file_100K_NOCHECKSUMS_SNAPPY;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.file_1M_CHECKSUMS_UNCOMPRESSED;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.file_1M_NOCHECKSUMS_UNCOMPRESSED;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.file_1M_CHECKSUMS_GZIP;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.file_1M_NOCHECKSUMS_GZIP;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.file_1M_CHECKSUMS_SNAPPY;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.file_1M_NOCHECKSUMS_SNAPPY;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.file_10M_CHECKSUMS_UNCOMPRESSED;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.file_10M_NOCHECKSUMS_UNCOMPRESSED;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.file_10M_CHECKSUMS_GZIP;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.file_10M_NOCHECKSUMS_GZIP;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.file_10M_CHECKSUMS_SNAPPY;
+import static org.apache.parquet.benchmarks.BenchmarkFiles.file_10M_NOCHECKSUMS_SNAPPY;
+
+import java.io.IOException;
+
+import static org.apache.parquet.hadoop.metadata.CompressionCodecName.*;
+
+@State(Scope.Thread)
+public class PageChecksumWriteBenchmarks {
+
+  private PageChecksumDataGenerator pageChecksumDataGenerator = new PageChecksumDataGenerator();
+
+  @Setup(Level.Iteration)
+  public void cleanup() {
+    pageChecksumDataGenerator.cleanup();
+  }
+
+  // 100k rows, uncompressed, GZIP, Snappy
+
+  @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+  public void write100KRowsUncompressedWithoutChecksums() throws IOException {
+    pageChecksumDataGenerator.generateData(file_100K_NOCHECKSUMS_UNCOMPRESSED, 100 * ONE_K, false, UNCOMPRESSED);
+  }
+
+  @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+  public void write100KRowsUncompressedWithChecksums() throws IOException {
+    pageChecksumDataGenerator.generateData(file_100K_CHECKSUMS_UNCOMPRESSED, 100 * ONE_K, true, UNCOMPRESSED);
+  }
+
+  @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+  public void write100KRowsGzipWithoutChecksums() throws IOException {
+    pageChecksumDataGenerator.generateData(file_100K_NOCHECKSUMS_GZIP, 100 * ONE_K, false, GZIP);
+  }
+
+  @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+  public void write100KRowsGzipWithChecksums() throws IOException {
+    pageChecksumDataGenerator.generateData(file_100K_CHECKSUMS_GZIP, 100 * ONE_K, true, GZIP);
+  }
+
+  @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+  public void write100KRowsSnappyWithoutChecksums() throws IOException {
+    pageChecksumDataGenerator.generateData(file_100K_NOCHECKSUMS_SNAPPY, 100 * ONE_K, false, SNAPPY);
+  }
+
+  @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+  public void write100KRowsSnappyWithChecksums() throws IOException {
+    pageChecksumDataGenerator.generateData(file_100K_CHECKSUMS_SNAPPY, 100 * ONE_K, true, SNAPPY);
+  }
+
+  // 1M rows, uncompressed, GZIP, Snappy
+
+  @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+  public void write1MRowsUncompressedWithoutChecksums() throws IOException {
+    pageChecksumDataGenerator.generateData(file_1M_NOCHECKSUMS_UNCOMPRESSED, ONE_MILLION, false, UNCOMPRESSED);
+  }
+
+  @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+  public void write1MRowsUncompressedWithChecksums() throws IOException {
+    pageChecksumDataGenerator.generateData(file_1M_CHECKSUMS_UNCOMPRESSED, ONE_MILLION, true, UNCOMPRESSED);
+  }
+
+  @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+  public void write1MRowsGzipWithoutChecksums() throws IOException {
+    pageChecksumDataGenerator.generateData(file_1M_NOCHECKSUMS_GZIP, ONE_MILLION, false, GZIP);
+  }
+
+  @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+  public void write1MRowsGzipWithChecksums() throws IOException {
+    pageChecksumDataGenerator.generateData(file_1M_CHECKSUMS_GZIP, ONE_MILLION, true, GZIP);
+  }
+
+  @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+  public void write1MRowsSnappyWithoutChecksums() throws IOException {
+    pageChecksumDataGenerator.generateData(file_1M_NOCHECKSUMS_SNAPPY, ONE_MILLION, false, SNAPPY);
+  }
+
+  @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+  public void write1MRowsSnappyWithChecksums() throws IOException {
+    pageChecksumDataGenerator.generateData(file_1M_CHECKSUMS_SNAPPY, ONE_MILLION, true, SNAPPY);
+  }
+
+  // 10M rows, uncompressed, GZIP, Snappy
+
+  @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+  public void write10MRowsUncompressedWithoutChecksums() throws IOException {
+    pageChecksumDataGenerator.generateData(file_10M_NOCHECKSUMS_UNCOMPRESSED, 10 * ONE_MILLION, false, UNCOMPRESSED);
+  }
+
+  @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+  public void write10MRowsUncompressedWithChecksums() throws IOException {
+    pageChecksumDataGenerator.generateData(file_10M_CHECKSUMS_UNCOMPRESSED, 10 * ONE_MILLION, true, UNCOMPRESSED);
+  }
+
+  @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+  public void write10MRowsGzipWithoutChecksums() throws IOException {
+    pageChecksumDataGenerator.generateData(file_10M_NOCHECKSUMS_GZIP, 10 * ONE_MILLION, false, GZIP);
+  }
+
+  @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+  public void write10MRowsGzipWithChecksums() throws IOException {
+    pageChecksumDataGenerator.generateData(file_10M_CHECKSUMS_GZIP, 10 * ONE_MILLION, true, GZIP);
+  }
+
+  @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+  public void write10MRowsSnappyWithoutChecksums() throws IOException {
+    pageChecksumDataGenerator.generateData(file_10M_NOCHECKSUMS_SNAPPY, 10 * ONE_MILLION, false, SNAPPY);
+  }
+
+  @Benchmark @BenchmarkMode(Mode.SingleShotTime)
+  public void write10MRowsSnappyWithChecksums() throws IOException {
+    pageChecksumDataGenerator.generateData(file_10M_CHECKSUMS_SNAPPY, 10 * ONE_MILLION, true, SNAPPY);
+  }
+
+}
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
index 41e482c..7492b54 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
@@ -50,6 +50,8 @@
   public static final int DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64;
   public static final int DEFAULT_PAGE_ROW_COUNT_LIMIT = 20_000;
 
+  public static final boolean DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED = true;
+
   public static final ValuesWriterFactory DEFAULT_VALUES_WRITER_FACTORY = new DefaultValuesWriterFactory();
 
   private static final int MIN_SLAB_SIZE = 64;
@@ -87,10 +89,12 @@
   private final ValuesWriterFactory valuesWriterFactory;
   private final int columnIndexTruncateLength;
   private final int pageRowCountLimit;
+  private final boolean pageWriteChecksumEnabled;
 
   private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPageSize, boolean enableDict, int minRowCountForPageSizeCheck,
                             int maxRowCountForPageSizeCheck, boolean estimateNextSizeCheck, ByteBufferAllocator allocator,
-                            ValuesWriterFactory writerFactory, int columnIndexMinMaxTruncateLength, int pageRowCountLimit) {
+                            ValuesWriterFactory writerFactory, int columnIndexMinMaxTruncateLength, int pageRowCountLimit,
+                            boolean pageWriteChecksumEnabled) {
     this.pageSizeThreshold = pageSize;
     this.initialSlabSize = CapacityByteArrayOutputStream
       .initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10);
@@ -105,6 +109,7 @@
     this.valuesWriterFactory = writerFactory;
     this.columnIndexTruncateLength = columnIndexMinMaxTruncateLength;
     this.pageRowCountLimit = pageRowCountLimit;
+    this.pageWriteChecksumEnabled = pageWriteChecksumEnabled;
   }
 
   public ValuesWriter newRepetitionLevelWriter(ColumnDescriptor path) {
@@ -201,6 +206,10 @@
     return pageRowCountLimit;
   }
 
+  public boolean getPageWriteChecksumEnabled() {
+    return pageWriteChecksumEnabled;
+  }
+
   public static Builder builder() {
     return new Builder();
   }
@@ -221,6 +230,7 @@
     private ValuesWriterFactory valuesWriterFactory = DEFAULT_VALUES_WRITER_FACTORY;
     private int columnIndexTruncateLength = DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
     private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT;
+    private boolean pageWriteChecksumEnabled = DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED;
 
     private Builder() {
     }
@@ -236,6 +246,7 @@
       this.valuesWriterFactory = toCopy.valuesWriterFactory;
       this.allocator = toCopy.allocator;
       this.pageRowCountLimit = toCopy.pageRowCountLimit;
+      this.pageWriteChecksumEnabled = toCopy.pageWriteChecksumEnabled;
     }
 
     /**
@@ -330,11 +341,17 @@
       return this;
     }
 
+    public Builder withPageWriteChecksumEnabled(boolean val) {
+      this.pageWriteChecksumEnabled = val;
+      return this;
+    }
+
     public ParquetProperties build() {
       ParquetProperties properties =
         new ParquetProperties(writerVersion, pageSize, dictPageSize,
           enableDict, minRowCountForPageSizeCheck, maxRowCountForPageSizeCheck,
-          estimateNextSizeCheck, allocator, valuesWriterFactory, columnIndexTruncateLength, pageRowCountLimit);
+          estimateNextSizeCheck, allocator, valuesWriterFactory, columnIndexTruncateLength,
+          pageRowCountLimit, pageWriteChecksumEnabled);
       // we pass a constructed but uninitialized factory to ParquetProperties above as currently
       // creation of ValuesWriters is invoked from within ParquetProperties. In the future
       // we'd like to decouple that and won't need to pass an object to properties and then pass the
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/Page.java b/parquet-column/src/main/java/org/apache/parquet/column/page/Page.java
index 606f9f7..0489449 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/page/Page.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/page/Page.java
@@ -18,6 +18,8 @@
  */
 package org.apache.parquet.column.page;
 
+import java.util.OptionalInt;
+
 /**
  * one page in a chunk
  */
@@ -43,4 +45,18 @@
     return uncompressedSize;
   }
 
+  // Note: the following field is only used for testing purposes and is NOT used in checksum
+  // verification. The crc value here will merely be a copy of the actual crc field read in
+  // ParquetFileReader.Chunk.readAllPages()
+  private OptionalInt crc = OptionalInt.empty();
+
+  // Visible for testing
+  public void setCrc(int crc) {
+    this.crc = OptionalInt.of(crc);
+  }
+
+  // Visible for testing
+  public OptionalInt getCrc() {
+    return crc;
+  }
 }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java
index 4f5c78a..13ab80b 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/HadoopReadOptions.java
@@ -30,9 +30,10 @@
 
 import static org.apache.parquet.hadoop.ParquetInputFormat.COLUMN_INDEX_FILTERING_ENABLED;
 import static org.apache.parquet.hadoop.ParquetInputFormat.DICTIONARY_FILTERING_ENABLED;
+import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
+import static org.apache.parquet.hadoop.ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED;
 import static org.apache.parquet.hadoop.ParquetInputFormat.RECORD_FILTERING_ENABLED;
 import static org.apache.parquet.hadoop.ParquetInputFormat.STATS_FILTERING_ENABLED;
-import static org.apache.parquet.hadoop.ParquetInputFormat.getFilter;
 import static org.apache.parquet.hadoop.UnmaterializableRecordCounter.BAD_RECORD_THRESHOLD_CONF_KEY;
 
 public class HadoopReadOptions extends ParquetReadOptions {
@@ -45,6 +46,7 @@
                             boolean useDictionaryFilter,
                             boolean useRecordFilter,
                             boolean useColumnIndexFilter,
+                            boolean usePageChecksumVerification,
                             FilterCompat.Filter recordFilter,
                             MetadataFilter metadataFilter,
                             CompressionCodecFactory codecFactory,
@@ -54,7 +56,8 @@
                             Configuration conf) {
     super(
         useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, useColumnIndexFilter,
-        recordFilter, metadataFilter, codecFactory, allocator, maxAllocationSize, properties
+        usePageChecksumVerification, recordFilter, metadataFilter, codecFactory, allocator, maxAllocationSize,
+        properties
     );
     this.conf = conf;
   }
@@ -86,6 +89,8 @@
       useStatsFilter(conf.getBoolean(STATS_FILTERING_ENABLED, true));
       useRecordFilter(conf.getBoolean(RECORD_FILTERING_ENABLED, true));
       useColumnIndexFilter(conf.getBoolean(COLUMN_INDEX_FILTERING_ENABLED, true));
+      usePageChecksumVerification(conf.getBoolean(PAGE_VERIFY_CHECKSUM_ENABLED,
+        usePageChecksumVerification));
       withCodecFactory(HadoopCodecs.newFactory(conf, 0));
       withRecordFilter(getFilter(conf));
       withMaxAllocationInBytes(conf.getInt(ALLOCATION_SIZE, 8388608));
@@ -98,9 +103,9 @@
     @Override
     public ParquetReadOptions build() {
       return new HadoopReadOptions(
-          useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, useColumnIndexFilter,
-          recordFilter, metadataFilter, codecFactory, allocator, maxAllocationSize, properties,
-          conf);
+        useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter,
+        useColumnIndexFilter, usePageChecksumVerification, recordFilter, metadataFilter,
+        codecFactory, allocator, maxAllocationSize, properties, conf);
     }
   }
 }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
index 846d3bd..f059023 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/ParquetReadOptions.java
@@ -40,12 +40,14 @@
   private static final boolean DICTIONARY_FILTERING_ENABLED_DEFAULT = true;
   private static final boolean COLUMN_INDEX_FILTERING_ENABLED_DEFAULT = true;
   private static final int ALLOCATION_SIZE_DEFAULT = 8388608; // 8MB
+  private static final boolean PAGE_VERIFY_CHECKSUM_ENABLED_DEFAULT = false;
 
   private final boolean useSignedStringMinMax;
   private final boolean useStatsFilter;
   private final boolean useDictionaryFilter;
   private final boolean useRecordFilter;
   private final boolean useColumnIndexFilter;
+  private final boolean usePageChecksumVerification;
   private final FilterCompat.Filter recordFilter;
   private final ParquetMetadataConverter.MetadataFilter metadataFilter;
   private final CompressionCodecFactory codecFactory;
@@ -58,6 +60,7 @@
                      boolean useDictionaryFilter,
                      boolean useRecordFilter,
                      boolean useColumnIndexFilter,
+                     boolean usePageChecksumVerification,
                      FilterCompat.Filter recordFilter,
                      ParquetMetadataConverter.MetadataFilter metadataFilter,
                      CompressionCodecFactory codecFactory,
@@ -69,6 +72,7 @@
     this.useDictionaryFilter = useDictionaryFilter;
     this.useRecordFilter = useRecordFilter;
     this.useColumnIndexFilter = useColumnIndexFilter;
+    this.usePageChecksumVerification = usePageChecksumVerification;
     this.recordFilter = recordFilter;
     this.metadataFilter = metadataFilter;
     this.codecFactory = codecFactory;
@@ -97,6 +101,10 @@
     return useColumnIndexFilter;
   }
 
+  public boolean usePageChecksumVerification() {
+    return usePageChecksumVerification;
+  }
+
   public FilterCompat.Filter getRecordFilter() {
     return recordFilter;
   }
@@ -143,6 +151,7 @@
     protected boolean useDictionaryFilter = DICTIONARY_FILTERING_ENABLED_DEFAULT;
     protected boolean useRecordFilter = RECORD_FILTERING_ENABLED_DEFAULT;
     protected boolean useColumnIndexFilter = COLUMN_INDEX_FILTERING_ENABLED_DEFAULT;
+    protected boolean usePageChecksumVerification = PAGE_VERIFY_CHECKSUM_ENABLED_DEFAULT;
     protected FilterCompat.Filter recordFilter = null;
     protected ParquetMetadataConverter.MetadataFilter metadataFilter = NO_FILTER;
     // the page size parameter isn't used when only using the codec factory to get decompressors
@@ -200,6 +209,16 @@
       return useColumnIndexFilter(true);
     }
 
+
+    public Builder usePageChecksumVerification(boolean usePageChecksumVerification) {
+      this.usePageChecksumVerification = usePageChecksumVerification;
+      return this;
+    }
+
+    public Builder usePageChecksumVerification() {
+      return usePageChecksumVerification(true);
+    }
+
     public Builder withRecordFilter(FilterCompat.Filter rowGroupFilter) {
       this.recordFilter = rowGroupFilter;
       return this;
@@ -235,6 +254,11 @@
       return this;
     }
 
+    public Builder withPageChecksumVerification(boolean val) {
+      this.usePageChecksumVerification = val;
+      return this;
+    }
+
     public Builder set(String key, String value) {
       properties.put(key, value);
       return this;
@@ -249,6 +273,7 @@
       withMetadataFilter(options.metadataFilter);
       withCodecFactory(options.codecFactory);
       withAllocator(options.allocator);
+      withPageChecksumVerification(options.usePageChecksumVerification);
       for (Map.Entry<String, String> keyValue : options.properties.entrySet()) {
         set(keyValue.getKey(), keyValue.getValue());
       }
@@ -257,8 +282,9 @@
 
     public ParquetReadOptions build() {
       return new ParquetReadOptions(
-          useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter, useColumnIndexFilter,
-          recordFilter, metadataFilter, codecFactory, allocator, maxAllocationSize, properties);
+        useSignedStringMinMax, useStatsFilter, useDictionaryFilter, useRecordFilter,
+        useColumnIndexFilter, usePageChecksumVerification, recordFilter, metadataFilter,
+        codecFactory, allocator, maxAllocationSize, properties);
     }
   }
 }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
index fb0ca7b..deeda65 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
@@ -1365,13 +1365,29 @@
   }
 
   private PageHeader newDataPageHeader(
+    int uncompressedSize, int compressedSize,
+    int valueCount,
+    org.apache.parquet.column.Encoding rlEncoding,
+    org.apache.parquet.column.Encoding dlEncoding,
+    org.apache.parquet.column.Encoding valuesEncoding) {
+    PageHeader pageHeader = new PageHeader(PageType.DATA_PAGE, uncompressedSize, compressedSize);
+    pageHeader.setData_page_header(new DataPageHeader(
+      valueCount,
+      getEncoding(valuesEncoding),
+      getEncoding(dlEncoding),
+      getEncoding(rlEncoding)));
+    return pageHeader;
+  }
+
+  private PageHeader newDataPageHeader(
       int uncompressedSize, int compressedSize,
       int valueCount,
       org.apache.parquet.column.Encoding rlEncoding,
       org.apache.parquet.column.Encoding dlEncoding,
-      org.apache.parquet.column.Encoding valuesEncoding) {
+      org.apache.parquet.column.Encoding valuesEncoding,
+      int crc) {
     PageHeader pageHeader = new PageHeader(PageType.DATA_PAGE, uncompressedSize, compressedSize);
-    // TODO: pageHeader.crc = ...;
+    pageHeader.setCrc(crc);
     pageHeader.setData_page_header(new DataPageHeader(
         valueCount,
         getEncoding(valuesEncoding),
@@ -1398,19 +1414,37 @@
   }
 
   public void writeDataPageV1Header(
+    int uncompressedSize,
+    int compressedSize,
+    int valueCount,
+    org.apache.parquet.column.Encoding rlEncoding,
+    org.apache.parquet.column.Encoding dlEncoding,
+    org.apache.parquet.column.Encoding valuesEncoding,
+    OutputStream to) throws IOException {
+    writePageHeader(newDataPageHeader(uncompressedSize,
+      compressedSize,
+      valueCount,
+      rlEncoding,
+      dlEncoding,
+      valuesEncoding), to);
+  }
+
+  public void writeDataPageV1Header(
       int uncompressedSize,
       int compressedSize,
       int valueCount,
       org.apache.parquet.column.Encoding rlEncoding,
       org.apache.parquet.column.Encoding dlEncoding,
       org.apache.parquet.column.Encoding valuesEncoding,
+      int crc,
       OutputStream to) throws IOException {
     writePageHeader(newDataPageHeader(uncompressedSize,
                                       compressedSize,
                                       valueCount,
                                       rlEncoding,
                                       dlEncoding,
-                                      valuesEncoding), to);
+                                      valuesEncoding,
+                                      crc), to);
   }
 
   public void writeDataPageV2Header(
@@ -1443,13 +1477,22 @@
   }
 
   public void writeDictionaryPageHeader(
-      int uncompressedSize, int compressedSize, int valueCount,
-      org.apache.parquet.column.Encoding valuesEncoding, OutputStream to) throws IOException {
+    int uncompressedSize, int compressedSize, int valueCount,
+    org.apache.parquet.column.Encoding valuesEncoding, OutputStream to) throws IOException {
     PageHeader pageHeader = new PageHeader(PageType.DICTIONARY_PAGE, uncompressedSize, compressedSize);
     pageHeader.setDictionary_page_header(new DictionaryPageHeader(valueCount, getEncoding(valuesEncoding)));
     writePageHeader(pageHeader, to);
   }
 
+  public void writeDictionaryPageHeader(
+      int uncompressedSize, int compressedSize, int valueCount,
+      org.apache.parquet.column.Encoding valuesEncoding, int crc, OutputStream to) throws IOException {
+    PageHeader pageHeader = new PageHeader(PageType.DICTIONARY_PAGE, uncompressedSize, compressedSize);
+    pageHeader.setCrc(crc);
+    pageHeader.setDictionary_page_header(new DictionaryPageHeader(valueCount, getEncoding(valuesEncoding)));
+    writePageHeader(pageHeader, to);
+  }
+
   private static BoundaryOrder toParquetBoundaryOrder(
       org.apache.parquet.internal.column.columnindex.BoundaryOrder boundaryOrder) {
     switch (boundaryOrder) {
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
index 0ca9fe3..2e646e7 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
@@ -99,8 +99,9 @@
         public DataPage visit(DataPageV1 dataPageV1) {
           try {
             BytesInput decompressed = decompressor.decompress(dataPageV1.getBytes(), dataPageV1.getUncompressedSize());
+            final DataPageV1 decompressedPage;
             if (offsetIndex == null) {
-              return new DataPageV1(
+              decompressedPage = new DataPageV1(
                   decompressed,
                   dataPageV1.getValueCount(),
                   dataPageV1.getUncompressedSize(),
@@ -110,7 +111,7 @@
                   dataPageV1.getValueEncoding());
             } else {
               long firstRowIndex = offsetIndex.getFirstRowIndex(currentPageIndex);
-              return new DataPageV1(
+              decompressedPage = new DataPageV1(
                   decompressed,
                   dataPageV1.getValueCount(),
                   dataPageV1.getUncompressedSize(),
@@ -121,6 +122,10 @@
                   dataPageV1.getDlEncoding(),
                   dataPageV1.getValueEncoding());
             }
+            if (dataPageV1.getCrc().isPresent()) {
+              decompressedPage.setCrc(dataPageV1.getCrc().getAsInt());
+            }
+            return decompressedPage;
           } catch (IOException e) {
             throw new ParquetDecodingException("could not decompress page", e);
           }
@@ -185,10 +190,14 @@
         return null;
       }
       try {
-        return new DictionaryPage(
-            decompressor.decompress(compressedDictionaryPage.getBytes(), compressedDictionaryPage.getUncompressedSize()),
-            compressedDictionaryPage.getDictionarySize(),
-            compressedDictionaryPage.getEncoding());
+        DictionaryPage decompressedPage = new DictionaryPage(
+          decompressor.decompress(compressedDictionaryPage.getBytes(), compressedDictionaryPage.getUncompressedSize()),
+          compressedDictionaryPage.getDictionarySize(),
+          compressedDictionaryPage.getEncoding());
+        if (compressedDictionaryPage.getCrc().isPresent()) {
+          decompressedPage.setCrc(compressedDictionaryPage.getCrc().getAsInt());
+        }
+        return decompressedPage;
       } catch (IOException e) {
         throw new ParquetDecodingException("Could not decompress dictionary page", e);
       }
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
index f85d374..72f26fc 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageWriteStore.java
@@ -26,11 +26,13 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.zip.CRC32;
 
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.bytes.ConcatenatingByteArrayCollector;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.page.DictionaryPage;
 import org.apache.parquet.column.page.PageWriteStore;
 import org.apache.parquet.column.page.PageWriter;
@@ -74,16 +76,22 @@
     private Statistics totalStatistics;
     private final ByteBufferAllocator allocator;
 
+    private final CRC32 crc;
+    private final boolean pageWriteChecksumEnabled;
+
     private ColumnChunkPageWriter(ColumnDescriptor path,
                                   BytesCompressor compressor,
                                   ByteBufferAllocator allocator,
-                                  int columnIndexTruncateLength) {
+                                  int columnIndexTruncateLength,
+                                  boolean pageWriteChecksumEnabled) {
       this.path = path;
       this.compressor = compressor;
       this.allocator = allocator;
       this.buf = new ConcatenatingByteArrayCollector();
       this.columnIndexBuilder = ColumnIndexBuilder.getBuilder(path.getPrimitiveType(), columnIndexTruncateLength);
       this.offsetIndexBuilder = OffsetIndexBuilder.getBuilder();
+      this.pageWriteChecksumEnabled = pageWriteChecksumEnabled;
+      this.crc = pageWriteChecksumEnabled ? new CRC32() : null;
     }
 
     @Override
@@ -119,7 +127,20 @@
                 + compressedSize);
       }
       tempOutputStream.reset();
-      parquetMetadataConverter.writeDataPageV1Header(
+      if (pageWriteChecksumEnabled) {
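+        // The CRC covers the compressed page bytes only (the same bytes the reader reads back
+        // and verifies); the page header itself is not checksummed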
+        crc.reset();
+        crc.update(compressedBytes.toByteArray());
+        parquetMetadataConverter.writeDataPageV1Header(
+          (int)uncompressedSize,
+          (int)compressedSize,
+          valueCount,
+          rlEncoding,
+          dlEncoding,
+          valuesEncoding,
+          (int) crc.getValue(),
+          tempOutputStream);
+      } else {
+        parquetMetadataConverter.writeDataPageV1Header(
           (int)uncompressedSize,
           (int)compressedSize,
           valueCount,
@@ -127,6 +148,7 @@
           dlEncoding,
           valuesEncoding,
           tempOutputStream);
+      }
       this.uncompressedLength += uncompressedSize;
       this.compressedLength += compressedSize;
       this.totalValueCount += valueCount;
@@ -273,10 +295,16 @@
   private final MessageType schema;
 
   public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, ByteBufferAllocator allocator,
-      int columnIndexTruncateLength) {
+                                   int columnIndexTruncateLength) {
+    this(compressor, schema, allocator, columnIndexTruncateLength,
+      ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED);
+  }
+
+  public ColumnChunkPageWriteStore(BytesCompressor compressor, MessageType schema, ByteBufferAllocator allocator,
+      int columnIndexTruncateLength, boolean pageWriteChecksumEnabled) {
     this.schema = schema;
     for (ColumnDescriptor path : schema.getColumns()) {
-      writers.put(path, new ColumnChunkPageWriter(path, compressor, allocator, columnIndexTruncateLength));
+      writers.put(path, new ColumnChunkPageWriter(path, compressor, allocator, columnIndexTruncateLength, pageWriteChecksumEnabled));
     }
   }
 
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
index d8af379..c3da323 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java
@@ -102,7 +102,7 @@
 
   private void initStore() {
     pageStore = new ColumnChunkPageWriteStore(compressor, schema, props.getAllocator(),
-        props.getColumnIndexTruncateLength());
+        props.getColumnIndexTruncateLength(), props.getPageWriteChecksumEnabled());
     columnStore = props.newColumnWriteStore(schema, pageStore);
     MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema);
     this.recordConsumer = columnIO.getRecordWriter(columnStore);
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index 8e205f6..4acd4c4 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -50,6 +50,7 @@
 import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import java.util.zip.CRC32;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -113,6 +114,8 @@
 
   private final ParquetMetadataConverter converter;
 
+  private final CRC32 crc;
+
   /**
    * for files provided, check if there's a summary file.
    * If a summary file is found it is used otherwise the file footer is used.
@@ -659,6 +662,7 @@
     for (ColumnDescriptor col : columns) {
       paths.put(ColumnPath.get(col.getPath()), col);
     }
+    this.crc = options.usePageChecksumVerification() ? new CRC32() : null;
   }
 
   /**
@@ -695,6 +699,7 @@
     for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) {
       paths.put(ColumnPath.get(col.getPath()), col);
     }
+    this.crc = options.usePageChecksumVerification() ? new CRC32() : null;
   }
 
   public ParquetFileReader(InputFile file, ParquetReadOptions options) throws IOException {
@@ -717,6 +722,7 @@
     for (ColumnDescriptor col : footer.getFileMetaData().getSchema().getColumns()) {
       paths.put(ColumnPath.get(col.getPath()), col);
     }
+    this.crc = options.usePageChecksumVerification() ? new CRC32() : null;
   }
 
   private static <T> List<T> listWithNulls(int size) {
@@ -1164,6 +1170,18 @@
     }
 
     /**
+     * Calculate the checksum of the given bytes and throw a ParquetDecodingException if it does
+     * not match the provided reference crc.
+     */
+    private void verifyCrc(int referenceCrc, byte[] bytes, String exceptionMsg) {
+      crc.reset();
+      crc.update(bytes);
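+      // CRC32.getValue() returns the unsigned 32-bit checksum in a long; mask the signed
+      // reference value read from the page header before comparing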
+      if (crc.getValue() != ((long) referenceCrc & 0xffffffffL)) {
+        throw new ParquetDecodingException(exceptionMsg);
+      }
+    }
+
+    /**
      * Read all of the pages in a given column chunk.
      * @return the list of pages
      */
@@ -1178,36 +1196,54 @@
         PageHeader pageHeader = readPageHeader();
         int uncompressedPageSize = pageHeader.getUncompressed_page_size();
         int compressedPageSize = pageHeader.getCompressed_page_size();
+        final BytesInput pageBytes;
         switch (pageHeader.type) {
           case DICTIONARY_PAGE:
             // there is only one dictionary page per column chunk
             if (dictionaryPage != null) {
               throw new ParquetDecodingException("more than one dictionary page in column " + descriptor.col);
             }
+            pageBytes = this.readAsBytesInput(compressedPageSize);
+            if (options.usePageChecksumVerification() && pageHeader.isSetCrc()) {
+              verifyCrc(pageHeader.getCrc(), pageBytes.toByteArray(),
+                "could not verify dictionary page integrity, CRC checksum verification failed");
+            }
             DictionaryPageHeader dicHeader = pageHeader.getDictionary_page_header();
             dictionaryPage =
                 new DictionaryPage(
-                    this.readAsBytesInput(compressedPageSize),
+                    pageBytes,
                     uncompressedPageSize,
                     dicHeader.getNum_values(),
                     converter.getEncoding(dicHeader.getEncoding())
                     );
+            // Copy crc to new page, used for testing
+            if (pageHeader.isSetCrc()) {
+              dictionaryPage.setCrc(pageHeader.getCrc());
+            }
             break;
           case DATA_PAGE:
             DataPageHeader dataHeaderV1 = pageHeader.getData_page_header();
-            pagesInChunk.add(
-                new DataPageV1(
-                    this.readAsBytesInput(compressedPageSize),
-                    dataHeaderV1.getNum_values(),
-                    uncompressedPageSize,
-                    converter.fromParquetStatistics(
-                        getFileMetaData().getCreatedBy(),
-                        dataHeaderV1.getStatistics(),
-                        type),
-                    converter.getEncoding(dataHeaderV1.getRepetition_level_encoding()),
-                    converter.getEncoding(dataHeaderV1.getDefinition_level_encoding()),
-                    converter.getEncoding(dataHeaderV1.getEncoding())
-                    ));
+            pageBytes = this.readAsBytesInput(compressedPageSize);
+            if (options.usePageChecksumVerification() && pageHeader.isSetCrc()) {
+              verifyCrc(pageHeader.getCrc(), pageBytes.toByteArray(),
+                "could not verify page integrity, CRC checksum verification failed");
+            }
+            DataPageV1 dataPageV1 = new DataPageV1(
+              pageBytes,
+              dataHeaderV1.getNum_values(),
+              uncompressedPageSize,
+              converter.fromParquetStatistics(
+                getFileMetaData().getCreatedBy(),
+                dataHeaderV1.getStatistics(),
+                type),
+              converter.getEncoding(dataHeaderV1.getRepetition_level_encoding()),
+              converter.getEncoding(dataHeaderV1.getDefinition_level_encoding()),
+              converter.getEncoding(dataHeaderV1.getEncoding()));
+            // Copy crc to the new page; used by tests to verify the written checksum
+            if (pageHeader.isSetCrc()) {
+              dataPageV1.setCrc(pageHeader.getCrc());
+            }
+            pagesInChunk.add(dataPageV1);
             valuesCountReadSoFar += dataHeaderV1.getNum_values();
             ++dataPageCountReadSoFar;
             break;
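
The read-path change above hinges on one detail: the crc field in the Thrift page header is a signed 32-bit int, while java.util.zip.CRC32.getValue() returns the checksum as an unsigned value in a long, so the reference value has to be widened and masked before the comparison. A minimal standalone sketch of that idiom (class and method names are illustrative only, not part of this patch):

    import java.util.zip.CRC32;

    class PageCrcSketch {
      /** Returns true if the CRC32 of the page bytes matches the signed int stored in the header. */
      static boolean matches(int referenceCrc, byte[] pageBytes) {
        CRC32 crc = new CRC32();
        crc.update(pageBytes, 0, pageBytes.length);
        // Mask the signed header value back into its unsigned 32-bit range before comparing.
        return crc.getValue() == (referenceCrc & 0xffffffffL);
      }
    }
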
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index c875702..50cd31e 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -33,6 +33,7 @@
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.zip.CRC32;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -141,6 +142,9 @@
   // set when end is called
   private ParquetMetadata footer = null;
 
+  private final CRC32 crc;
+  private boolean pageWriteChecksumEnabled;
+
   /**
    * Captures the order in which methods should be called
    */
@@ -200,7 +204,7 @@
    */
   @Deprecated
   public ParquetFileWriter(Configuration configuration, MessageType schema,
-      Path file) throws IOException {
+                           Path file) throws IOException {
     this(HadoopOutputFile.fromPath(file, configuration),
         schema, Mode.CREATE, DEFAULT_BLOCK_SIZE, MAX_PADDING_SIZE_DEFAULT);
   }
@@ -253,7 +257,8 @@
                            long rowGroupSize, int maxPaddingSize)
       throws IOException {
     this(file, schema, mode, rowGroupSize, maxPaddingSize,
-        ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH);
+        ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH,
+        ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED);
   }
   /**
    * @param file OutputFile to create or overwrite
@@ -262,10 +267,12 @@
    * @param rowGroupSize the row group size
    * @param maxPaddingSize the maximum padding
    * @param columnIndexTruncateLength the length which the min/max values in column indexes tried to be truncated to
+   * @param pageWriteChecksumEnabled whether to write out page level checksums
    * @throws IOException if the file can not be created
    */
   public ParquetFileWriter(OutputFile file, MessageType schema, Mode mode,
-                           long rowGroupSize, int maxPaddingSize, int columnIndexTruncateLength)
+                           long rowGroupSize, int maxPaddingSize, int columnIndexTruncateLength,
+                           boolean pageWriteChecksumEnabled)
       throws IOException {
     TypeUtil.checkValidWriteSchema(schema);
 
@@ -287,6 +294,8 @@
 
     this.encodingStatsBuilder = new EncodingStats.Builder();
     this.columnIndexTruncateLength = columnIndexTruncateLength;
+    this.pageWriteChecksumEnabled = pageWriteChecksumEnabled;
+    this.crc = pageWriteChecksumEnabled ? new CRC32() : null;
   }
 
   /**
@@ -311,6 +320,8 @@
     this.encodingStatsBuilder = new EncodingStats.Builder();
     // no truncation is needed for testing
     this.columnIndexTruncateLength = Integer.MAX_VALUE;
+    this.pageWriteChecksumEnabled = ParquetOutputFormat.getPageWriteChecksumEnabled(configuration);
+    this.crc = pageWriteChecksumEnabled ? new CRC32() : null;
   }
   /**
    * start the file
@@ -380,12 +391,24 @@
     currentChunkDictionaryPageOffset = out.getPos();
     int uncompressedSize = dictionaryPage.getUncompressedSize();
     int compressedPageSize = (int)dictionaryPage.getBytes().size(); // TODO: fix casts
-    metadataConverter.writeDictionaryPageHeader(
+    if (pageWriteChecksumEnabled) {
+      crc.reset();
+      crc.update(dictionaryPage.getBytes().toByteArray());
+      metadataConverter.writeDictionaryPageHeader(
+        uncompressedSize,
+        compressedPageSize,
+        dictionaryPage.getDictionarySize(),
+        dictionaryPage.getEncoding(),
+        (int) crc.getValue(),
+        out);
+    } else {
+      metadataConverter.writeDictionaryPageHeader(
         uncompressedSize,
         compressedPageSize,
         dictionaryPage.getDictionarySize(),
         dictionaryPage.getEncoding(),
         out);
+    }
     long headerSize = out.getPos() - currentChunkDictionaryPageOffset;
     this.uncompressedLength += uncompressedSize + headerSize;
     this.compressedLength += compressedPageSize + headerSize;
@@ -505,13 +528,26 @@
     }
     LOG.debug("{}: write data page: {} values", beforeHeader, valueCount);
     int compressedPageSize = (int) bytes.size();
-    metadataConverter.writeDataPageV1Header(
+    if (pageWriteChecksumEnabled) {
+      crc.reset();
+      crc.update(bytes.toByteArray());
+      metadataConverter.writeDataPageV1Header(
+        uncompressedPageSize, compressedPageSize,
+        valueCount,
+        rlEncoding,
+        dlEncoding,
+        valuesEncoding,
+        (int) crc.getValue(),
+        out);
+    } else {
+      metadataConverter.writeDataPageV1Header(
         uncompressedPageSize, compressedPageSize,
         valueCount,
         rlEncoding,
         dlEncoding,
         valuesEncoding,
         out);
+    }
     long headerSize = out.getPos() - beforeHeader;
     this.uncompressedLength += uncompressedPageSize + headerSize;
     this.compressedLength += compressedPageSize + headerSize;
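
On the write side the checksum is computed over the compressed page bytes and narrowed to an int for the Thrift header; because the reader masks that int back to an unsigned long (see the sketch above), values above Integer.MAX_VALUE survive the round trip. A sketch of that round trip, where 'compressedPageBytes' is a stand-in name assumed for illustration:

    // Sketch of the write/read round trip for the crc value.
    static void crcRoundTrip(byte[] compressedPageBytes) {
      CRC32 crc = new CRC32();
      crc.reset();
      crc.update(compressedPageBytes);        // checksum is taken over the compressed page bytes
      int stored = (int) crc.getValue();      // what the page header writer receives
      long recovered = stored & 0xffffffffL;  // what the reader compares against crc.getValue()
      assert recovered == crc.getValue();
    }
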
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
index b8fce2f..7eab611 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetInputFormat.java
@@ -135,6 +135,11 @@
   public static final String COLUMN_INDEX_FILTERING_ENABLED = "parquet.filter.columnindex.enabled";
 
   /**
+   * key to configure whether page level checksum verification is enabled
+   */
+  public static final String PAGE_VERIFY_CHECKSUM_ENABLED = "parquet.page.verify-checksum.enabled";
+
+  /**
    * key to turn on or off task side metadata loading (default true)
    * if true then metadata is read on the task side and some tasks may finish immediately.
    * if false metadata is read on the client which is slower if there is a lot of metadata but tasks will only be spawn if there is work to do.
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
index cd25b23..afcbbff 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
@@ -145,6 +145,7 @@
   public static final String ESTIMATE_PAGE_SIZE_CHECK = "parquet.page.size.check.estimate";
   public static final String COLUMN_INDEX_TRUNCATE_LENGTH = "parquet.columnindex.truncate.length";
   public static final String PAGE_ROW_COUNT_LIMIT = "parquet.page.row.count.limit";
+  public static final String PAGE_WRITE_CHECKSUM_ENABLED = "parquet.page.write-checksum.enabled";
 
   public static JobSummaryLevel getJobSummaryLevel(Configuration conf) {
     String level = conf.get(JOB_SUMMARY_LEVEL);
@@ -338,6 +339,18 @@
     return conf.getInt(PAGE_ROW_COUNT_LIMIT, ParquetProperties.DEFAULT_PAGE_ROW_COUNT_LIMIT);
   }
 
+  public static void setPageWriteChecksumEnabled(JobContext jobContext, boolean val) {
+    setPageWriteChecksumEnabled(getConfiguration(jobContext), val);
+  }
+
+  public static void setPageWriteChecksumEnabled(Configuration conf, boolean val) {
+    conf.setBoolean(PAGE_WRITE_CHECKSUM_ENABLED, val);
+  }
+
+  public static boolean getPageWriteChecksumEnabled(Configuration conf) {
+    return conf.getBoolean(PAGE_WRITE_CHECKSUM_ENABLED, ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED);
+  }
+
   private WriteSupport<T> writeSupport;
   private ParquetOutputCommitter committer;
 
@@ -409,6 +422,7 @@
         .withMaxRowCountForPageSizeCheck(getMaxRowCountForPageSizeCheck(conf))
         .withColumnIndexTruncateLength(getColumnIndexTruncateLength(conf))
         .withPageRowCountLimit(getPageRowCountLimit(conf))
+        .withPageWriteChecksumEnabled(getPageWriteChecksumEnabled(conf))
         .build();
 
     long blockSize = getLongBlockSize(conf);
@@ -428,11 +442,13 @@
       LOG.info("Max row count for page size check is: {}", props.getMaxRowCountForPageSizeCheck());
       LOG.info("Truncate length for column indexes is: {}", props.getColumnIndexTruncateLength());
       LOG.info("Page row count limit to {}", props.getPageRowCountLimit());
+      LOG.info("Writing page checksums is: {}", props.getPageWriteChecksumEnabled() ? "on" : "off");
     }
 
     WriteContext init = writeSupport.init(conf);
     ParquetFileWriter w = new ParquetFileWriter(HadoopOutputFile.fromPath(file, conf),
-        init.getSchema(), mode, blockSize, maxPaddingSize, props.getColumnIndexTruncateLength());
+        init.getSchema(), mode, blockSize, maxPaddingSize, props.getColumnIndexTruncateLength(),
+        props.getPageWriteChecksumEnabled());
     w.start();
 
     float maxLoad = conf.getFloat(ParquetOutputFormat.MEMORY_POOL_RATIO,
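
For Hadoop-configured jobs, both sides of the feature are driven by the configuration keys introduced in this patch. A brief usage sketch (defaults come from ParquetProperties, with writing enabled by default per this change):

    Configuration conf = new Configuration();
    // Write side: emit a CRC32 per DataPageV1 and dictionary page.
    ParquetOutputFormat.setPageWriteChecksumEnabled(conf, true);              // parquet.page.write-checksum.enabled
    // Read side: verify the stored CRC for every page that carries one.
    conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, true);   // parquet.page.verify-checksum.enabled
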
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
index de20808..28e1967 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetReader.java
@@ -280,6 +280,16 @@
       return this;
     }
 
+    public Builder<T> usePageChecksumVerification(boolean usePageChecksumVerification) {
+      optionsBuilder.usePageChecksumVerification(usePageChecksumVerification);
+      return this;
+    }
+
+    public Builder<T> usePageChecksumVerification() {
+      optionsBuilder.usePageChecksumVerification();
+      return this;
+    }
+
     public Builder<T> withFileRange(long start, long end) {
       optionsBuilder.withRange(start, end);
       return this;
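
With the new builder options, applications can opt into verification without touching the Hadoop configuration. A sketch using the example GroupReadSupport (the path variable and record handling are assumptions of this sketch):

    try (ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), path)
        .usePageChecksumVerification()
        .build()) {
      Group record;
      while ((record = reader.read()) != null) {
        // A corrupt page surfaces here as a ParquetDecodingException.
      }
    }
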
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index 1ed5e32..7fb7186 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -278,7 +278,8 @@
     MessageType schema = writeContext.getSchema();
 
     ParquetFileWriter fileWriter = new ParquetFileWriter(
-        file, schema, mode, rowGroupSize, maxPaddingSize, encodingProps.getColumnIndexTruncateLength());
+      file, schema, mode, rowGroupSize, maxPaddingSize,
+      encodingProps.getColumnIndexTruncateLength(), encodingProps.getPageWriteChecksumEnabled());
     fileWriter.start();
 
     this.codecFactory = new CodecFactory(conf, encodingProps.getPageSizeThreshold());
@@ -516,6 +517,27 @@
     }
 
     /**
+     * Enables writing page level checksums for the constructed writer.
+     *
+     * @return this builder for method chaining.
+     */
+    public SELF enablePageWriteChecksum() {
+      encodingPropsBuilder.withPageWriteChecksumEnabled(true);
+      return self();
+    }
+
+    /**
+     * Enables or disables writing page level checksums for the constructed writer.
+     *
+     * @param enablePageWriteChecksum whether page checksums should be written out
+     * @return this builder for method chaining.
+     */
+    public SELF withPageWriteChecksumEnabled(boolean enablePageWriteChecksum) {
+      encodingPropsBuilder.withPageWriteChecksumEnabled(enablePageWriteChecksum);
+      return self();
+    }
+
+    /**
      * Set a property that will be available to the read path. For writers that use a Hadoop
      * configuration, this is the recommended way to add configuration values.
      *
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
index c353ee3..88c8d83 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestColumnChunkPageWriteStore.java
@@ -46,6 +46,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.parquet.column.ParquetProperties;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.InOrder;
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDataPageV1Checksums.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDataPageV1Checksums.java
new file mode 100644
index 0000000..61a9d63
--- /dev/null
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestDataPageV1Checksums.java
@@ -0,0 +1,563 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.hadoop;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Random;
+import java.util.zip.CRC32;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.parquet.bytes.BytesInput;
+import org.apache.parquet.bytes.HeapByteBufferAllocator;
+import org.apache.parquet.column.ColumnDescriptor;
+import org.apache.parquet.column.Encoding;
+import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DictionaryPage;
+import org.apache.parquet.column.page.Page;
+import org.apache.parquet.column.page.PageReadStore;
+import org.apache.parquet.column.page.PageWriter;
+import org.apache.parquet.column.statistics.Statistics;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.GroupFactory;
+import org.apache.parquet.example.data.simple.SimpleGroupFactory;
+import org.apache.parquet.hadoop.codec.SnappyCompressor;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.hadoop.util.HadoopOutputFile;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.OutputFile;
+import org.apache.parquet.io.PositionOutputStream;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.io.SeekableInputStream;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
+import org.apache.parquet.schema.Types;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests that page level checksums are correctly written and that checksum verification works as
+ * expected
+ */
+public class TestDataPageV1Checksums {
+  @Rule
+  public final TemporaryFolder tempFolder = new TemporaryFolder();
+
+  private static final Statistics<?> EMPTY_STATS_INT32 = Statistics.getBuilderForReading(
+    Types.required(INT32).named("a")).build();
+
+  private CRC32 crc = new CRC32();
+
+  // Sample data: two columns 'a' and 'b' (both int32).
+
+  private static final int PAGE_SIZE = 1024 * 1024; // 1MB
+
+  private static final MessageType schemaSimple = MessageTypeParser.parseMessageType(
+    "message m {" +
+    "  required int32 a;" +
+    "  required int32 b;" +
+    "}");
+  private static final ColumnDescriptor colADesc = schemaSimple.getColumns().get(0);
+  private static final ColumnDescriptor colBDesc = schemaSimple.getColumns().get(1);
+  private static final byte[] colAPage1Bytes = new byte[PAGE_SIZE];
+  private static final byte[] colAPage2Bytes = new byte[PAGE_SIZE];
+  private static final byte[] colBPage1Bytes = new byte[PAGE_SIZE];
+  private static final byte[] colBPage2Bytes = new byte[PAGE_SIZE];
+  private static final int numRecordsLargeFile = (2 * PAGE_SIZE) / Integer.BYTES;
+
+  /** Write out sample Parquet file using ColumnChunkPageWriteStore directly, return path to file */
+  private Path writeSimpleParquetFile(Configuration conf, CompressionCodecName compression)
+    throws IOException {
+    File file = tempFolder.newFile();
+    file.delete();
+    Path path = new Path(file.toURI());
+
+    for (int i = 0; i < PAGE_SIZE; i++) {
+      colAPage1Bytes[i] = (byte) i;
+      colAPage2Bytes[i] = (byte) -i;
+      colBPage1Bytes[i] = (byte) (i + 100);
+      colBPage2Bytes[i] = (byte) (i - 100);
+    }
+
+    ParquetFileWriter writer =  new ParquetFileWriter(conf, schemaSimple, path,
+      ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.MAX_PADDING_SIZE_DEFAULT);
+
+    writer.start();
+    writer.startBlock(numRecordsLargeFile);
+
+    CodecFactory codecFactory = new CodecFactory(conf, PAGE_SIZE);
+    CodecFactory.BytesCompressor compressor = codecFactory.getCompressor(compression);
+
+    ColumnChunkPageWriteStore writeStore = new ColumnChunkPageWriteStore(
+      compressor, schemaSimple, new HeapByteBufferAllocator(),
+      Integer.MAX_VALUE, ParquetOutputFormat.getPageWriteChecksumEnabled(conf));
+
+    PageWriter pageWriter = writeStore.getPageWriter(colADesc);
+    pageWriter.writePage(BytesInput.from(colAPage1Bytes), numRecordsLargeFile / 2,
+      numRecordsLargeFile / 2, EMPTY_STATS_INT32, Encoding.RLE, Encoding.RLE, Encoding.PLAIN);
+    pageWriter.writePage(BytesInput.from(colAPage2Bytes), numRecordsLargeFile / 2,
+      numRecordsLargeFile / 2, EMPTY_STATS_INT32, Encoding.RLE, Encoding.RLE, Encoding.PLAIN);
+
+    pageWriter = writeStore.getPageWriter(colBDesc);
+    pageWriter.writePage(BytesInput.from(colBPage1Bytes), numRecordsLargeFile / 2,
+      numRecordsLargeFile / 2, EMPTY_STATS_INT32, Encoding.RLE, Encoding.RLE, Encoding.PLAIN);
+    pageWriter.writePage(BytesInput.from(colBPage2Bytes), numRecordsLargeFile / 2,
+      numRecordsLargeFile / 2, EMPTY_STATS_INT32, Encoding.RLE, Encoding.RLE, Encoding.PLAIN);
+
+    writeStore.flushToFileWriter(writer);
+
+    writer.endBlock();
+    writer.end(new HashMap<>());
+
+    codecFactory.release();
+
+    return path;
+  }
+
+  // Sample data, nested schema with nulls
+
+  private static final MessageType schemaNestedWithNulls = MessageTypeParser.parseMessageType(
+    "message m {" +
+      "  optional group c {" +
+      "    required int64 id;" +
+      "    required group d {" +
+      "      repeated int32 val;" +
+      "    }" +
+      "  }" +
+      "}");
+  private static final ColumnDescriptor colCIdDesc = schemaNestedWithNulls.getColumns().get(0);
+  private static final ColumnDescriptor colDValDesc = schemaNestedWithNulls.getColumns().get(1);
+
+  private static final double nullRatio = 0.3;
+  private static final int numRecordsNestedWithNullsFile = 2000;
+
+  private Path writeNestedWithNullsSampleParquetFile(Configuration conf,
+                                                     boolean dictionaryEncoding,
+                                                     CompressionCodecName compression)
+    throws IOException {
+    File file = tempFolder.newFile();
+    file.delete();
+    Path path = new Path(file.toURI());
+
+    try (ParquetWriter<Group> writer = ExampleParquetWriter.builder(path)
+      .withConf(conf)
+      .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
+      .withCompressionCodec(compression)
+      .withDictionaryEncoding(dictionaryEncoding)
+      .withType(schemaNestedWithNulls)
+      .withPageWriteChecksumEnabled(ParquetOutputFormat.getPageWriteChecksumEnabled(conf))
+      .build()) {
+      GroupFactory groupFactory = new SimpleGroupFactory(schemaNestedWithNulls);
+      Random rand = new Random(42);
+
+      for (int i = 0; i < numRecordsNestedWithNullsFile; i++) {
+        Group group = groupFactory.newGroup();
+        if (rand.nextDouble() > nullRatio) {
+          // With equal probability, write out either 1 or 3 values in group d. To ensure the
+          // values can be dictionary encoded when required, reduce them modulo 10.
+          if (rand.nextDouble() > 0.5) {
+            group.addGroup("c").append("id", (long) i).addGroup("d")
+              .append("val", rand.nextInt() % 10);
+          } else {
+            group.addGroup("c").append("id", (long) i).addGroup("d")
+              .append("val", rand.nextInt() % 10)
+              .append("val", rand.nextInt() % 10)
+              .append("val", rand.nextInt() % 10);
+          }
+        }
+        writer.write(group);
+      }
+    }
+
+    return path;
+  }
+
+  /**
+   * Enable writing out page level crc checksums and disable verification in the read path, but
+   * check that the written crc checksums are correct. Tests that we successfully write out correct
+   * crc checksums without relying on the read path verification.
+   */
+  @Test
+  public void testWriteOnVerifyOff() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, true);
+    conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, false);
+
+    Path path = writeSimpleParquetFile(conf, CompressionCodecName.UNCOMPRESSED);
+
+    try (ParquetFileReader reader = getParquetFileReader(path, conf,
+      Arrays.asList(colADesc, colBDesc))) {
+      PageReadStore pageReadStore = reader.readNextRowGroup();
+
+      DataPageV1 colAPage1 = readNextPage(colADesc, pageReadStore);
+      assertCrcSetAndCorrect(colAPage1, colAPage1Bytes);
+      assertCorrectContent(colAPage1.getBytes().toByteArray(), colAPage1Bytes);
+
+      DataPageV1 colAPage2 = readNextPage(colADesc, pageReadStore);
+      assertCrcSetAndCorrect(colAPage2, colAPage2Bytes);
+      assertCorrectContent(colAPage2.getBytes().toByteArray(), colAPage2Bytes);
+
+      DataPageV1 colBPage1 = readNextPage(colBDesc, pageReadStore);
+      assertCrcSetAndCorrect(colBPage1, colBPage1Bytes);
+      assertCorrectContent(colBPage1.getBytes().toByteArray(), colBPage1Bytes);
+
+      DataPageV1 colBPage2 = readNextPage(colBDesc, pageReadStore);
+      assertCrcSetAndCorrect(colBPage2, colBPage2Bytes);
+      assertCorrectContent(colBPage2.getBytes().toByteArray(), colBPage2Bytes);
+    }
+  }
+
+  /** Test that we do not write out checksums if the feature is turned off */
+  @Test
+  public void testWriteOffVerifyOff() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, false);
+    conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, false);
+
+    Path path = writeSimpleParquetFile(conf, CompressionCodecName.UNCOMPRESSED);
+
+    try (ParquetFileReader reader = getParquetFileReader(path, conf,
+      Arrays.asList(colADesc, colBDesc))) {
+      PageReadStore pageReadStore = reader.readNextRowGroup();
+
+      assertCrcNotSet(readNextPage(colADesc, pageReadStore));
+      assertCrcNotSet(readNextPage(colADesc, pageReadStore));
+      assertCrcNotSet(readNextPage(colBDesc, pageReadStore));
+      assertCrcNotSet(readNextPage(colBDesc, pageReadStore));
+    }
+  }
+
+  /**
+   * Do not write out page level crc checksums, but enable verification on the read path. Tests
+   * that the read still succeeds and does not throw an exception.
+   */
+  @Test
+  public void testWriteOffVerifyOn() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, false);
+    conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, true);
+
+    Path path = writeSimpleParquetFile(conf, CompressionCodecName.UNCOMPRESSED);
+
+    try (ParquetFileReader reader = getParquetFileReader(path, conf,
+      Arrays.asList(colADesc, colBDesc))) {
+      PageReadStore pageReadStore = reader.readNextRowGroup();
+
+      assertCorrectContent(readNextPage(colADesc, pageReadStore).getBytes().toByteArray(),
+        colAPage1Bytes);
+      assertCorrectContent(readNextPage(colADesc, pageReadStore).getBytes().toByteArray(),
+        colAPage2Bytes);
+      assertCorrectContent(readNextPage(colBDesc, pageReadStore).getBytes().toByteArray(),
+        colBPage1Bytes);
+      assertCorrectContent(readNextPage(colBDesc, pageReadStore).getBytes().toByteArray(),
+        colBPage2Bytes);
+    }
+  }
+
+  /**
+   * Write out checksums and verify them on the read path. Tests that crc is set and that we can
+   * read back what we wrote if checksums are enabled on both the write and read path.
+   */
+  @Test
+  public void testWriteOnVerifyOn() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, true);
+    conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, true);
+
+    Path path = writeSimpleParquetFile(conf, CompressionCodecName.UNCOMPRESSED);
+
+    try (ParquetFileReader reader = getParquetFileReader(path, conf,
+      Arrays.asList(colADesc, colBDesc))) {
+      PageReadStore pageReadStore = reader.readNextRowGroup();
+
+      DataPageV1 colAPage1 = readNextPage(colADesc, pageReadStore);
+      assertCrcSetAndCorrect(colAPage1, colAPage1Bytes);
+      assertCorrectContent(colAPage1.getBytes().toByteArray(), colAPage1Bytes);
+
+      DataPageV1 colAPage2 = readNextPage(colADesc, pageReadStore);
+      assertCrcSetAndCorrect(colAPage2, colAPage2Bytes);
+      assertCorrectContent(colAPage2.getBytes().toByteArray(), colAPage2Bytes);
+
+      DataPageV1 colBPage1 = readNextPage(colBDesc, pageReadStore);
+      assertCrcSetAndCorrect(colBPage1, colBPage1Bytes);
+      assertCorrectContent(colBPage1.getBytes().toByteArray(), colBPage1Bytes);
+
+      DataPageV1 colBPage2 = readNextPage(colBDesc, pageReadStore);
+      assertCrcSetAndCorrect(colBPage2, colBPage2Bytes);
+      assertCorrectContent(colBPage2.getBytes().toByteArray(), colBPage2Bytes);
+    }
+  }
+
+  /**
+   * Test whether corruption in the page content is detected by checksum verification
+   */
+  @Test
+  public void testCorruptedPage() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, true);
+
+    Path path = writeSimpleParquetFile(conf, CompressionCodecName.UNCOMPRESSED);
+
+    InputFile inputFile = HadoopInputFile.fromPath(path, conf);
+    try (SeekableInputStream inputStream = inputFile.newStream()) {
+      int fileLen = (int) inputFile.getLength();
+      byte[] fileBytes = new byte[fileLen];
+      inputStream.readFully(fileBytes);
+      inputStream.close();
+
+      // There are 4 pages in total (2 per column). We corrupt the first page of the first column
+      // and the second page of the second column by altering a byte roughly in the middle of each
+      // of those pages.
+      fileBytes[fileLen / 8]++;
+      fileBytes[fileLen / 8 + ((fileLen / 4) * 3)]++;
+
+      OutputFile outputFile = HadoopOutputFile.fromPath(path, conf);
+      try (PositionOutputStream outputStream = outputFile.createOrOverwrite(1024 * 1024)) {
+        outputStream.write(fileBytes);
+        outputStream.close();
+
+        // First we disable checksum verification; the corruption goes undetected because it is in
+        // the data section of the page
+        conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, false);
+        try (ParquetFileReader reader = getParquetFileReader(path, conf,
+          Arrays.asList(colADesc, colBDesc))) {
+          PageReadStore pageReadStore = reader.readNextRowGroup();
+
+          DataPageV1 colAPage1 = readNextPage(colADesc, pageReadStore);
+          assertFalse("Data in page was not corrupted",
+            Arrays.equals(colAPage1.getBytes().toByteArray(), colAPage1Bytes));
+          readNextPage(colADesc, pageReadStore);
+          readNextPage(colBDesc, pageReadStore);
+          DataPageV1 colBPage2 = readNextPage(colBDesc, pageReadStore);
+          assertFalse("Data in page was not corrupted",
+            Arrays.equals(colBPage2.getBytes().toByteArray(), colBPage2Bytes));
+        }
+
+        // Now we enable checksum verification, the corruption should be detected
+        conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, true);
+        try (ParquetFileReader reader =
+               getParquetFileReader(path, conf, Arrays.asList(colADesc, colBDesc))) {
+          // We expect an exception on the first encountered corrupt page (in readAllPages)
+          assertVerificationFailed(reader);
+        }
+      }
+    }
+  }
+
+  /**
+   * Tests that the checksum is calculated using the compressed version of the data and that
+   * checksum verification succeeds
+   */
+  @Test
+  public void testCompression() throws IOException {
+    Configuration conf = new Configuration();
+    conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, true);
+    conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, true);
+
+    Path path = writeSimpleParquetFile(conf, CompressionCodecName.SNAPPY);
+
+    try (ParquetFileReader reader = getParquetFileReader(path, conf,
+      Arrays.asList(colADesc, colBDesc))) {
+      PageReadStore pageReadStore = reader.readNextRowGroup();
+
+      DataPageV1 colAPage1 = readNextPage(colADesc, pageReadStore);
+      assertCrcSetAndCorrect(colAPage1, snappy(colAPage1Bytes));
+      assertCorrectContent(colAPage1.getBytes().toByteArray(), colAPage1Bytes);
+
+      DataPageV1 colAPage2 = readNextPage(colADesc, pageReadStore);
+      assertCrcSetAndCorrect(colAPage2, snappy(colAPage2Bytes));
+      assertCorrectContent(colAPage2.getBytes().toByteArray(), colAPage2Bytes);
+
+      DataPageV1 colBPage1 = readNextPage(colBDesc, pageReadStore);
+      assertCrcSetAndCorrect(colBPage1, snappy(colBPage1Bytes));
+      assertCorrectContent(colBPage1.getBytes().toByteArray(), colBPage1Bytes);
+
+      DataPageV1 colBPage2 = readNextPage(colBDesc, pageReadStore);
+      assertCrcSetAndCorrect(colBPage2, snappy(colBPage2Bytes));
+      assertCorrectContent(colBPage2.getBytes().toByteArray(), colBPage2Bytes);
+    }
+  }
+
+  /**
+   * Tests that we adhere to the checksum calculation specification, namely that the crc is
+   * calculated using the compressed concatenation of the repetition levels, definition levels and
+   * the actual data. This is done by generating sample data with a nested schema containing nulls,
+   * which yields non-trivial repetition and definition levels.
+   */
+  @Test
+  public void testNestedWithNulls() throws IOException {
+    Configuration conf = new Configuration();
+
+    // Write out a sample file via the non-checksum code path and extract the raw bytes used to
+    // calculate the reference crc
+    conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, false);
+    conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, false);
+    Path refPath = writeNestedWithNullsSampleParquetFile(conf, false, CompressionCodecName.SNAPPY);
+
+    try (ParquetFileReader refReader = getParquetFileReader(refPath, conf,
+      Arrays.asList(colCIdDesc, colDValDesc))) {
+      PageReadStore refPageReadStore = refReader.readNextRowGroup();
+      byte[] colCIdPageBytes = readNextPage(colCIdDesc, refPageReadStore).getBytes().toByteArray();
+      byte[] colDValPageBytes = readNextPage(colDValDesc, refPageReadStore).getBytes().toByteArray();
+
+      // Write out sample file with checksums
+      conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, true);
+      conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, true);
+      Path path = writeNestedWithNullsSampleParquetFile(conf, false, CompressionCodecName.SNAPPY);
+
+      try (ParquetFileReader reader = getParquetFileReader(path, conf,
+        Arrays.asList(colCIdDesc, colDValDesc))) {
+        PageReadStore pageReadStore = reader.readNextRowGroup();
+
+        DataPageV1 colCIdPage = readNextPage(colCIdDesc, pageReadStore);
+        assertCrcSetAndCorrect(colCIdPage, snappy(colCIdPageBytes));
+        assertCorrectContent(colCIdPage.getBytes().toByteArray(), colCIdPageBytes);
+
+        DataPageV1 colDValPage = readNextPage(colDValDesc, pageReadStore);
+        assertCrcSetAndCorrect(colDValPage, snappy(colDValPageBytes));
+        assertCorrectContent(colDValPage.getBytes().toByteArray(), colDValPageBytes);
+      }
+    }
+  }
+
+  @Test
+  public void testDictionaryEncoding() throws IOException {
+    Configuration conf = new Configuration();
+
+    // Write out a dictionary encoded sample file via the non-checksum code path and extract the
+    // raw bytes used to calculate the reference crc
+    conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, false);
+    conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, false);
+    Path refPath = writeNestedWithNullsSampleParquetFile(conf, true, CompressionCodecName.SNAPPY);
+
+    try (ParquetFileReader refReader =
+      getParquetFileReader(refPath, conf, Collections.singletonList(colDValDesc))) {
+      PageReadStore refPageReadStore = refReader.readNextRowGroup();
+      // Read (decompressed) dictionary page
+      byte[] dictPageBytes = readDictPage(colDValDesc, refPageReadStore).getBytes().toByteArray();
+      byte[] colDValPageBytes = readNextPage(colDValDesc, refPageReadStore).getBytes().toByteArray();
+
+      // Write out sample file with checksums
+      conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, true);
+      conf.setBoolean(ParquetInputFormat.PAGE_VERIFY_CHECKSUM_ENABLED, true);
+      Path path = writeNestedWithNullsSampleParquetFile(conf, true, CompressionCodecName.SNAPPY);
+
+      try (ParquetFileReader reader =
+        getParquetFileReader(path, conf, Collections.singletonList(colDValDesc))) {
+        PageReadStore pageReadStore = reader.readNextRowGroup();
+
+        DictionaryPage dictPage = readDictPage(colDValDesc, pageReadStore);
+        assertCrcSetAndCorrect(dictPage, snappy(dictPageBytes));
+        assertCorrectContent(dictPage.getBytes().toByteArray(), dictPageBytes);
+
+        DataPageV1 colDValPage = readNextPage(colDValDesc, pageReadStore);
+        assertCrcSetAndCorrect(colDValPage, snappy(colDValPageBytes));
+        assertCorrectContent(colDValPage.getBytes().toByteArray(), colDValPageBytes);
+      }
+    }
+  }
+
+  /** Compress using snappy */
+  private byte[] snappy(byte[] bytes) throws IOException {
+    SnappyCompressor compressor = new SnappyCompressor();
+    compressor.reset();
+    compressor.setInput(bytes, 0, bytes.length);
+    compressor.finish();
+    byte[] buffer = new byte[bytes.length * 2];
+    int compressedSize = compressor.compress(buffer, 0, buffer.length);
+    return Arrays.copyOfRange(buffer, 0, compressedSize);
+  }
+
+  /** Construct ParquetFileReader for input file and columns */
+  private ParquetFileReader getParquetFileReader(Path path, Configuration conf,
+                                                 List<ColumnDescriptor> columns)
+    throws IOException {
+    ParquetMetadata footer = ParquetFileReader.readFooter(conf, path);
+    return new ParquetFileReader(conf, footer.getFileMetaData(), path,
+      footer.getBlocks(), columns);
+  }
+
+  /** Read the dictionary page for the column */
+  private DictionaryPage readDictPage(ColumnDescriptor colDesc, PageReadStore pageReadStore) {
+    return pageReadStore.getPageReader(colDesc).readDictionaryPage();
+  }
+
+  /** Read the next page for a column */
+  private DataPageV1 readNextPage(ColumnDescriptor colDesc, PageReadStore pageReadStore) {
+    return (DataPageV1) pageReadStore.getPageReader(colDesc).readPage();
+  }
+
+  /**
+   * Compare the extracted (decompressed) bytes to the reference bytes
+   */
+  private void assertCorrectContent(byte[] pageBytes, byte[] referenceBytes) {
+    assertArrayEquals("Read page content was different from expected page content", referenceBytes,
+      pageBytes);
+  }
+
+  /**
+   * Verify that the crc is set in a page, calculate the reference crc from the reference bytes,
+   * and check that the two crcs are identical.
+   */
+  private void assertCrcSetAndCorrect(Page page, byte[] referenceBytes) {
+    assertTrue("Checksum was not set in page", page.getCrc().isPresent());
+    int crcFromPage = page.getCrc().getAsInt();
+    crc.reset();
+    crc.update(referenceBytes);
+    assertEquals("Checksum found in page did not match calculated reference checksum",
+      crc.getValue(), (long) crcFromPage & 0xffffffffL);
+  }
+
+  /** Verify that the crc is not set */
+  private void assertCrcNotSet(Page page) {
+    assertFalse("Checksum was set in page", page.getCrc().isPresent());
+  }
+
+  /**
+   * Read the next row group and fail if no checksum verification exception is thrown; if an
+   * exception is thrown, verify that it is a ParquetDecodingException reporting a failed CRC check.
+   */
+  private void assertVerificationFailed(ParquetFileReader reader) {
+    try {
+      reader.readNextRowGroup();
+      fail("Expected checksum verification exception to be thrown");
+    } catch (Exception e) {
+      assertTrue("Thrown exception is of incorrect type", e instanceof ParquetDecodingException);
+      assertTrue("Did not catch checksum verification ParquetDecodingException",
+        e.getMessage().contains("CRC checksum verification failed"));
+    }
+  }
+}
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
index 917ad57..8763cac 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
@@ -225,6 +225,8 @@
 
     Path path = new Path(testFile.toURI());
     Configuration conf = new Configuration();
+    // Disable writing out checksums, as the hardcoded byte offsets in the assertions below assume they are absent
+    conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, false);
 
     // uses the test constructor
     ParquetFileWriter w = new ParquetFileWriter(conf, SCHEMA, path, 120, 60);
@@ -330,6 +332,8 @@
 
     Path path = new Path(testFile.toURI());
     Configuration conf = new Configuration();
+    // Disable writing out checksums, as the hardcoded byte offsets in the assertions below assume they are absent
+    conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, false);
 
     // uses the test constructor
     ParquetFileWriter w = new ParquetFileWriter(conf, SCHEMA, path, 100, 50);
diff --git a/parquet-tools/src/main/java/org/apache/parquet/tools/command/DumpCommand.java b/parquet-tools/src/main/java/org/apache/parquet/tools/command/DumpCommand.java
index 27043b9..eaf6e8e 100644
--- a/parquet-tools/src/main/java/org/apache/parquet/tools/command/DumpCommand.java
+++ b/parquet-tools/src/main/java/org/apache/parquet/tools/command/DumpCommand.java
@@ -29,6 +29,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.zip.CRC32;
 
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.Option;
@@ -73,6 +74,8 @@
     public static final int BLOCK_BUFFER_SIZE = 64 * 1024;
     public static final String[] USAGE = new String[] { "<input>", "where <input> is the parquet file to print to stdout" };
 
+    private static CRC32 crc = new CRC32();
+
     public static final Options OPTIONS;
     static {
         OPTIONS = new Options();
@@ -242,6 +245,12 @@
         }
     }
 
+    private static boolean verifyCrc(int referenceCrc, byte[] bytes) {
+      crc.reset();
+      crc.update(bytes);
+      return crc.getValue() == ((long) referenceCrc & 0xffffffffL);
+    }
+
     public static void dump(final PrettyPrintWriter out, PageReadStore store, ColumnDescriptor column) throws IOException {
         PageReader reader = store.getPageReader(column);
 
@@ -274,6 +283,15 @@
                 } else {
                   out.format(" ST:[none]");
                 }
+                if (pageV1.getCrc().isPresent()) {
+                  try {
+                    out.format(" CRC:%s", verifyCrc(pageV1.getCrc().getAsInt(), pageV1.getBytes().toByteArray()) ? "[verified]" : "[PAGE CORRUPT]");
+                  } catch (IOException e) {
+                    out.format(" CRC:[error getting page bytes]");
+                  }
+                } else {
+                  out.format(" CRC:[none]");
+                }
                 return null;
               }