PARQUET-1977: Invalid data_page_offset (#868)
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/Offsets.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/Offsets.java
new file mode 100644
index 0000000..fa25943
--- /dev/null
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/Offsets.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.parquet.hadoop;
+
+import java.io.IOException;
+
+import org.apache.parquet.format.PageHeader;
+import org.apache.parquet.format.Util;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.io.SeekableInputStream;
+
+/**
+ * Helper class to gather/calculate the proper values of the dictionary page offset and the first data page offset in
+ * a column chunk. This class is used by tools (parquet-tools/-cli) that do not support encryption, so it does not
+ * support encryption either. (In some cases it would need to read the dictionary page header, which might be
+ * encrypted.)
+ */
+class Offsets {
+
+ /**
+ * Returns the offset values for the column chunk to be written.
+ *
+ * @param input the source input stream of the column chunk
+ * @param chunk the column chunk metadata read from the source file
+ * @param newChunkStart the position of the column chunk to be written
+ * @return the offset values
+   * @throws IOException if an I/O error occurs while reading the input stream
+ */
+ public static Offsets getOffsets(SeekableInputStream input, ColumnChunkMetaData chunk, long newChunkStart)
+ throws IOException {
+ long firstDataPageOffset;
+ long dictionaryPageOffset;
+ if (chunk.hasDictionaryPage()) {
+ long dictionaryPageSize;
+ if (chunk.getDictionaryPageOffset() == 0 || chunk.getFirstDataPageOffset() <= chunk.getDictionaryPageOffset()) {
+ /*
+ * The offsets might not contain the proper values (so we need to read the dictionary page header):
+ * - The dictionaryPageOffset might not be set; in this case 0 is returned
+ * (0 cannot be a valid offset because of the MAGIC bytes)
+ * - The firstDataPageOffset might point to the dictionary page
+ */
+        // In both cases the chunk actually starts with the dictionary page, so read its header from there
+        dictionaryPageSize = readDictionaryPageSize(input, chunk.getStartingPos());
+ } else {
+ dictionaryPageSize = chunk.getFirstDataPageOffset() - chunk.getDictionaryPageOffset();
+ }
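+      // The dictionary page is copied to the start of the new chunk, so the first data page follows it directly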
+ firstDataPageOffset = newChunkStart + dictionaryPageSize;
+ dictionaryPageOffset = newChunkStart;
+ } else {
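+      // No dictionary page: the chunk starts with the first data page; 0 signals the missing dictionary page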
+ firstDataPageOffset = newChunkStart;
+ dictionaryPageOffset = 0;
+ }
+ return new Offsets(firstDataPageOffset, dictionaryPageOffset);
+ }
+
+  private static long readDictionaryPageSize(SeekableInputStream in, long pos) throws IOException {
+    long origPos = -1;
+    try {
+      origPos = in.getPos();
+      // Position the stream at the dictionary page before reading its header
+      in.seek(pos);
+      PageHeader header = Util.readPageHeader(in);
+      long headerSize = in.getPos() - pos;
+      // The full page size is the header size plus the compressed content size
+      return headerSize + header.getCompressed_page_size();
+ } finally {
+ if (origPos != -1) {
+ in.seek(origPos);
+ }
+ }
+ }
+
+ private Offsets(long firstDataPageOffset, long dictionaryPageOffset) {
+ this.firstDataPageOffset = firstDataPageOffset;
+ this.dictionaryPageOffset = dictionaryPageOffset;
+ }
+
+ public final long firstDataPageOffset;
+ public final long dictionaryPageOffset;
+}
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
index 3892f3d..a246a52 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileWriter.java
@@ -155,14 +155,13 @@
private Statistics currentStatistics; // accumulated in writePage(s)
private ColumnIndexBuilder columnIndexBuilder;
private OffsetIndexBuilder offsetIndexBuilder;
- private long firstPageOffset;
// column chunk data set at the start of a column
private CompressionCodecName currentChunkCodec; // set in startColumn
private ColumnPath currentChunkPath; // set in startColumn
private PrimitiveType currentChunkType; // set in startColumn
private long currentChunkValueCount; // set in startColumn
- private long currentChunkFirstDataPage; // set in startColumn (out.pos())
+ private long currentChunkFirstDataPage; // set in startColumn & page writes
private long currentChunkDictionaryPageOffset; // set in writeDictionaryPage
// set when end is called
@@ -437,7 +436,7 @@
currentChunkType = descriptor.getPrimitiveType();
currentChunkCodec = compressionCodecName;
currentChunkValueCount = valueCount;
- currentChunkFirstDataPage = out.getPos();
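+    // -1 marks "not set yet"; it is updated when the first page of this chunk is written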
+ currentChunkFirstDataPage = -1;
compressedLength = 0;
uncompressedLength = 0;
// The statistics will be copied from the first one added at writeDataPage(s) so we have the correct typed one
@@ -445,7 +444,6 @@
columnIndexBuilder = ColumnIndexBuilder.getBuilder(currentChunkType, columnIndexTruncateLength);
offsetIndexBuilder = OffsetIndexBuilder.getBuilder();
- firstPageOffset = -1;
}
/**
@@ -536,6 +534,9 @@
currentEncodings.add(rlEncoding);
currentEncodings.add(dlEncoding);
currentEncodings.add(valuesEncoding);
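+    // record the offset of the chunk's first data page unless a page has already been written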
+ if (currentChunkFirstDataPage < 0) {
+ currentChunkFirstDataPage = beforeHeader;
+ }
}
/**
@@ -600,8 +601,8 @@
Encoding valuesEncoding) throws IOException {
state = state.write();
long beforeHeader = out.getPos();
- if (firstPageOffset == -1) {
- firstPageOffset = beforeHeader;
+ if (currentChunkFirstDataPage < 0) {
+ currentChunkFirstDataPage = beforeHeader;
}
LOG.debug("{}: write data page: {} values", beforeHeader, valueCount);
int compressedPageSize = (int) bytes.size();
@@ -689,8 +690,8 @@
);
long beforeHeader = out.getPos();
- if (firstPageOffset == -1) {
- firstPageOffset = beforeHeader;
+ if (currentChunkFirstDataPage < 0) {
+ currentChunkFirstDataPage = beforeHeader;
}
metadataConverter.writeDataPageV2Header(
@@ -806,7 +807,7 @@
this.uncompressedLength += uncompressedTotalPageSize + headersSize;
this.compressedLength += compressedTotalPageSize + headersSize;
LOG.debug("{}: write data pages content", out.getPos());
- firstPageOffset = out.getPos();
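+    // the buffered bytes contain only data pages (a dictionary page is written separately beforehand)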
+ currentChunkFirstDataPage = out.getPos();
bytes.writeAllTo(out);
encodingStatsBuilder.addDataEncodings(dataEncodings);
if (rlEncodings.isEmpty()) {
@@ -835,7 +836,7 @@
} else {
currentColumnIndexes.add(columnIndexBuilder.build());
}
- currentOffsetIndexes.add(offsetIndexBuilder.build(firstPageOffset));
+ currentOffsetIndexes.add(offsetIndexBuilder.build(currentChunkFirstDataPage));
currentBlock.addColumn(ColumnChunkMetaData.get(
currentChunkPath,
currentChunkType,
@@ -994,6 +995,7 @@
currentColumnIndexes.add(null);
currentOffsetIndexes.add(null);
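+      // The offsets in the source file might be invalid (see PARQUET-1977); re-calculate them for the new position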
+ Offsets offsets = Offsets.getOffsets(from, chunk, newChunkStart);
currentBlock.addColumn(ColumnChunkMetaData.get(
chunk.getPath(),
chunk.getPrimitiveType(),
@@ -1001,8 +1003,8 @@
chunk.getEncodingStats(),
chunk.getEncodings(),
chunk.getStatistics(),
- newChunkStart,
- newChunkStart,
+ offsets.firstDataPageOffset,
+ offsets.dictionaryPageOffset,
chunk.getValueCount(),
chunk.getTotalSize(),
chunk.getTotalUncompressedSize()));
@@ -1036,6 +1038,7 @@
currentColumnIndexes.add(columnIndex);
currentOffsetIndexes.add(offsetIndex);
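+      // Re-calculate the offsets for the new position of the chunk; the source values might be invalid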
+ Offsets offsets = Offsets.getOffsets(from, chunk, newChunkStart);
currentBlock.addColumn(ColumnChunkMetaData.get(
chunk.getPath(),
chunk.getPrimitiveType(),
@@ -1043,8 +1046,8 @@
chunk.getEncodingStats(),
chunk.getEncodings(),
chunk.getStatistics(),
- newChunkStart,
- newChunkStart,
+ offsets.firstDataPageOffset,
+ offsets.dictionaryPageOffset,
chunk.getValueCount(),
chunk.getTotalSize(),
chunk.getTotalUncompressedSize()));
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
index 587a241..ed26618 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/metadata/ColumnChunkMetaData.java
@@ -262,7 +262,8 @@
abstract public long getFirstDataPageOffset();
/**
- * @return the location of the dictionary page if any
+   * @return the location of the dictionary page if any; {@code 0} is returned if there is no dictionary page. Check
+   *         {@link #hasDictionaryPage()} to tell whether a dictionary page actually exists.
*/
abstract public long getDictionaryPageOffset();
diff --git a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
index c41c8bf..73ef70e 100644
--- a/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
+++ b/parquet-hadoop/src/test/java/org/apache/parquet/hadoop/TestParquetFileWriter.java
@@ -39,6 +39,7 @@
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.page.DataPage;
import org.apache.parquet.column.page.DataPageV1;
+import org.apache.parquet.column.page.DictionaryPage;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.column.page.PageReader;
import org.apache.parquet.column.statistics.BinaryStatistics;
@@ -69,6 +70,7 @@
import static org.junit.Assert.*;
import static org.apache.parquet.column.Encoding.BIT_PACKED;
import static org.apache.parquet.column.Encoding.PLAIN;
+import static org.apache.parquet.column.Encoding.RLE_DICTIONARY;
import static org.apache.parquet.format.converter.ParquetMetadataConverter.MAX_STATS_SIZE;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.Type.Repetition.*;
@@ -157,12 +159,15 @@
w.startBlock(3);
w.startColumn(C1, 5, CODEC);
long c1Starts = w.getPos();
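+    // c1 has no dictionary page, so its first data page starts at the very beginning of the chunk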
+ long c1p1Starts = w.getPos();
w.writeDataPage(2, 4, BytesInput.from(BYTES1), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN);
w.writeDataPage(3, 4, BytesInput.from(BYTES1), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN);
w.endColumn();
long c1Ends = w.getPos();
w.startColumn(C2, 6, CODEC);
long c2Starts = w.getPos();
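+    // c2 starts with a dictionary page, so its first data page offset differs from the chunk start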
+ w.writeDictionaryPage(new DictionaryPage(BytesInput.from(BYTES2), 4, RLE_DICTIONARY));
+ long c2p1Starts = w.getPos();
w.writeDataPage(2, 4, BytesInput.from(BYTES2), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN);
w.writeDataPage(3, 4, BytesInput.from(BYTES2), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN);
w.writeDataPage(1, 4, BytesInput.from(BYTES2), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN);
@@ -181,17 +186,26 @@
ParquetMetadata readFooter = ParquetFileReader.readFooter(configuration, path);
assertEquals("footer: "+ readFooter, 2, readFooter.getBlocks().size());
- assertEquals(c1Ends - c1Starts, readFooter.getBlocks().get(0).getColumns().get(0).getTotalSize());
- assertEquals(c2Ends - c2Starts, readFooter.getBlocks().get(0).getColumns().get(1).getTotalSize());
- assertEquals(c2Ends - c1Starts, readFooter.getBlocks().get(0).getTotalByteSize());
+ BlockMetaData rowGroup = readFooter.getBlocks().get(0);
+ assertEquals(c1Ends - c1Starts, rowGroup.getColumns().get(0).getTotalSize());
+ assertEquals(c2Ends - c2Starts, rowGroup.getColumns().get(1).getTotalSize());
+ assertEquals(c2Ends - c1Starts, rowGroup.getTotalByteSize());
+
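+    // the offsets stored in the footer must point to the real positions of the pages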
+ assertEquals(c1Starts, rowGroup.getColumns().get(0).getStartingPos());
+ assertEquals(0, rowGroup.getColumns().get(0).getDictionaryPageOffset());
+ assertEquals(c1p1Starts, rowGroup.getColumns().get(0).getFirstDataPageOffset());
+ assertEquals(c2Starts, rowGroup.getColumns().get(1).getStartingPos());
+ assertEquals(c2Starts, rowGroup.getColumns().get(1).getDictionaryPageOffset());
+ assertEquals(c2p1Starts, rowGroup.getColumns().get(1).getFirstDataPageOffset());
+
HashSet<Encoding> expectedEncoding=new HashSet<Encoding>();
expectedEncoding.add(PLAIN);
expectedEncoding.add(BIT_PACKED);
- assertEquals(expectedEncoding,readFooter.getBlocks().get(0).getColumns().get(0).getEncodings());
+ assertEquals(expectedEncoding,rowGroup.getColumns().get(0).getEncodings());
{ // read first block of col #1
ParquetFileReader r = new ParquetFileReader(configuration, readFooter.getFileMetaData(), path,
- Arrays.asList(readFooter.getBlocks().get(0)), Arrays.asList(SCHEMA.getColumnDescription(PATH1)));
+ Arrays.asList(rowGroup), Arrays.asList(SCHEMA.getColumnDescription(PATH1)));
PageReadStore pages = r.readNextRowGroup();
assertEquals(3, pages.getRowCount());
validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1));