| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.parquet.hadoop; |
| |
| import org.apache.hadoop.conf.Configuration; |
| import org.apache.hadoop.fs.FSDataInputStream; |
| import org.apache.hadoop.fs.FileStatus; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.parquet.ParquetReadOptions; |
| import org.apache.parquet.Version; |
| import org.apache.parquet.bytes.BytesUtils; |
| import org.apache.parquet.column.page.DataPageV2; |
| import org.apache.parquet.column.values.bloomfilter.BlockSplitBloomFilter; |
| import org.apache.parquet.column.values.bloomfilter.BloomFilter; |
| import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel; |
| import org.apache.parquet.io.ParquetEncodingException; |
| import org.junit.Assume; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.apache.parquet.bytes.BytesInput; |
| import org.apache.parquet.column.ColumnDescriptor; |
| import org.apache.parquet.column.Encoding; |
| import org.apache.parquet.column.page.DataPage; |
| import org.apache.parquet.column.page.DataPageV1; |
| import org.apache.parquet.column.page.DictionaryPage; |
| import org.apache.parquet.column.page.PageReadStore; |
| import org.apache.parquet.column.page.PageReader; |
| import org.apache.parquet.column.statistics.BinaryStatistics; |
| import org.apache.parquet.column.statistics.LongStatistics; |
| import org.apache.parquet.format.Statistics; |
| import org.apache.parquet.hadoop.metadata.*; |
| import org.apache.parquet.hadoop.util.HadoopInputFile; |
| import org.apache.parquet.hadoop.util.HiddenFileFilter; |
| import org.apache.parquet.internal.column.columnindex.BoundaryOrder; |
| import org.apache.parquet.internal.column.columnindex.ColumnIndex; |
| import org.apache.parquet.internal.column.columnindex.OffsetIndex; |
| import org.apache.parquet.io.api.Binary; |
| import org.apache.parquet.schema.MessageType; |
| import org.apache.parquet.schema.MessageTypeParser; |
| import org.apache.parquet.schema.PrimitiveType; |
| import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName; |
| import org.apache.parquet.schema.Types; |
| |
| import java.io.File; |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.nio.charset.StandardCharsets; |
| import java.util.*; |
| import java.util.concurrent.Callable; |
| |
| import static org.apache.parquet.CorruptStatistics.shouldIgnoreStatistics; |
| import static org.apache.parquet.hadoop.ParquetFileWriter.Mode.OVERWRITE; |
| import static org.junit.Assert.*; |
| import static org.apache.parquet.column.Encoding.BIT_PACKED; |
| import static org.apache.parquet.column.Encoding.PLAIN; |
| import static org.apache.parquet.column.Encoding.RLE_DICTIONARY; |
| import static org.apache.parquet.format.converter.ParquetMetadataConverter.MAX_STATS_SIZE; |
| import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; |
| import static org.apache.parquet.schema.Type.Repetition.*; |
| import static org.apache.parquet.hadoop.TestUtils.enforceEmptyDir; |
| |
| import org.apache.parquet.example.data.Group; |
| import org.apache.parquet.example.data.simple.SimpleGroup; |
| |
| import org.apache.parquet.hadoop.example.GroupWriteSupport; |
| import org.junit.rules.TemporaryFolder; |
| import org.mockito.Mockito; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| public class TestParquetFileWriter { |
| |
| private static final Logger LOG = LoggerFactory.getLogger(TestParquetFileWriter.class); |
| |
| private static final MessageType SCHEMA = MessageTypeParser.parseMessageType("" + |
| "message m {" + |
| " required group a {" + |
| " required binary b;" + |
| " }" + |
| " required group c {" + |
| " required int64 d;" + |
| " }" + |
| "}"); |
| private static final String[] PATH1 = {"a", "b"}; |
| private static final ColumnDescriptor C1 = SCHEMA.getColumnDescription(PATH1); |
| private static final String[] PATH2 = {"c", "d"}; |
| private static final ColumnDescriptor C2 = SCHEMA.getColumnDescription(PATH2); |
| |
| private static final byte[] BYTES1 = { 0, 1, 2, 3 }; |
| private static final byte[] BYTES2 = { 1, 2, 3, 4 }; |
| private static final byte[] BYTES3 = { 2, 3, 4, 5 }; |
| private static final byte[] BYTES4 = { 3, 4, 5, 6 }; |
| private static final CompressionCodecName CODEC = CompressionCodecName.UNCOMPRESSED; |
| |
| private static final org.apache.parquet.column.statistics.Statistics<?> EMPTY_STATS = org.apache.parquet.column.statistics.Statistics |
| .getBuilderForReading(Types.required(PrimitiveTypeName.BINARY).named("test_binary")).build(); |
| |
| private String writeSchema; |
| |
| @Rule |
| public final TemporaryFolder temp = new TemporaryFolder(); |
| |
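/**
* Opening an existing file with CREATE mode must fail with an IOException,
* while OVERWRITE must succeed.
*/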
| @Test |
| public void testWriteMode() throws Exception { |
| File testFile = temp.newFile(); |
| MessageType schema = MessageTypeParser.parseMessageType( |
| "message m { required group a {required binary b;} required group " |
| + "c { required int64 d; }}"); |
| Configuration conf = new Configuration(); |
| |
| ParquetFileWriter writer = null; |
| boolean exceptionThrown = false; |
| Path path = new Path(testFile.toURI()); |
| try { |
| writer = new ParquetFileWriter(conf, schema, path, |
| ParquetFileWriter.Mode.CREATE); |
| } catch(IOException ioe1) { |
| exceptionThrown = true; |
| } |
| assertTrue(exceptionThrown); |
| exceptionThrown = false; |
| try { |
| writer = new ParquetFileWriter(conf, schema, path, |
| OVERWRITE); |
| } catch(IOException ioe2) { |
| exceptionThrown = true; |
| } |
assertFalse(exceptionThrown);
| testFile.delete(); |
| } |
| |
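/**
* Writes two row groups of raw pages (including a dictionary page for the second
* column), then checks the footer offsets, sizes and encodings and reads the
* pages back for one column and for both columns.
*/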
| @Test |
| public void testWriteRead() throws Exception { |
| File testFile = temp.newFile(); |
| testFile.delete(); |
| |
| Path path = new Path(testFile.toURI()); |
| Configuration configuration = new Configuration(); |
| |
| ParquetFileWriter w = new ParquetFileWriter(configuration, SCHEMA, path); |
| w.start(); |
| w.startBlock(3); |
| w.startColumn(C1, 5, CODEC); |
| long c1Starts = w.getPos(); |
| long c1p1Starts = w.getPos(); |
| w.writeDataPage(2, 4, BytesInput.from(BYTES1), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN); |
| w.writeDataPage(3, 4, BytesInput.from(BYTES1), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN); |
| w.endColumn(); |
| long c1Ends = w.getPos(); |
| w.startColumn(C2, 6, CODEC); |
| long c2Starts = w.getPos(); |
| w.writeDictionaryPage(new DictionaryPage(BytesInput.from(BYTES2), 4, RLE_DICTIONARY)); |
| long c2p1Starts = w.getPos(); |
| w.writeDataPage(2, 4, BytesInput.from(BYTES2), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN); |
| w.writeDataPage(3, 4, BytesInput.from(BYTES2), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN); |
| w.writeDataPage(1, 4, BytesInput.from(BYTES2), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN); |
| w.endColumn(); |
| long c2Ends = w.getPos(); |
| w.endBlock(); |
| w.startBlock(4); |
| w.startColumn(C1, 7, CODEC); |
| w.writeDataPage(7, 4, BytesInput.from(BYTES3), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN); |
| w.endColumn(); |
| w.startColumn(C2, 8, CODEC); |
| w.writeDataPage(8, 4, BytesInput.from(BYTES4), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN); |
| w.endColumn(); |
| w.endBlock(); |
| w.end(new HashMap<String, String>()); |
| |
| ParquetMetadata readFooter = ParquetFileReader.readFooter(configuration, path); |
| assertEquals("footer: "+ readFooter, 2, readFooter.getBlocks().size()); |
| BlockMetaData rowGroup = readFooter.getBlocks().get(0); |
| assertEquals(c1Ends - c1Starts, rowGroup.getColumns().get(0).getTotalSize()); |
| assertEquals(c2Ends - c2Starts, rowGroup.getColumns().get(1).getTotalSize()); |
| assertEquals(c2Ends - c1Starts, rowGroup.getTotalByteSize()); |
| |
| assertEquals(c1Starts, rowGroup.getColumns().get(0).getStartingPos()); |
| assertEquals(0, rowGroup.getColumns().get(0).getDictionaryPageOffset()); |
| assertEquals(c1p1Starts, rowGroup.getColumns().get(0).getFirstDataPageOffset()); |
| assertEquals(c2Starts, rowGroup.getColumns().get(1).getStartingPos()); |
| assertEquals(c2Starts, rowGroup.getColumns().get(1).getDictionaryPageOffset()); |
| assertEquals(c2p1Starts, rowGroup.getColumns().get(1).getFirstDataPageOffset()); |
| |
HashSet<Encoding> expectedEncoding = new HashSet<Encoding>();
expectedEncoding.add(PLAIN);
expectedEncoding.add(BIT_PACKED);
assertEquals(expectedEncoding, rowGroup.getColumns().get(0).getEncodings());
| |
| { // read first block of col #1 |
| ParquetFileReader r = new ParquetFileReader(configuration, readFooter.getFileMetaData(), path, |
| Arrays.asList(rowGroup), Arrays.asList(SCHEMA.getColumnDescription(PATH1))); |
| PageReadStore pages = r.readNextRowGroup(); |
| assertEquals(3, pages.getRowCount()); |
| validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1)); |
| validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1)); |
| assertNull(r.readNextRowGroup()); |
| } |
| |
| { // read all blocks of col #1 and #2 |
| |
| ParquetFileReader r = new ParquetFileReader(configuration, readFooter.getFileMetaData(), path, |
| readFooter.getBlocks(), Arrays.asList(SCHEMA.getColumnDescription(PATH1), SCHEMA.getColumnDescription(PATH2))); |
| |
| PageReadStore pages = r.readNextRowGroup(); |
| assertEquals(3, pages.getRowCount()); |
| validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1)); |
| validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1)); |
| validateContains(SCHEMA, pages, PATH2, 2, BytesInput.from(BYTES2)); |
| validateContains(SCHEMA, pages, PATH2, 3, BytesInput.from(BYTES2)); |
| validateContains(SCHEMA, pages, PATH2, 1, BytesInput.from(BYTES2)); |
| |
| pages = r.readNextRowGroup(); |
| assertEquals(4, pages.getRowCount()); |
| |
| validateContains(SCHEMA, pages, PATH1, 7, BytesInput.from(BYTES3)); |
| validateContains(SCHEMA, pages, PATH2, 8, BytesInput.from(BYTES4)); |
| |
| assertNull(r.readNextRowGroup()); |
| } |
| PrintFooter.main(new String[] {path.toString()}); |
| } |
| |
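/**
* Ending a row group that contains zero records must fail with a
* ParquetEncodingException.
*/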
| @Test |
| public void testWriteEmptyBlock() throws Exception { |
| File testFile = temp.newFile(); |
| testFile.delete(); |
| |
| Path path = new Path(testFile.toURI()); |
| Configuration configuration = new Configuration(); |
| |
| ParquetFileWriter w = new ParquetFileWriter(configuration, SCHEMA, path); |
| w.start(); |
| w.startBlock(0); |
| |
| TestUtils.assertThrows("End block with zero record", ParquetEncodingException.class, |
| (Callable<Void>) () -> { |
| w.endBlock(); |
| return null; |
| }); |
| } |
| |
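/**
* Writes a column chunk with a bloom filter for column "foo" and verifies that
* the hashes inserted for "hello" and "world" are found when the filter is read back.
*/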
| @Test |
| public void testBloomFilterWriteRead() throws Exception { |
| MessageType schema = MessageTypeParser.parseMessageType("message test { required binary foo; }"); |
| File testFile = temp.newFile(); |
| testFile.delete(); |
| Path path = new Path(testFile.toURI()); |
| Configuration configuration = new Configuration(); |
| configuration.set("parquet.bloom.filter.column.names", "foo"); |
| String[] colPath = {"foo"}; |
| ColumnDescriptor col = schema.getColumnDescription(colPath); |
| BinaryStatistics stats1 = new BinaryStatistics(); |
| ParquetFileWriter w = new ParquetFileWriter(configuration, schema, path); |
| w.start(); |
| w.startBlock(3); |
| w.startColumn(col, 5, CODEC); |
w.writeDataPage(2, 4, BytesInput.from(BYTES1), stats1, BIT_PACKED, BIT_PACKED, PLAIN);
w.writeDataPage(3, 4, BytesInput.from(BYTES1), stats1, BIT_PACKED, BIT_PACKED, PLAIN);
| w.endColumn(); |
| BloomFilter blockSplitBloomFilter = new BlockSplitBloomFilter(0); |
| blockSplitBloomFilter.insertHash(blockSplitBloomFilter.hash(Binary.fromString("hello"))); |
| blockSplitBloomFilter.insertHash(blockSplitBloomFilter.hash(Binary.fromString("world"))); |
| w.addBloomFilter("foo", blockSplitBloomFilter); |
| w.endBlock(); |
| w.end(new HashMap<>()); |
| ParquetMetadata readFooter = ParquetFileReader.readFooter(configuration, path); |
| ParquetFileReader r = new ParquetFileReader(configuration, readFooter.getFileMetaData(), path, |
| Arrays.asList(readFooter.getBlocks().get(0)), Arrays.asList(schema.getColumnDescription(colPath))); |
| BloomFilterReader bloomFilterReader = r.getBloomFilterDataReader(readFooter.getBlocks().get(0)); |
| BloomFilter bloomFilter = bloomFilterReader.readBloomFilter(readFooter.getBlocks().get(0).getColumns().get(0)); |
| assertTrue(bloomFilter.findHash(blockSplitBloomFilter.hash(Binary.fromString("hello")))); |
| assertTrue(bloomFilter.findHash(blockSplitBloomFilter.hash(Binary.fromString("world")))); |
| } |
| |
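/**
* Writes DataPageV2 pages for both columns, then checks the footer (column sizes,
* merged statistics, encodings) and reads the pages back, validating value/row/null
* counts, repetition and definition levels, data bytes and uncompressed sizes.
*/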
| @Test |
| public void testWriteReadDataPageV2() throws Exception { |
| File testFile = temp.newFile(); |
| testFile.delete(); |
| |
| Path path = new Path(testFile.toURI()); |
| Configuration configuration = new Configuration(); |
| |
| ParquetFileWriter w = new ParquetFileWriter(configuration, SCHEMA, path); |
| w.start(); |
| w.startBlock(14); |
| |
| BytesInput repLevels = BytesInput.fromInt(2); |
| BytesInput defLevels = BytesInput.fromInt(1); |
| BytesInput data = BytesInput.fromInt(3); |
| BytesInput data2 = BytesInput.fromInt(10); |
| |
| org.apache.parquet.column.statistics.Statistics<?> statsC1P1 = createStatistics("s", "z", C1); |
| org.apache.parquet.column.statistics.Statistics<?> statsC1P2 = createStatistics("b", "d", C1); |
| |
| w.startColumn(C1, 6, CODEC); |
| long c1Starts = w.getPos(); |
| w.writeDataPageV2(4, 1, 3, repLevels, defLevels, PLAIN, data, 4, statsC1P1); |
| w.writeDataPageV2(3, 0, 3, repLevels, defLevels, PLAIN, data, 4, statsC1P2); |
| w.endColumn(); |
| long c1Ends = w.getPos(); |
| |
| w.startColumn(C2, 5, CODEC); |
| long c2Starts = w.getPos(); |
| w.writeDataPageV2(5, 2, 3, repLevels, defLevels, PLAIN, data2, 4, EMPTY_STATS); |
| w.writeDataPageV2(2, 0, 2, repLevels, defLevels, PLAIN, data2, 4, EMPTY_STATS); |
| w.endColumn(); |
| long c2Ends = w.getPos(); |
| |
| w.endBlock(); |
| w.end(new HashMap<>()); |
| |
| ParquetMetadata readFooter = ParquetFileReader.readFooter(configuration, path); |
| assertEquals("footer: "+ readFooter, 1, readFooter.getBlocks().size()); |
| assertEquals(c1Ends - c1Starts, readFooter.getBlocks().get(0).getColumns().get(0).getTotalSize()); |
| assertEquals(c2Ends - c2Starts, readFooter.getBlocks().get(0).getColumns().get(1).getTotalSize()); |
| assertEquals(c2Ends - c1Starts, readFooter.getBlocks().get(0).getTotalByteSize()); |
| |
| //check for stats |
| org.apache.parquet.column.statistics.Statistics<?> expectedStats = createStatistics("b", "z", C1); |
| TestUtils.assertStatsValuesEqual(expectedStats, readFooter.getBlocks().get(0).getColumns().get(0).getStatistics()); |
| |
| HashSet<Encoding> expectedEncoding = new HashSet<Encoding>(); |
| expectedEncoding.add(PLAIN); |
| assertEquals(expectedEncoding, readFooter.getBlocks().get(0).getColumns().get(0).getEncodings()); |
| |
| ParquetFileReader reader = new ParquetFileReader(configuration, readFooter.getFileMetaData(), path, |
| readFooter.getBlocks(), Arrays.asList(SCHEMA.getColumnDescription(PATH1), SCHEMA.getColumnDescription(PATH2))); |
| |
| PageReadStore pages = reader.readNextRowGroup(); |
| assertEquals(14, pages.getRowCount()); |
| validateV2Page(SCHEMA, pages, PATH1, 3, 4, 1, repLevels.toByteArray(), defLevels.toByteArray(), data.toByteArray(), 12); |
validateV2Page(SCHEMA, pages, PATH1, 3, 3, 0, repLevels.toByteArray(), defLevels.toByteArray(), data.toByteArray(), 12);
| validateV2Page(SCHEMA, pages, PATH2, 3, 5, 2, repLevels.toByteArray(), defLevels.toByteArray(), data2.toByteArray(), 12); |
| validateV2Page(SCHEMA, pages, PATH2, 2, 2, 0, repLevels.toByteArray(), defLevels.toByteArray(), data2.toByteArray(), 12); |
| assertNull(reader.readNextRowGroup()); |
| } |
| |
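/**
* Uses the test constructor with a 120-byte block size and 60 bytes of maximum
* padding: the first row group ends just before the 120-byte boundary, so the
* writer pads and the second row group starts exactly at offset 120, while the
* footer follows the last row group with no padding.
*/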
| @Test |
| public void testAlignmentWithPadding() throws Exception { |
| File testFile = temp.newFile(); |
| |
| Path path = new Path(testFile.toURI()); |
| Configuration conf = new Configuration(); |
// Disable page write checksums: the hardcoded byte offsets in the assertions below assume none are written
| conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, false); |
| |
// uses the test-only constructor (block size 120, max padding 60)
| ParquetFileWriter w = new ParquetFileWriter(conf, SCHEMA, path, 120, 60); |
| |
| w.start(); |
| w.startBlock(3); |
| w.startColumn(C1, 5, CODEC); |
| long c1Starts = w.getPos(); |
| w.writeDataPage(2, 4, BytesInput.from(BYTES1), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN); |
| w.writeDataPage(3, 4, BytesInput.from(BYTES1), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN); |
| w.endColumn(); |
| long c1Ends = w.getPos(); |
| w.startColumn(C2, 6, CODEC); |
| long c2Starts = w.getPos(); |
| w.writeDataPage(2, 4, BytesInput.from(BYTES2), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN); |
| w.writeDataPage(3, 4, BytesInput.from(BYTES2), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN); |
| w.writeDataPage(1, 4, BytesInput.from(BYTES2), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN); |
| w.endColumn(); |
| long c2Ends = w.getPos(); |
| w.endBlock(); |
| |
| long firstRowGroupEnds = w.getPos(); // should be 109 |
| |
| w.startBlock(4); |
| w.startColumn(C1, 7, CODEC); |
| w.writeDataPage(7, 4, BytesInput.from(BYTES3), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN); |
| w.endColumn(); |
| w.startColumn(C2, 8, CODEC); |
| w.writeDataPage(8, 4, BytesInput.from(BYTES4), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN); |
| w.endColumn(); |
| w.endBlock(); |
| |
| long secondRowGroupEnds = w.getPos(); |
| |
| w.end(new HashMap<String, String>()); |
| |
| FileSystem fs = path.getFileSystem(conf); |
| long fileLen = fs.getFileStatus(path).getLen(); |
| |
| FSDataInputStream data = fs.open(path); |
data.seek(fileLen - 8); // footer length (4 bytes) + magic "PAR1" (4 bytes)
| long footerLen = BytesUtils.readIntLittleEndian(data); |
| long startFooter = fileLen - footerLen - 8; |
| |
| assertEquals("Footer should start after second row group without padding", |
| secondRowGroupEnds, startFooter); |
| |
| ParquetMetadata readFooter = ParquetFileReader.readFooter(conf, path); |
| assertEquals("footer: "+ readFooter, 2, readFooter.getBlocks().size()); |
| assertEquals(c1Ends - c1Starts, readFooter.getBlocks().get(0).getColumns().get(0).getTotalSize()); |
| assertEquals(c2Ends - c2Starts, readFooter.getBlocks().get(0).getColumns().get(1).getTotalSize()); |
| assertEquals(c2Ends - c1Starts, readFooter.getBlocks().get(0).getTotalByteSize()); |
HashSet<Encoding> expectedEncoding = new HashSet<Encoding>();
expectedEncoding.add(PLAIN);
expectedEncoding.add(BIT_PACKED);
assertEquals(expectedEncoding, readFooter.getBlocks().get(0).getColumns().get(0).getEncodings());
| |
| // verify block starting positions with padding |
| assertEquals("First row group should start after magic", |
| 4, readFooter.getBlocks().get(0).getStartingPos()); |
| assertTrue("First row group should end before the block size (120)", |
| firstRowGroupEnds < 120); |
| assertEquals("Second row group should start at the block size", |
| 120, readFooter.getBlocks().get(1).getStartingPos()); |
| |
| { // read first block of col #1 |
| ParquetFileReader r = new ParquetFileReader(conf, readFooter.getFileMetaData(), path, |
| Arrays.asList(readFooter.getBlocks().get(0)), Arrays.asList(SCHEMA.getColumnDescription(PATH1))); |
| PageReadStore pages = r.readNextRowGroup(); |
| assertEquals(3, pages.getRowCount()); |
| validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1)); |
| validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1)); |
| assertNull(r.readNextRowGroup()); |
| } |
| |
| { // read all blocks of col #1 and #2 |
| |
| ParquetFileReader r = new ParquetFileReader(conf, readFooter.getFileMetaData(), path, |
| readFooter.getBlocks(), Arrays.asList(SCHEMA.getColumnDescription(PATH1), SCHEMA.getColumnDescription(PATH2))); |
| |
| PageReadStore pages = r.readNextRowGroup(); |
| assertEquals(3, pages.getRowCount()); |
| validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1)); |
| validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1)); |
| validateContains(SCHEMA, pages, PATH2, 2, BytesInput.from(BYTES2)); |
| validateContains(SCHEMA, pages, PATH2, 3, BytesInput.from(BYTES2)); |
| validateContains(SCHEMA, pages, PATH2, 1, BytesInput.from(BYTES2)); |
| |
| pages = r.readNextRowGroup(); |
| assertEquals(4, pages.getRowCount()); |
| |
| validateContains(SCHEMA, pages, PATH1, 7, BytesInput.from(BYTES3)); |
| validateContains(SCHEMA, pages, PATH2, 8, BytesInput.from(BYTES4)); |
| |
| assertNull(r.readNextRowGroup()); |
| } |
| PrintFooter.main(new String[] {path.toString()}); |
| } |
| |
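/**
* Uses the test constructor with a 100-byte block size and 50 bytes of maximum
* padding: the first row group runs past the first block boundary and the gap to
* the next boundary exceeds the padding limit, so no padding is inserted and the
* second row group starts immediately after the first (offset 109).
*/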
| @Test |
| public void testAlignmentWithNoPaddingNeeded() throws Exception { |
| File testFile = temp.newFile(); |
| |
| Path path = new Path(testFile.toURI()); |
| Configuration conf = new Configuration(); |
// Disable page write checksums: the hardcoded byte offsets in the assertions below assume none are written
| conf.setBoolean(ParquetOutputFormat.PAGE_WRITE_CHECKSUM_ENABLED, false); |
| |
// uses the test-only constructor (block size 100, max padding 50)
| ParquetFileWriter w = new ParquetFileWriter(conf, SCHEMA, path, 100, 50); |
| |
| w.start(); |
| w.startBlock(3); |
| w.startColumn(C1, 5, CODEC); |
| long c1Starts = w.getPos(); |
| w.writeDataPage(2, 4, BytesInput.from(BYTES1), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN); |
| w.writeDataPage(3, 4, BytesInput.from(BYTES1), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN); |
| w.endColumn(); |
| long c1Ends = w.getPos(); |
| w.startColumn(C2, 6, CODEC); |
| long c2Starts = w.getPos(); |
| w.writeDataPage(2, 4, BytesInput.from(BYTES2), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN); |
| w.writeDataPage(3, 4, BytesInput.from(BYTES2), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN); |
| w.writeDataPage(1, 4, BytesInput.from(BYTES2), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN); |
| w.endColumn(); |
| long c2Ends = w.getPos(); |
| w.endBlock(); |
| |
| long firstRowGroupEnds = w.getPos(); // should be 109 |
| |
| w.startBlock(4); |
| w.startColumn(C1, 7, CODEC); |
| w.writeDataPage(7, 4, BytesInput.from(BYTES3), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN); |
| w.endColumn(); |
| w.startColumn(C2, 8, CODEC); |
| w.writeDataPage(8, 4, BytesInput.from(BYTES4), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN); |
| w.endColumn(); |
| w.endBlock(); |
| |
| long secondRowGroupEnds = w.getPos(); |
| |
| w.end(new HashMap<String, String>()); |
| |
| FileSystem fs = path.getFileSystem(conf); |
| long fileLen = fs.getFileStatus(path).getLen(); |
| |
| FSDataInputStream data = fs.open(path); |
data.seek(fileLen - 8); // footer length (4 bytes) + magic "PAR1" (4 bytes)
| long footerLen = BytesUtils.readIntLittleEndian(data); |
| long startFooter = fileLen - footerLen - 8; |
| |
| assertEquals("Footer should start after second row group without padding", |
| secondRowGroupEnds, startFooter); |
| |
| ParquetMetadata readFooter = ParquetFileReader.readFooter(conf, path); |
| assertEquals("footer: "+ readFooter, 2, readFooter.getBlocks().size()); |
| assertEquals(c1Ends - c1Starts, readFooter.getBlocks().get(0).getColumns().get(0).getTotalSize()); |
| assertEquals(c2Ends - c2Starts, readFooter.getBlocks().get(0).getColumns().get(1).getTotalSize()); |
| assertEquals(c2Ends - c1Starts, readFooter.getBlocks().get(0).getTotalByteSize()); |
HashSet<Encoding> expectedEncoding = new HashSet<Encoding>();
expectedEncoding.add(PLAIN);
expectedEncoding.add(BIT_PACKED);
assertEquals(expectedEncoding, readFooter.getBlocks().get(0).getColumns().get(0).getEncodings());
| |
// verify block starting positions without padding
| assertEquals("First row group should start after magic", |
| 4, readFooter.getBlocks().get(0).getStartingPos()); |
| assertTrue("First row group should end before the block size (120)", |
| firstRowGroupEnds > 100); |
| assertEquals("Second row group should start after no padding", |
| 109, readFooter.getBlocks().get(1).getStartingPos()); |
| |
| { // read first block of col #1 |
| ParquetFileReader r = new ParquetFileReader(conf, readFooter.getFileMetaData(), path, |
| Arrays.asList(readFooter.getBlocks().get(0)), Arrays.asList(SCHEMA.getColumnDescription(PATH1))); |
| PageReadStore pages = r.readNextRowGroup(); |
| assertEquals(3, pages.getRowCount()); |
| validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1)); |
| validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1)); |
| assertNull(r.readNextRowGroup()); |
| } |
| |
| { // read all blocks of col #1 and #2 |
| |
| ParquetFileReader r = new ParquetFileReader(conf, readFooter.getFileMetaData(), path, |
| readFooter.getBlocks(), Arrays.asList(SCHEMA.getColumnDescription(PATH1), SCHEMA.getColumnDescription(PATH2))); |
| |
| PageReadStore pages = r.readNextRowGroup(); |
| assertEquals(3, pages.getRowCount()); |
| validateContains(SCHEMA, pages, PATH1, 2, BytesInput.from(BYTES1)); |
| validateContains(SCHEMA, pages, PATH1, 3, BytesInput.from(BYTES1)); |
| validateContains(SCHEMA, pages, PATH2, 2, BytesInput.from(BYTES2)); |
| validateContains(SCHEMA, pages, PATH2, 3, BytesInput.from(BYTES2)); |
| validateContains(SCHEMA, pages, PATH2, 1, BytesInput.from(BYTES2)); |
| |
| pages = r.readNextRowGroup(); |
| assertEquals(4, pages.getRowCount()); |
| |
| validateContains(SCHEMA, pages, PATH1, 7, BytesInput.from(BYTES3)); |
| validateContains(SCHEMA, pages, PATH2, 8, BytesInput.from(BYTES4)); |
| |
| assertNull(r.readNextRowGroup()); |
| } |
| PrintFooter.main(new String[] {path.toString()}); |
| } |
| |
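/**
* Round-trips LongStatistics through the parquet-format (thrift) Statistics
* representation and checks that min, max and null count are preserved.
*/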
| @Test |
| public void testConvertToThriftStatistics() throws Exception { |
| long[] longArray = new long[] {39L, 99L, 12L, 1000L, 65L, 542L, 2533461316L, -253346131996L, Long.MAX_VALUE, Long.MIN_VALUE}; |
| LongStatistics parquetMRstats = new LongStatistics(); |
| |
| for (long l: longArray) { |
| parquetMRstats.updateStats(l); |
| } |
| final String createdBy = |
| "parquet-mr version 1.8.0 (build d4d5a07ec9bd262ca1e93c309f1d7d4a74ebda4c)"; |
| Statistics thriftStats = |
| org.apache.parquet.format.converter.ParquetMetadataConverter.toParquetStatistics(parquetMRstats); |
| LongStatistics convertedBackStats = |
| (LongStatistics) org.apache.parquet.format.converter.ParquetMetadataConverter.fromParquetStatistics( |
| createdBy, thriftStats, PrimitiveTypeName.INT64); |
| |
| assertEquals(parquetMRstats.getMax(), convertedBackStats.getMax()); |
| assertEquals(parquetMRstats.getMin(), convertedBackStats.getMin()); |
| assertEquals(parquetMRstats.getNumNulls(), convertedBackStats.getNumNulls()); |
| } |
| |
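/**
* Writes two row groups with page-level statistics and verifies that the column
* statistics in the footer are the min/max union of the pages of each row group.
*/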
| @Test |
| public void testWriteReadStatistics() throws Exception { |
| // this test assumes statistics will be read |
| Assume.assumeTrue(!shouldIgnoreStatistics(Version.FULL_VERSION, BINARY)); |
| |
| File testFile = temp.newFile(); |
| testFile.delete(); |
| |
| Path path = new Path(testFile.toURI()); |
| Configuration configuration = new Configuration(); |
| configuration.setBoolean("parquet.strings.signed-min-max.enabled", true); |
| |
| MessageType schema = MessageTypeParser.parseMessageType("message m { required group a {required binary b (UTF8);} required group c { required int64 d; }}"); |
| String[] path1 = {"a", "b"}; |
| ColumnDescriptor c1 = schema.getColumnDescription(path1); |
| String[] path2 = {"c", "d"}; |
| ColumnDescriptor c2 = schema.getColumnDescription(path2); |
| |
| byte[] bytes1 = { 0, 1, 2, 3}; |
| byte[] bytes2 = { 1, 2, 3, 4}; |
| byte[] bytes3 = { 2, 3, 4, 5}; |
| byte[] bytes4 = { 3, 4, 5, 6}; |
| CompressionCodecName codec = CompressionCodecName.UNCOMPRESSED; |
| |
| BinaryStatistics statsB1C1P1 = new BinaryStatistics(); |
| BinaryStatistics statsB1C1P2 = new BinaryStatistics(); |
| LongStatistics statsB1C2P1 = new LongStatistics(); |
| LongStatistics statsB1C2P2 = new LongStatistics(); |
| BinaryStatistics statsB2C1P1 = new BinaryStatistics(); |
| LongStatistics statsB2C2P1 = new LongStatistics(); |
| statsB1C1P1.setMinMax(Binary.fromString("s"), Binary.fromString("z")); |
| statsB1C1P2.setMinMax(Binary.fromString("a"), Binary.fromString("b")); |
statsB1C2P1.setMinMax(2L, 10L);
statsB1C2P2.setMinMax(-6L, 4L);
statsB2C1P1.setMinMax(Binary.fromString("d"), Binary.fromString("e"));
statsB2C2P1.setMinMax(11L, 122L);
| |
| ParquetFileWriter w = new ParquetFileWriter(configuration, schema, path); |
| w.start(); |
| w.startBlock(3); |
| w.startColumn(c1, 5, codec); |
| w.writeDataPage(2, 4, BytesInput.from(bytes1), statsB1C1P1, BIT_PACKED, BIT_PACKED, PLAIN); |
| w.writeDataPage(3, 4, BytesInput.from(bytes1), statsB1C1P2, BIT_PACKED, BIT_PACKED, PLAIN); |
| w.endColumn(); |
| w.startColumn(c2, 6, codec); |
| w.writeDataPage(3, 4, BytesInput.from(bytes2), statsB1C2P1, BIT_PACKED, BIT_PACKED, PLAIN); |
| w.writeDataPage(1, 4, BytesInput.from(bytes2), statsB1C2P2, BIT_PACKED, BIT_PACKED, PLAIN); |
| w.endColumn(); |
| w.endBlock(); |
| |
| w.startBlock(4); |
| w.startColumn(c1, 7, codec); |
| w.writeDataPage(7, 4, BytesInput.from(bytes3), statsB2C1P1, BIT_PACKED, BIT_PACKED, PLAIN); |
| w.endColumn(); |
| w.startColumn(c2, 8, codec); |
| w.writeDataPage(8, 4, BytesInput.from(bytes4), statsB2C2P1, BIT_PACKED, BIT_PACKED, PLAIN); |
| w.endColumn(); |
| w.endBlock(); |
| w.end(new HashMap<String, String>()); |
| |
| ParquetMetadata readFooter = ParquetFileReader.readFooter(configuration, path); |
| for (BlockMetaData block : readFooter.getBlocks()) { |
| for (ColumnChunkMetaData col : block.getColumns()) { |
| col.getPath(); |
| } |
| } |
// expected statistics: the footer stats of each column are the union of its page-level stats
BinaryStatistics bs1 = new BinaryStatistics();
bs1.setMinMax(Binary.fromString("a"), Binary.fromString("z"));
LongStatistics ls1 = new LongStatistics();
ls1.setMinMax(-6L, 10L);

BinaryStatistics bs2 = new BinaryStatistics();
bs2.setMinMax(Binary.fromString("d"), Binary.fromString("e"));
LongStatistics ls2 = new LongStatistics();
ls2.setMinMax(11L, 122L);
| |
| { // assert stats are correct for the first block |
| TestUtils.assertStatsValuesEqual(bs1, readFooter.getBlocks().get(0).getColumns().get(0).getStatistics()); |
| TestUtils.assertStatsValuesEqual(ls1, readFooter.getBlocks().get(0).getColumns().get(1).getStatistics()); |
| } |
| { // assert stats are correct for the second block |
| TestUtils.assertStatsValuesEqual(bs2, readFooter.getBlocks().get(1).getColumns().get(0).getStatistics()); |
| TestUtils.assertStatsValuesEqual(ls2, readFooter.getBlocks().get(1).getColumns().get(1).getStatistics()); |
| } |
| } |
| |
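/**
* Writes three part files into a directory, generates the _metadata and
* _common_metadata summary files, and checks that footers read from the
* directory, from a single part file, from the summary file, and in parallel
* (with and without the summary files) all validate.
*/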
| @Test |
| public void testMetaDataFile() throws Exception { |
| |
| File testDir = temp.newFolder(); |
| |
| Path testDirPath = new Path(testDir.toURI()); |
| Configuration configuration = new Configuration(); |
| |
| final FileSystem fs = testDirPath.getFileSystem(configuration); |
| enforceEmptyDir(configuration, testDirPath); |
| |
| MessageType schema = MessageTypeParser.parseMessageType("message m { required group a {required binary b;} required group c { required int64 d; }}"); |
| createFile(configuration, new Path(testDirPath, "part0"), schema); |
| createFile(configuration, new Path(testDirPath, "part1"), schema); |
| createFile(configuration, new Path(testDirPath, "part2"), schema); |
| |
| FileStatus outputStatus = fs.getFileStatus(testDirPath); |
| List<Footer> footers = ParquetFileReader.readFooters(configuration, outputStatus, false); |
| validateFooters(footers); |
| ParquetFileWriter.writeMetadataFile(configuration, testDirPath, footers, JobSummaryLevel.ALL); |
| |
| footers = ParquetFileReader.readFooters(configuration, outputStatus, false); |
| validateFooters(footers); |
| footers = ParquetFileReader.readFooters(configuration, fs.getFileStatus(new Path(testDirPath, "part0")), false); |
| assertEquals(1, footers.size()); |
| |
| final FileStatus metadataFile = fs.getFileStatus(new Path(testDirPath, ParquetFileWriter.PARQUET_METADATA_FILE)); |
| final FileStatus metadataFileLight = fs.getFileStatus(new Path(testDirPath, ParquetFileWriter.PARQUET_COMMON_METADATA_FILE)); |
| final List<Footer> metadata = ParquetFileReader.readSummaryFile(configuration, metadataFile); |
| |
| validateFooters(metadata); |
| |
| footers = ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(configuration, Arrays.asList(fs.listStatus(testDirPath, HiddenFileFilter.INSTANCE)), false); |
| validateFooters(footers); |
| |
| fs.delete(metadataFile.getPath(), false); |
| fs.delete(metadataFileLight.getPath(), false); |
| |
| footers = ParquetFileReader.readAllFootersInParallelUsingSummaryFiles(configuration, Arrays.asList(fs.listStatus(testDirPath)), false); |
| validateFooters(footers); |
| |
| } |
| |
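/**
* Writes a single record that never sets the binary column and checks that the
* footer statistics are not empty and report exactly one null.
*/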
| @Test |
| public void testWriteReadStatisticsAllNulls() throws Exception { |
| // this test assumes statistics will be read |
| Assume.assumeTrue(!shouldIgnoreStatistics(Version.FULL_VERSION, BINARY)); |
| |
| File testFile = temp.newFile(); |
| testFile.delete(); |
| |
| writeSchema = "message example {\n" + |
| "required binary content (UTF8);\n" + |
| "}"; |
| |
| Path path = new Path(testFile.toURI()); |
| |
| MessageType schema = MessageTypeParser.parseMessageType(writeSchema); |
| Configuration configuration = new Configuration(); |
| configuration.setBoolean("parquet.strings.signed-min-max.enabled", true); |
| GroupWriteSupport.setSchema(schema, configuration); |
| |
| ParquetWriter<Group> writer = new ParquetWriter<Group>(path, configuration, new GroupWriteSupport()); |
| |
| Group r1 = new SimpleGroup(schema); |
| writer.write(r1); |
| writer.close(); |
| |
| ParquetMetadata readFooter = ParquetFileReader.readFooter(configuration, path); |
| |
| // assert the statistics object is not empty |
| org.apache.parquet.column.statistics.Statistics stats = readFooter.getBlocks().get(0).getColumns().get(0).getStatistics(); |
| assertFalse("is empty: " + stats, stats.isEmpty()); |
// assert the number of nulls is correct for the first block
| assertEquals("nulls: " + stats, 1, stats.getNumNulls()); |
| } |
| |
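/** Expects the three footers written by createFile: each file has two row groups and the key/value metadata added there. */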
| private void validateFooters(final List<Footer> metadata) { |
| LOG.debug("{}", metadata); |
| assertEquals(String.valueOf(metadata), 3, metadata.size()); |
| for (Footer footer : metadata) { |
| final File file = new File(footer.getFile().toUri()); |
| assertTrue(file.getName(), file.getName().startsWith("part")); |
| assertTrue(file.getPath(), file.exists()); |
| final ParquetMetadata parquetMetadata = footer.getParquetMetadata(); |
| assertEquals(2, parquetMetadata.getBlocks().size()); |
| final Map<String, String> keyValueMetaData = parquetMetadata.getFileMetaData().getKeyValueMetaData(); |
| assertEquals("bar", keyValueMetaData.get("foo")); |
| assertEquals(footer.getFile().getName(), keyValueMetaData.get(footer.getFile().getName())); |
| } |
| } |
| |
| |
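/**
* Writes a two-row-group file with the given schema and adds the key/value
* metadata ("foo" -> "bar" plus the file name) that validateFooters checks.
*/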
| private void createFile(Configuration configuration, Path path, MessageType schema) throws IOException { |
| String[] path1 = {"a", "b"}; |
| ColumnDescriptor c1 = schema.getColumnDescription(path1); |
| String[] path2 = {"c", "d"}; |
| ColumnDescriptor c2 = schema.getColumnDescription(path2); |
| |
| byte[] bytes1 = { 0, 1, 2, 3}; |
| byte[] bytes2 = { 1, 2, 3, 4}; |
| byte[] bytes3 = { 2, 3, 4, 5}; |
| byte[] bytes4 = { 3, 4, 5, 6}; |
| CompressionCodecName codec = CompressionCodecName.UNCOMPRESSED; |
| |
| BinaryStatistics stats1 = new BinaryStatistics(); |
| BinaryStatistics stats2 = new BinaryStatistics(); |
| |
| ParquetFileWriter w = new ParquetFileWriter(configuration, schema, path); |
| w.start(); |
| w.startBlock(3); |
| w.startColumn(c1, 5, codec); |
| w.writeDataPage(2, 4, BytesInput.from(bytes1), stats1, BIT_PACKED, BIT_PACKED, PLAIN); |
| w.writeDataPage(3, 4, BytesInput.from(bytes1), stats1, BIT_PACKED, BIT_PACKED, PLAIN); |
| w.endColumn(); |
| w.startColumn(c2, 6, codec); |
| w.writeDataPage(2, 4, BytesInput.from(bytes2), stats2, BIT_PACKED, BIT_PACKED, PLAIN); |
| w.writeDataPage(3, 4, BytesInput.from(bytes2), stats2, BIT_PACKED, BIT_PACKED, PLAIN); |
| w.writeDataPage(1, 4, BytesInput.from(bytes2), stats2, BIT_PACKED, BIT_PACKED, PLAIN); |
| w.endColumn(); |
| w.endBlock(); |
| w.startBlock(4); |
| w.startColumn(c1, 7, codec); |
| w.writeDataPage(7, 4, BytesInput.from(bytes3), stats1, BIT_PACKED, BIT_PACKED, PLAIN); |
| w.endColumn(); |
| w.startColumn(c2, 8, codec); |
| w.writeDataPage(8, 4, BytesInput.from(bytes4), stats2, BIT_PACKED, BIT_PACKED, PLAIN); |
| w.endColumn(); |
| w.endBlock(); |
| final HashMap<String, String> extraMetaData = new HashMap<String, String>(); |
| extraMetaData.put("foo", "bar"); |
| extraMetaData.put(path.getName(), path.getName()); |
| w.end(extraMetaData); |
| } |
| |
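/**
* Reads the next DataPageV2 of the given column and checks value/row/null counts,
* uncompressed size, and the raw repetition level, definition level and data bytes.
*/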
| private void validateV2Page(MessageType schema, PageReadStore pages, String[] path, int values, int rows, int nullCount, |
| byte[] repetition, byte[] definition, byte[] data, int uncompressedSize) throws IOException { |
| PageReader pageReader = pages.getPageReader(schema.getColumnDescription(path)); |
| DataPageV2 page = (DataPageV2)pageReader.readPage(); |
| assertEquals(values, page.getValueCount()); |
| assertEquals(rows, page.getRowCount()); |
| assertEquals(nullCount, page.getNullCount()); |
| assertEquals(uncompressedSize, page.getUncompressedSize()); |
| assertArrayEquals(repetition, page.getRepetitionLevels().toByteArray()); |
| assertArrayEquals(definition, page.getDefinitionLevels().toByteArray()); |
| assertArrayEquals(data, page.getData().toByteArray()); |
| } |
| |
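/** Builds binary statistics for the column's type with the given min/max strings and zero nulls. */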
| private org.apache.parquet.column.statistics.Statistics<?> createStatistics(String min, String max, ColumnDescriptor col) { |
return org.apache.parquet.column.statistics.Statistics.getBuilderForReading(col.getPrimitiveType())
| .withMin(Binary.fromString(min).getBytes()).withMax(Binary.fromString(max).getBytes()).withNumNulls(0) |
| .build(); |
| } |
| |
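/** Reads the next page of the given column and checks its value count and raw page bytes. */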
| private void validateContains(MessageType schema, PageReadStore pages, String[] path, int values, BytesInput bytes) throws IOException { |
| PageReader pageReader = pages.getPageReader(schema.getColumnDescription(path)); |
| DataPage page = pageReader.readPage(); |
| assertEquals(values, page.getValueCount()); |
| assertArrayEquals(bytes.toByteArray(), ((DataPageV1)page).getBytes().toByteArray()); |
| } |
| |
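/**
* Merging two FileMetaData instances with different schemas unions the fields
* into a single global schema.
*/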
| @Test |
| public void testMergeMetadata() { |
| FileMetaData md1 = new FileMetaData( |
| new MessageType("root1", |
| new PrimitiveType(REPEATED, BINARY, "a"), |
| new PrimitiveType(OPTIONAL, BINARY, "b")), |
| new HashMap<String, String>(), "test"); |
| FileMetaData md2 = new FileMetaData( |
| new MessageType("root2", |
| new PrimitiveType(REQUIRED, BINARY, "c")), |
| new HashMap<String, String>(), "test2"); |
| GlobalMetaData merged = ParquetFileWriter.mergeInto(md2, ParquetFileWriter.mergeInto(md1, null)); |
| assertEquals( |
| merged.getSchema(), |
| new MessageType("root1", |
| new PrimitiveType(REPEATED, BINARY, "a"), |
| new PrimitiveType(OPTIONAL, BINARY, "b"), |
| new PrimitiveType(REQUIRED, BINARY, "c")) |
| ); |
| |
| } |
| |
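/**
* Merging footers from two files concatenates their row-group metadata and
* unions their schemas under the given root path.
*/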
| @Test |
| public void testMergeFooters() { |
| List<BlockMetaData> oneBlocks = new ArrayList<BlockMetaData>(); |
| oneBlocks.add(new BlockMetaData()); |
| oneBlocks.add(new BlockMetaData()); |
| List<BlockMetaData> twoBlocks = new ArrayList<BlockMetaData>(); |
| twoBlocks.add(new BlockMetaData()); |
| List<BlockMetaData> expected = new ArrayList<BlockMetaData>(); |
| expected.addAll(oneBlocks); |
| expected.addAll(twoBlocks); |
| |
| Footer one = new Footer(new Path("file:/tmp/output/one.parquet"), |
| new ParquetMetadata(new FileMetaData( |
| new MessageType("root1", |
| new PrimitiveType(REPEATED, BINARY, "a"), |
| new PrimitiveType(OPTIONAL, BINARY, "b")), |
| new HashMap<String, String>(), "test"), |
| oneBlocks)); |
| |
| Footer two = new Footer(new Path("/tmp/output/two.parquet"), |
| new ParquetMetadata(new FileMetaData( |
| new MessageType("root2", |
| new PrimitiveType(REQUIRED, BINARY, "c")), |
| new HashMap<String, String>(), "test2"), |
| twoBlocks)); |
| |
| List<Footer> footers = new ArrayList<Footer>(); |
| footers.add(one); |
| footers.add(two); |
| |
| ParquetMetadata merged = ParquetFileWriter.mergeFooters( |
| new Path("/tmp"), footers); |
| |
| assertEquals( |
| new MessageType("root1", |
| new PrimitiveType(REPEATED, BINARY, "a"), |
| new PrimitiveType(OPTIONAL, BINARY, "b"), |
| new PrimitiveType(REQUIRED, BINARY, "c")), |
| merged.getFileMetaData().getSchema()); |
| |
| assertEquals("Should have all blocks", expected, merged.getBlocks()); |
| } |
| |
| /** |
| * {@link ParquetFileWriter#mergeFooters(Path, List)} expects a fully-qualified |
| * path for the root and crashes if a relative one is provided. |
| */ |
| @Test |
| public void testWriteMetadataFileWithRelativeOutputPath() throws IOException { |
| Configuration conf = new Configuration(); |
| FileSystem fs = FileSystem.get(conf); |
| Path relativeRoot = new Path("target/_test_relative"); |
| Path qualifiedRoot = fs.makeQualified(relativeRoot); |
| |
| ParquetMetadata mock = Mockito.mock(ParquetMetadata.class); |
| FileMetaData fileMetaData = new FileMetaData( |
| new MessageType("root1", |
| new PrimitiveType(REPEATED, BINARY, "a")), |
| new HashMap<String, String>(), "test"); |
| Mockito.when(mock.getFileMetaData()).thenReturn(fileMetaData); |
| |
| List<Footer> footers = new ArrayList<Footer>(); |
| Footer footer = new Footer(new Path(qualifiedRoot, "one"), mock); |
| footers.add(footer); |
| |
| // This should not throw an exception |
| ParquetFileWriter.writeMetadataFile(conf, relativeRoot, footers, JobSummaryLevel.ALL); |
| } |
| |
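/**
* Writes three row groups (the middle one with page statistics) and verifies the
* column index (boundary order, null counts, null pages, min/max values) and the
* offset index (page offsets, compressed sizes, first row indexes) read back for
* it. The last row group uses statistics larger than MAX_STATS_SIZE, so no column
* index is written for its first column.
*/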
| @Test |
| public void testColumnIndexWriteRead() throws Exception { |
| File testFile = temp.newFile(); |
| testFile.delete(); |
| |
| Path path = new Path(testFile.toURI()); |
| Configuration configuration = new Configuration(); |
| |
| ParquetFileWriter w = new ParquetFileWriter(configuration, SCHEMA, path); |
| w.start(); |
| w.startBlock(4); |
| w.startColumn(C1, 7, CODEC); |
| w.writeDataPage(7, 4, BytesInput.from(BYTES3), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN); |
| w.endColumn(); |
| w.startColumn(C2, 8, CODEC); |
| w.writeDataPage(8, 4, BytesInput.from(BYTES4), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN); |
| w.endColumn(); |
| w.endBlock(); |
| w.startBlock(4); |
| w.startColumn(C1, 5, CODEC); |
| long c1p1Starts = w.getPos(); |
| w.writeDataPage(2, 4, BytesInput.from(BYTES1), statsC1(null, Binary.fromString("aaa")), 1, BIT_PACKED, BIT_PACKED, |
| PLAIN); |
| long c1p2Starts = w.getPos(); |
| w.writeDataPage(3, 4, BytesInput.from(BYTES1), statsC1(Binary.fromString("bbb"), Binary.fromString("ccc")), 3, |
| BIT_PACKED, BIT_PACKED, PLAIN); |
| w.endColumn(); |
| long c1Ends = w.getPos(); |
| w.startColumn(C2, 6, CODEC); |
| long c2p1Starts = w.getPos(); |
w.writeDataPage(2, 4, BytesInput.from(BYTES2), statsC2(117L, 100L), 1, BIT_PACKED, BIT_PACKED, PLAIN);
| long c2p2Starts = w.getPos(); |
| w.writeDataPage(3, 4, BytesInput.from(BYTES2), statsC2(null, null, null), 2, BIT_PACKED, BIT_PACKED, PLAIN); |
| long c2p3Starts = w.getPos(); |
w.writeDataPage(1, 4, BytesInput.from(BYTES2), statsC2(0L), 1, BIT_PACKED, BIT_PACKED, PLAIN);
| w.endColumn(); |
| long c2Ends = w.getPos(); |
| w.endBlock(); |
| w.startBlock(4); |
| w.startColumn(C1, 7, CODEC); |
| w.writeDataPage(7, 4, BytesInput.from(BYTES3), |
| // Creating huge stats so the column index will reach the limit and won't be written |
| statsC1( |
| Binary.fromConstantByteArray(new byte[(int) MAX_STATS_SIZE]), |
| Binary.fromConstantByteArray(new byte[1])), |
| 4, BIT_PACKED, BIT_PACKED, PLAIN); |
| w.endColumn(); |
| w.startColumn(C2, 8, CODEC); |
| w.writeDataPage(8, 4, BytesInput.from(BYTES4), EMPTY_STATS, BIT_PACKED, BIT_PACKED, PLAIN); |
| w.endColumn(); |
| w.endBlock(); |
| w.end(new HashMap<String, String>()); |
| |
| try (ParquetFileReader reader = new ParquetFileReader(HadoopInputFile.fromPath(path, configuration), |
| ParquetReadOptions.builder().build())) { |
| ParquetMetadata footer = reader.getFooter(); |
| assertEquals(3, footer.getBlocks().size()); |
| BlockMetaData blockMeta = footer.getBlocks().get(1); |
| assertEquals(2, blockMeta.getColumns().size()); |
| |
| ColumnIndex columnIndex = reader.readColumnIndex(blockMeta.getColumns().get(0)); |
| assertEquals(BoundaryOrder.ASCENDING, columnIndex.getBoundaryOrder()); |
assertEquals(Arrays.asList(1L, 0L), columnIndex.getNullCounts());
assertEquals(Arrays.asList(false, false), columnIndex.getNullPages());
| List<ByteBuffer> minValues = columnIndex.getMinValues(); |
| assertEquals(2, minValues.size()); |
| List<ByteBuffer> maxValues = columnIndex.getMaxValues(); |
| assertEquals(2, maxValues.size()); |
| assertEquals("aaa", new String(minValues.get(0).array(), StandardCharsets.UTF_8)); |
| assertEquals("aaa", new String(maxValues.get(0).array(), StandardCharsets.UTF_8)); |
| assertEquals("bbb", new String(minValues.get(1).array(), StandardCharsets.UTF_8)); |
| assertEquals("ccc", new String(maxValues.get(1).array(), StandardCharsets.UTF_8)); |
| |
| columnIndex = reader.readColumnIndex(blockMeta.getColumns().get(1)); |
| assertEquals(BoundaryOrder.DESCENDING, columnIndex.getBoundaryOrder()); |
assertEquals(Arrays.asList(0L, 3L, 0L), columnIndex.getNullCounts());
assertEquals(Arrays.asList(false, true, false), columnIndex.getNullPages());
| minValues = columnIndex.getMinValues(); |
| assertEquals(3, minValues.size()); |
| maxValues = columnIndex.getMaxValues(); |
| assertEquals(3, maxValues.size()); |
| assertEquals(100, BytesUtils.bytesToLong(minValues.get(0).array())); |
| assertEquals(117, BytesUtils.bytesToLong(maxValues.get(0).array())); |
| assertEquals(0, minValues.get(1).array().length); |
| assertEquals(0, maxValues.get(1).array().length); |
| assertEquals(0, BytesUtils.bytesToLong(minValues.get(2).array())); |
| assertEquals(0, BytesUtils.bytesToLong(maxValues.get(2).array())); |
| |
| OffsetIndex offsetIndex = reader.readOffsetIndex(blockMeta.getColumns().get(0)); |
| assertEquals(2, offsetIndex.getPageCount()); |
| assertEquals(c1p1Starts, offsetIndex.getOffset(0)); |
| assertEquals(c1p2Starts, offsetIndex.getOffset(1)); |
| assertEquals(c1p2Starts - c1p1Starts, offsetIndex.getCompressedPageSize(0)); |
| assertEquals(c1Ends - c1p2Starts, offsetIndex.getCompressedPageSize(1)); |
| assertEquals(0, offsetIndex.getFirstRowIndex(0)); |
| assertEquals(1, offsetIndex.getFirstRowIndex(1)); |
| |
| offsetIndex = reader.readOffsetIndex(blockMeta.getColumns().get(1)); |
| assertEquals(3, offsetIndex.getPageCount()); |
| assertEquals(c2p1Starts, offsetIndex.getOffset(0)); |
| assertEquals(c2p2Starts, offsetIndex.getOffset(1)); |
| assertEquals(c2p3Starts, offsetIndex.getOffset(2)); |
| assertEquals(c2p2Starts - c2p1Starts, offsetIndex.getCompressedPageSize(0)); |
| assertEquals(c2p3Starts - c2p2Starts, offsetIndex.getCompressedPageSize(1)); |
| assertEquals(c2Ends - c2p3Starts, offsetIndex.getCompressedPageSize(2)); |
| assertEquals(0, offsetIndex.getFirstRowIndex(0)); |
| assertEquals(1, offsetIndex.getFirstRowIndex(1)); |
| assertEquals(3, offsetIndex.getFirstRowIndex(2)); |
| |
| assertNull(reader.readColumnIndex(footer.getBlocks().get(2).getColumns().get(0))); |
| } |
| } |
| |
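/**
* Conflicting key/value metadata makes the strict merge strategy fail, while the
* concatenating strategy joins the conflicting values.
*/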
| @Test |
| public void testMergeMetadataWithConflictingKeyValues() { |
| Map<String, String> keyValues1 = new HashMap<String, String>() {{ |
| put("a", "b"); |
| }}; |
| Map<String, String> keyValues2 = new HashMap<String, String>() {{ |
| put("a", "c"); |
| }}; |
| FileMetaData md1 = new FileMetaData( |
| new MessageType("root1", |
| new PrimitiveType(REPEATED, BINARY, "a"), |
| new PrimitiveType(OPTIONAL, BINARY, "b")), |
| keyValues1, "test"); |
| FileMetaData md2 = new FileMetaData( |
| new MessageType("root1", |
| new PrimitiveType(REPEATED, BINARY, "a"), |
| new PrimitiveType(OPTIONAL, BINARY, "b")), |
| keyValues2, "test"); |
| GlobalMetaData merged = ParquetFileWriter.mergeInto(md2, ParquetFileWriter.mergeInto(md1, null)); |
| try { |
| merged.merge(new StrictKeyValueMetadataMergeStrategy()); |
| fail("Merge metadata is expected to fail because of conflicting key values"); |
| } catch (RuntimeException e) { |
| // expected because of conflicting values |
| assertTrue(e.getMessage().contains("could not merge metadata")); |
| } |
| |
| Map<String, String> mergedKeyValues = merged.merge(new ConcatenatingKeyValueMetadataMergeStrategy()).getKeyValueMetaData(); |
| assertEquals(1, mergedKeyValues.size()); |
| String mergedValue = mergedKeyValues.get("a"); |
| assertTrue(mergedValue.equals("b,c") || mergedValue.equals("c,b")); |
| } |
| |
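/** Key/value metadata with distinct keys merges cleanly under the strict strategy. */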
| @Test |
| public void testMergeMetadataWithNoConflictingKeyValues() { |
| Map<String, String> keyValues1 = new HashMap<String, String>() {{ |
| put("a", "b"); |
| }}; |
| Map<String, String> keyValues2 = new HashMap<String, String>() {{ |
| put("c", "d"); |
| }}; |
| FileMetaData md1 = new FileMetaData( |
| new MessageType("root1", |
| new PrimitiveType(REPEATED, BINARY, "a"), |
| new PrimitiveType(OPTIONAL, BINARY, "b")), |
| keyValues1, "test"); |
| FileMetaData md2 = new FileMetaData( |
| new MessageType("root1", |
| new PrimitiveType(REPEATED, BINARY, "a"), |
| new PrimitiveType(OPTIONAL, BINARY, "b")), |
| keyValues2, "test"); |
| GlobalMetaData merged = ParquetFileWriter.mergeInto(md2, ParquetFileWriter.mergeInto(md1, null)); |
| Map<String, String> mergedValues = merged.merge(new StrictKeyValueMetadataMergeStrategy()).getKeyValueMetaData(); |
| assertEquals("b", mergedValues.get("a")); |
| assertEquals("d", mergedValues.get("c")); |
| } |
| |
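/** Builds statistics for C1 from the given binary values; null entries are counted as nulls. */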
| private org.apache.parquet.column.statistics.Statistics<?> statsC1(Binary... values) { |
| org.apache.parquet.column.statistics.Statistics<?> stats = org.apache.parquet.column.statistics.Statistics |
| .createStats(C1.getPrimitiveType()); |
| for (Binary value : values) { |
| if (value == null) { |
| stats.incrementNumNulls(); |
| } else { |
| stats.updateStats(value); |
| } |
| } |
| return stats; |
| } |
| |
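/** Builds statistics for C2 from the given long values; null entries are counted as nulls. */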
| private org.apache.parquet.column.statistics.Statistics<?> statsC2(Long... values) { |
| org.apache.parquet.column.statistics.Statistics<?> stats = org.apache.parquet.column.statistics.Statistics |
| .createStats(C2.getPrimitiveType()); |
| for (Long value : values) { |
| if (value == null) { |
| stats.incrementNumNulls(); |
| } else { |
| stats.updateStats(value); |
| } |
| } |
| return stats; |
| } |
| } |