/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.parquet.hadoop;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isNull;
import static org.mockito.Matchers.same;
import static org.mockito.Mockito.inOrder;
import static org.apache.parquet.column.Encoding.PLAIN;
import static org.apache.parquet.column.Encoding.RLE;
import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
import static org.apache.parquet.hadoop.metadata.CompressionCodecName.GZIP;
import static org.apache.parquet.hadoop.metadata.CompressionCodecName.UNCOMPRESSED;
import static org.apache.parquet.schema.OriginalType.UTF8;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.FLOAT;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.bytes.HeapByteBufferAllocator;
import org.apache.parquet.bytes.LittleEndianDataInputStream;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.page.DataPageV2;
import org.apache.parquet.column.page.DictionaryPage;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.column.page.PageReader;
import org.apache.parquet.column.page.PageWriter;
import org.apache.parquet.column.statistics.BinaryStatistics;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.hadoop.ParquetFileWriter.Mode;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopOutputFile;
import org.apache.parquet.internal.column.columnindex.ColumnIndex;
import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder;
import org.apache.parquet.internal.column.columnindex.OffsetIndex;
import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder;
import org.apache.parquet.io.OutputFile;
import org.apache.parquet.io.PositionOutputStream;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Types;
import org.junit.Before;
import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.Mockito;

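/**
 * Tests for {@link ColumnChunkPageWriteStore}: writes a single V2 data page through a real
 * {@link ParquetFileWriter} and reads it back (including the column and offset indexes), and
 * verifies against a mocked file writer that column chunks are flushed in schema order.
 */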
public class TestColumnChunkPageWriteStore {

  // OutputFile implementation to expose the PositionOutputStream internally used by the writer
  private static class OutputFileForTesting implements OutputFile {
    private PositionOutputStream out;
    private final HadoopOutputFile file;

    OutputFileForTesting(Path path, Configuration conf) throws IOException {
      file = HadoopOutputFile.fromPath(path, conf);
    }

    PositionOutputStream out() {
      return out;
    }

    @Override
    public PositionOutputStream create(long blockSizeHint) throws IOException {
      return out = file.create(blockSizeHint);
    }

    @Override
    public PositionOutputStream createOrOverwrite(long blockSizeHint) throws IOException {
      return out = file.createOrOverwrite(blockSizeHint);
    }

    @Override
    public boolean supportsBlockSize() {
      return file.supportsBlockSize();
    }

    @Override
    public long defaultBlockSize() {
      return file.defaultBlockSize();
    }
  }

  private int pageSize = 1024;
  private int initialSize = 1024;
  private Configuration conf;

  @Before
  public void initConfiguration() {
    this.conf = new Configuration();
  }
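
  /**
   * Round-trip test: writes one V2 data page (with statistics) through
   * {@link ColumnChunkPageWriteStore} into a GZIP-compressed column chunk, then reads the file
   * back and checks the page contents as well as the column and offset indexes in the footer.
   */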
  @Test
  public void test() throws Exception {
    Path file = new Path("target/test/TestColumnChunkPageWriteStore/test.parquet");
    Path root = file.getParent();
    FileSystem fs = file.getFileSystem(conf);
    if (fs.exists(root)) {
      fs.delete(root, true);
    }
    fs.mkdirs(root);
    MessageType schema = MessageTypeParser.parseMessageType("message test { repeated binary bar; }");
    ColumnDescriptor col = schema.getColumns().get(0);
    Encoding dataEncoding = PLAIN;
    int valueCount = 10;
    int d = 1;
    int r = 2;
    int v = 3;
    BytesInput definitionLevels = BytesInput.fromInt(d);
    BytesInput repetitionLevels = BytesInput.fromInt(r);
    Statistics<?> statistics = Statistics.getBuilderForReading(
            Types.required(PrimitiveTypeName.BINARY).named("test_binary"))
        .build();
    BytesInput data = BytesInput.fromInt(v);
    int rowCount = 5;
    int nullCount = 1;
    statistics.incrementNumNulls(nullCount);
    statistics.setMinMaxFromBytes(new byte[] {0, 1, 2}, new byte[] {0, 1, 2, 3});
    long pageOffset;
    long pageSize;
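
    // Write phase: create the file, start a row group, and remember where the column chunk
    // (its single page) begins in the output stream.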
    {
      OutputFileForTesting outputFile = new OutputFileForTesting(file, conf);
      ParquetFileWriter writer = new ParquetFileWriter(outputFile, schema, Mode.CREATE,
          ParquetWriter.DEFAULT_BLOCK_SIZE, ParquetWriter.MAX_PADDING_SIZE_DEFAULT);
      writer.start();
      writer.startBlock(rowCount);
      pageOffset = outputFile.out().getPos();
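
      // Write the page into the store and flush the store to the file writer; the compressed
      // page size is the distance the output position advanced during the flush.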
      {
        ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(compressor(GZIP), schema,
            new HeapByteBufferAllocator(), Integer.MAX_VALUE);
        PageWriter pageWriter = store.getPageWriter(col);
        pageWriter.writePageV2(
            rowCount, nullCount, valueCount,
            repetitionLevels, definitionLevels,
            dataEncoding, data,
            statistics);
        store.flushToFileWriter(writer);
        pageSize = outputFile.out().getPos() - pageOffset;
      }
      writer.endBlock();
      writer.end(new HashMap<String, String>());
    }
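
    // Read phase: read the page back and verify its contents, then check the column and
    // offset indexes recorded in the footer for the single page.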
    {
      ParquetMetadata footer = ParquetFileReader.readFooter(conf, file, NO_FILTER);
      ParquetFileReader reader = new ParquetFileReader(
          conf, footer.getFileMetaData(), file, footer.getBlocks(), schema.getColumns());
      PageReadStore rowGroup = reader.readNextRowGroup();
      PageReader pageReader = rowGroup.getPageReader(col);
      DataPageV2 page = (DataPageV2) pageReader.readPage();
      assertEquals(rowCount, page.getRowCount());
      assertEquals(nullCount, page.getNullCount());
      assertEquals(valueCount, page.getValueCount());
      assertEquals(d, intValue(page.getDefinitionLevels()));
      assertEquals(r, intValue(page.getRepetitionLevels()));
      assertEquals(dataEncoding, page.getDataEncoding());
      assertEquals(v, intValue(page.getData()));

      // Checking column/offset indexes for the one page
      ColumnChunkMetaData column = footer.getBlocks().get(0).getColumns().get(0);
      ColumnIndex columnIndex = reader.readColumnIndex(column);
      assertArrayEquals(statistics.getMinBytes(), columnIndex.getMinValues().get(0).array());
      assertArrayEquals(statistics.getMaxBytes(), columnIndex.getMaxValues().get(0).array());
      assertEquals(statistics.getNumNulls(), columnIndex.getNullCounts().get(0).longValue());
      assertFalse(columnIndex.getNullPages().get(0));

      OffsetIndex offsetIndex = reader.readOffsetIndex(column);
      assertEquals(1, offsetIndex.getPageCount());
      assertEquals(pageSize, offsetIndex.getCompressedPageSize(0));
      assertEquals(0, offsetIndex.getFirstRowIndex(0));
      assertEquals(pageOffset, offsetIndex.getOffset(0));

      reader.close();
    }
  }
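
  // Drains the BytesInput into a byte array and reads it back as a single little-endian int
  // (the counterpart of BytesInput.fromInt used above).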
  private int intValue(BytesInput in) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    in.writeAllTo(baos);
    LittleEndianDataInputStream dis = new LittleEndianDataInputStream(new ByteArrayInputStream(baos.toByteArray()));
    int i = dis.readInt();
    dis.close();
    return i;
  }
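
  /**
   * Verifies that {@link ColumnChunkPageWriteStore#flushToFileWriter(ParquetFileWriter)} writes
   * the column chunks in the order the columns appear in the schema, using a mocked
   * {@link ParquetFileWriter} and Mockito's {@link InOrder} verification.
   */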
  @Test
  public void testColumnOrderV1() throws IOException {
    ParquetFileWriter mockFileWriter = Mockito.mock(ParquetFileWriter.class);
    InOrder inOrder = inOrder(mockFileWriter);
    MessageType schema = Types.buildMessage()
        .required(BINARY).as(UTF8).named("a_string")
        .required(INT32).named("an_int")
        .required(INT64).named("a_long")
        .required(FLOAT).named("a_float")
        .required(DOUBLE).named("a_double")
        .named("order_test");
    BytesInput fakeData = BytesInput.fromInt(34);
    int fakeCount = 3;
    BinaryStatistics fakeStats = new BinaryStatistics();

    // TODO: revisit this; an allocator was being passed here in the ByteBuffer changes
    // (see the comment at this constructor)
    ColumnChunkPageWriteStore store = new ColumnChunkPageWriteStore(
        compressor(UNCOMPRESSED), schema, new HeapByteBufferAllocator(), Integer.MAX_VALUE);

    for (ColumnDescriptor col : schema.getColumns()) {
      PageWriter pageWriter = store.getPageWriter(col);
      pageWriter.writePage(fakeData, fakeCount, fakeStats, RLE, RLE, PLAIN);
    }

    // flush to the mock writer
    store.flushToFileWriter(mockFileWriter);

    for (ColumnDescriptor col : schema.getColumns()) {
      inOrder.verify(mockFileWriter).writeColumnChunk(
          eq(col),
          eq((long) fakeCount),
          eq(UNCOMPRESSED),
          isNull(DictionaryPage.class),
          any(),
          eq(fakeData.size()),
          eq(fakeData.size()),
          eq(fakeStats),
          same(ColumnIndexBuilder.getNoOpBuilder()), // Deprecated writePage -> no column index
          same(OffsetIndexBuilder.getNoOpBuilder()), // Deprecated writePage -> no offset index
          any(),
          any(),
          any());
    }
  }
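
  // Builds a compressor for the given codec, backed by the test's Configuration and page size.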
  private CodecFactory.BytesCompressor compressor(CompressionCodecName codec) {
    return new CodecFactory(conf, pageSize).getCompressor(codec);
  }
}