PARQUET-1977: Invalid data_page_offset (#868)

diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/Offsets.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/Offsets.java
new file mode 100644
index 0000000..fa25943
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/Offsets.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.hadoop;
+
+import java.io.IOException;
+
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.Util;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.io.SeekableInputStream;
+
+/**
+ * Class to help gather/calculate the proper values of the dictionary/first data page offset values in a column chunk.
+ * This class is used by Tools (parquet-tools/-cli) that do not support encryption so this does not support it either.
+ * (In some cases this tool would read the dictionary page header which might be encrypted.)
+ */
+class Offsets {
+
+  /**
+   * Returns the offset values for the column chunk to be written.
+   *
+   * @param input         the source input stream of the column chunk
+   * @param chunk         the column chunk metadata read from the source file
+   * @param newChunkStart the position of the column chunk to be written
+   * @return the offset values
+   * @throws IOException if any I/O error occurs during the reading of the input stream
+   */
+  public static Offsets getOffsets(SeekableInputStream input, ColumnChunkMetaData chunk, long newChunkStart)
+      throws IOException {
+    long firstDataPageOffset;
+    long dictionaryPageOffset;
+    if (chunk.hasDictionaryPage()) {
+      long dictionaryPageSize;
+      if (chunk.getDictionaryPageOffset() == 0 || chunk.getFirstDataPageOffset() <= chunk.getDictionaryPageOffset()) {
+        /*
+         * The offsets might not contain the proper values (so we need to read the dictionary page header):
+         * - The dictionaryPageOffset might not be set; in this case 0 is returned
+         *   (0 cannot be a valid offset because of the MAGIC bytes)
+         * - The firstDataPageOffset might point to the dictionary page
+         */
+        dictionaryPageSize = readDictionaryPageSize(input, newChunkStart);
+      } else {
+        dictionaryPageSize = chunk.getFirstDataPageOffset() - chunk.getDictionaryPageOffset();
+      }
+      firstDataPageOffset = newChunkStart + dictionaryPageSize;
+      dictionaryPageOffset = newChunkStart;
+    } else {
+      firstDataPageOffset = newChunkStart;
+      dictionaryPageOffset = 0;
+    }
+    return new Offsets(firstDataPageOffset, dictionaryPageOffset);
+  }
+
+  private static long readDictionaryPageSize(SeekableInputStream in, long pos) throws IOException {
+    long origPos = -1;
+    try {
+      origPos = in.getPos();
+      PageHeader header = Util.readPageHeader(in);
+      long headerSize = in.getPos() - origPos;
+      return headerSize + header.getCompressed_page_size();
+    } finally {
+      if (origPos != -1) {
+        in.seek(origPos);
+      }
+    }
+  }
+
+  private Offsets(long firstDataPageOffset, long dictionaryPageOffset) {
+    this.firstDataPageOffset = firstDataPageOffset;
+    this.dictionaryPageOffset = dictionaryPageOffset;
+  }
+
+  public final long firstDataPageOffset;
+  public final long dictionaryPageOffset;
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index 3892f3d..a246a52 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -155,14 +155,13 @@
   private Statistics currentStatistics; // accumulated in writePage(s)
   private ColumnIndexBuilder columnIndexBuilder;
   private OffsetIndexBuilder offsetIndexBuilder;
-  private long firstPageOffset;
 
   // column chunk data set at the start of a column
   private CompressionCodecName currentChunkCodec; // set in startColumn
   private ColumnPath currentChunkPath;            // set in startColumn
   private PrimitiveType currentChunkType;         // set in startColumn
   private long currentChunkValueCount;            // set in startColumn
-  private long currentChunkFirstDataPage;         // set in startColumn (out.pos())
+  private long currentChunkFirstDataPage;         // set in startColumn & page writes
   private long currentChunkDictionaryPageOffset;  // set in writeDictionaryPage
 
   // set when end is called
@@ -437,7 +436,7 @@
     currentChunkType = descriptor.getPrimitiveType();
     currentChunkCodec = compressionCodecName;
     currentChunkValueCount = valueCount;
-    currentChunkFirstDataPage = out.getPos();
+    currentChunkFirstDataPage = -1;
     compressedLength = 0;
     uncompressedLength = 0;
     // The statistics will be copied from the first one added at writeDataPage(s) so we have the correct typed one
@@ -445,7 +444,6 @@
 
     columnIndexBuilder = ColumnIndexBuilder.getBuilder(currentChunkType, columnIndexTruncateLength);
     offsetIndexBuilder = OffsetIndexBuilder.getBuilder();
-    firstPageOffset = -1;
   }
 
   /**
@@ -536,6 +534,9 @@
     currentEncodings.add(rlEncoding);
     currentEncodings.add(dlEncoding);
     currentEncodings.add(valuesEncoding);
+    if (currentChunkFirstDataPage < 0) {
+      currentChunkFirstDataPage = beforeHeader;
+    }
   }
 
   /**
@@ -600,8 +601,8 @@
       Encoding valuesEncoding) throws IOException {
     state = state.write();
     long beforeHeader = out.getPos();
-    if (firstPageOffset == -1) {
-      firstPageOffset = beforeHeader;
+    if (currentChunkFirstDataPage < 0) {
+      currentChunkFirstDataPage = beforeHeader;
     }
     LOG.debug("{}: write data page: {} values", beforeHeader, valueCount);
     int compressedPageSize = (int) bytes.size();
@@ -689,8 +690,8 @@
     );
 
     long beforeHeader = out.getPos();
-    if (firstPageOffset == -1) {
-      firstPageOffset = beforeHeader;
+    if (currentChunkFirstDataPage < 0) {
+      currentChunkFirstDataPage = beforeHeader;
     }
 
     metadataConverter.writeDataPageV2Header(
@@ -806,7 +807,7 @@
     this.uncompressedLength += uncompressedTotalPageSize + headersSize;
     this.compressedLength += compressedTotalPageSize + headersSize;
     LOG.debug("{}: write data pages content", out.getPos());
-    firstPageOffset = out.getPos();
+    currentChunkFirstDataPage = out.getPos();
     bytes.writeAllTo(out);
     encodingStatsBuilder.addDataEncodings(dataEncodings);
     if (rlEncodings.isEmpty()) {
@@ -835,7 +836,7 @@
     } else {
       currentColumnIndexes.add(columnIndexBuilder.build());
     }
-    currentOffsetIndexes.add(offsetIndexBuilder.build(firstPageOffset));
+    currentOffsetIndexes.add(offsetIndexBuilder.build(currentChunkFirstDataPage));
     currentBlock.addColumn(ColumnChunkMetaData.get(
         currentChunkPath,
         currentChunkType,
@@ -994,6 +995,7 @@
       currentColumnIndexes.add(null);
       currentOffsetIndexes.add(null);
 
+      Offsets offsets = Offsets.getOffsets(from, chunk, newChunkStart);
       currentBlock.addColumn(ColumnChunkMetaData.get(
           chunk.getPath(),
           chunk.getPrimitiveType(),
@@ -1001,8 +1003,8 @@
           chunk.getEncodingStats(),
           chunk.getEncodings(),
           chunk.getStatistics(),
-          newChunkStart,
-          newChunkStart,
+          offsets.firstDataPageOffset,
+          offsets.dictionaryPageOffset,
           chunk.getValueCount(),
           chunk.getTotalSize(),
           chunk.getTotalUncompressedSize()));
@@ -1036,6 +1038,7 @@
     currentColumnIndexes.add(columnIndex);
     currentOffsetIndexes.add(offsetIndex);
 
+    Offsets offsets = Offsets.getOffsets(from, chunk, newChunkStart);
     currentBlock.addColumn(ColumnChunkMetaData.get(
       chunk.getPath(),
       chunk.getPrimitiveType(),
@@ -1043,8 +1046,8 @@
       chunk.getEncodingStats(),
       chunk.getEncodings(),
       chunk.getStatistics(),
-      newChunkStart,
-      newChunkStart,
+      offsets.firstDataPageOffset,
+      offsets.dictionaryPageOffset,
       chunk.getValueCount(),
       chunk.getTotalSize(),
       chunk.getTotalUncompressedSize()));
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
index 587a241..ed26618 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
@@ -262,7 +262,8 @@
   abstract public long getFirstDataPageOffset();
 
   /**
-   * @return the location of the dictionary page if any
+   * @return the location of the dictionary page if any; {@code 0} is returned if there is no dictionary page. Check
+   *         {@link #hasDictionaryPage()} to validate.
    */
   abstract public long getDictionaryPageOffset();
 
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
index c41c8bf..73ef70e 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
@@ -39,6 +39,7 @@
 import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.page.DataPage;
 import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DictionaryPage;
 import org.apache.parquet.column.page.PageReadStore;
 import org.apache.parquet.column.page.PageReader;
 import org.apache.parquet.column.statistics.BinaryStatistics;
@@ -69,6 +70,7 @@
 import static org.junit.Assert.*;
 import static org.apache.parquet.column.Encoding.BIT_PACKED;
 import static org.apache.parquet.column.Encoding.PLAIN;
+import static org.apache.parquet.column.Encoding.RLE_DICTIONARY;
 import static org.apache.parquet.format.converter.ParquetMetadataConverter.MAX_STATS_SIZE;
 import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
 import static org.apache.parquet.schema.Type.Repetition.*;
@@ -157,12 +159,15 @@
     w.startBlock(3);
     w.startColumn(C1, 5, CODEC);
     long c1Starts = w.getPos();
+    long c1p1Starts = w.getPos();
     w.writeDataPage(2, 4, BytesInput.from(BYTES1), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN);
     w.writeDataPage(3, 4, BytesInput.from(BYTES1), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN);
     w.endColumn();
     long c1Ends = w.getPos();
     w.startColumn(C2, 6, CODEC);
     long c2Starts = w.getPos();
+    w.writeDictionaryPage(new DictionaryPage(BytesInput.from(BYTES2), 4, RLE_DICTIONARY));
+    long c2p1Starts = w.getPos();
     w.writeDataPage(2, 4, BytesInput.from(BYTES2), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN);
     w.writeDataPage(3, 4, BytesInput.from(BYTES2), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN);
     w.writeDataPage(1, 4, BytesInput.from(BYTES2), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN);
@@ -181,17 +186,26 @@
 
     ParquetMetadata readFooter = ParquetFileReader.readFooter(configuration, path);
     assertEquals("footer: "+ readFooter, 2, readFooter.getBlocks().size());
-    assertEquals(c1Ends - c1Starts, readFooter.getBlocks().get(0).getColumns().get(0).getTotalSize());
-    assertEquals(c2Ends - c2Starts, readFooter.getBlocks().get(0).getColumns().get(1).getTotalSize());
-    assertEquals(c2Ends - c1Starts, readFooter.getBlocks().get(0).getTotalByteSize());
+    BlockMetaData rowGroup = readFooter.getBlocks().get(0);
+    assertEquals(c1Ends - c1Starts, rowGroup.getColumns().get(0).getTotalSize());
+    assertEquals(c2Ends - c2Starts, rowGroup.getColumns().get(1).getTotalSize());
+    assertEquals(c2Ends - c1Starts, rowGroup.getTotalByteSize());
+
+    assertEquals(c1Starts, rowGroup.getColumns().get(0).getStartingPos());
+    assertEquals(0, rowGroup.getColumns().get(0).getDictionaryPageOffset());
+    assertEquals(c1p1Starts, rowGroup.getColumns().get(0).getFirstDataPageOffset());
+    assertEquals(c2Starts, rowGroup.getColumns().get(1).getStartingPos());
+    assertEquals(c2Starts, rowGroup.getColumns().get(1).getDictionaryPageOffset());
+    assertEquals(c2p1Starts, rowGroup.getColumns().get(1).getFirstDataPageOffset());
+
     HashSet<Encoding> expectedEncoding=new HashSet<Encoding>();
     expectedEncoding.add(PLAIN);
     expectedEncoding.add(BIT_PACKED);
-    assertEquals(expectedEncoding,readFooter.getBlocks().get(0).getColumns().get(0).getEncodings());
+    assertEquals(expectedEncoding,rowGroup.getColumns().get(0).getEncodings());
 
     { // read first block of col #1
       ParquetFileReader r = new ParquetFileReader(configuration, readFooter.getFileMetaData(), path,
-          Arrays.asList(readFooter.getBlocks().get(0)), Arrays.asList(SCHEMA.getColumnDescription(PATH1)));
+          Arrays.asList(rowGroup), Arrays.asList(SCHEMA.getColumnDescription(PATH1)));
       PageReadStore pages = r.readNextRowGroup();
       assertEquals(3, pages.getRowCount());
       validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1));