/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.hadoop.util;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.impl.ColumnReadStoreImpl;
import org.apache.parquet.column.page.DictionaryPage;
import org.apache.parquet.column.page.PageReadStore;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.compression.CompressionCodecFactory;
import org.apache.parquet.format.DataPageHeader;
import org.apache.parquet.format.DataPageHeaderV2;
import org.apache.parquet.format.DictionaryPageHeader;
import org.apache.parquet.format.PageHeader;
import org.apache.parquet.format.Util;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.internal.column.columnindex.ColumnIndex;
import org.apache.parquet.internal.column.columnindex.OffsetIndex;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.ParquetEncodingException;
import org.apache.parquet.io.SeekableInputStream;
import org.apache.parquet.io.api.Converter;
import org.apache.parquet.io.api.GroupConverter;
import org.apache.parquet.io.api.PrimitiveConverter;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
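/**
 * Rewrites the row groups of an existing Parquet file with a different compression codec.
 * Pages are read back through a {@link TransParquetFileReader}, decompressed with the source
 * codec, re-compressed with the target codec, and otherwise written out unchanged (encodings,
 * statistics and page boundaries are preserved).
 *
 * <p>A minimal usage sketch, assuming a Hadoop {@code Configuration conf} and input/output
 * {@code Path}s {@code inPath}/{@code outPath} (the surrounding setup is illustrative;
 * {@code HadoopInputFile} and {@code HadoopReadOptions} come from parquet-hadoop):
 * <pre>{@code
 * TransParquetFileReader reader = new TransParquetFileReader(
 *     HadoopInputFile.fromPath(inPath, conf), HadoopReadOptions.builder(conf).build());
 * ParquetMetadata meta = reader.getFooter();
 * MessageType schema = meta.getFileMetaData().getSchema();
 * ParquetFileWriter writer = new ParquetFileWriter(conf, schema, outPath, ParquetFileWriter.Mode.CREATE);
 * writer.start();
 * new CompressionConverter().processBlocks(reader, writer, meta, schema,
 *     meta.getFileMetaData().getCreatedBy(), CompressionCodecName.GZIP);
 * writer.end(meta.getFileMetaData().getKeyValueMetaData());
 * }</pre>
 */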
public class CompressionConverter {
private static final Logger LOG = LoggerFactory.getLogger(CompressionConverter.class);
private final int pageBufferSize = ParquetProperties.DEFAULT_PAGE_SIZE * 2;
private byte[] pageBuffer;
public CompressionConverter() {
this.pageBuffer = new byte[pageBufferSize];
}
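/**
 * Translates every row group of the source file: for each column chunk the pages are
 * re-compressed with {@code codecName} and appended to {@code writer}. Row group and
 * column order of the original file is preserved.
 */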
public void processBlocks(TransParquetFileReader reader, ParquetFileWriter writer, ParquetMetadata meta, MessageType schema,
String createdBy, CompressionCodecName codecName) throws IOException {
int blockIndex = 0;
PageReadStore store = reader.readNextRowGroup();
while (store != null) {
writer.startBlock(store.getRowCount());
BlockMetaData blockMetaData = meta.getBlocks().get(blockIndex);
List<ColumnChunkMetaData> columnsInOrder = blockMetaData.getColumns();
Map<ColumnPath, ColumnDescriptor> descriptorsMap = schema.getColumns().stream().collect(
Collectors.toMap(x -> ColumnPath.get(x.getPath()), x -> x));
for (int i = 0; i < columnsInOrder.size(); i += 1) {
ColumnChunkMetaData chunk = columnsInOrder.get(i);
ColumnReadStoreImpl crstore = new ColumnReadStoreImpl(store, new DummyGroupConverter(), schema, createdBy);
ColumnDescriptor columnDescriptor = descriptorsMap.get(chunk.getPath());
writer.startColumn(columnDescriptor, crstore.getColumnReader(columnDescriptor).getTotalValueCount(), codecName);
processChunk(reader, writer, chunk, createdBy, codecName);
writer.endColumn();
}
writer.endBlock();
store = reader.readNextRowGroup();
blockIndex++;
}
}
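/**
 * Re-compresses a single column chunk page by page. Dictionary pages and v1/v2 data pages are
 * decompressed with the chunk's original codec and re-compressed with the target codec;
 * page statistics are taken from the column index when one is present.
 */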
private void processChunk(TransParquetFileReader reader, ParquetFileWriter writer, ColumnChunkMetaData chunk,
String createdBy, CompressionCodecName codecName) throws IOException {
CompressionCodecFactory codecFactory = HadoopCodecs.newFactory(0);
CompressionCodecFactory.BytesInputDecompressor decompressor = codecFactory.getDecompressor(chunk.getCodec());
CompressionCodecFactory.BytesInputCompressor compressor = codecFactory.getCompressor(codecName);
ColumnIndex columnIndex = reader.readColumnIndex(chunk);
OffsetIndex offsetIndex = reader.readOffsetIndex(chunk);
reader.setStreamPosition(chunk.getStartingPos());
DictionaryPage dictionaryPage = null;
long readValues = 0;
Statistics statistics = null;
ParquetMetadataConverter converter = new ParquetMetadataConverter();
int pageIndex = 0;
long totalChunkValues = chunk.getValueCount();
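// Walk the chunk page by page until every value recorded in the chunk metadata has been copied.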
while (readValues < totalChunkValues) {
PageHeader pageHeader = reader.readPageHeader();
int compressedPageSize = pageHeader.getCompressed_page_size();
byte[] pageLoad;
switch (pageHeader.type) {
case DICTIONARY_PAGE:
// A column chunk may contain at most one dictionary page; remember it so duplicates are rejected.
if (dictionaryPage != null) {
throw new IOException("Column chunk contains more than one dictionary page");
}
DictionaryPageHeader dictPageHeader = pageHeader.dictionary_page_header;
pageLoad = translatePageLoad(reader, true, compressor, decompressor, pageHeader.getCompressed_page_size(), pageHeader.getUncompressed_page_size());
dictionaryPage = new DictionaryPage(BytesInput.from(pageLoad),
pageHeader.getUncompressed_page_size(),
dictPageHeader.getNum_values(),
converter.getEncoding(dictPageHeader.getEncoding()));
writer.writeDictionaryPage(dictionaryPage);
break;
case DATA_PAGE:
DataPageHeader headerV1 = pageHeader.data_page_header;
pageLoad = translatePageLoad(reader, true, compressor, decompressor, pageHeader.getCompressed_page_size(), pageHeader.getUncompressed_page_size());
statistics = convertStatistics(createdBy, chunk.getPrimitiveType(), headerV1.getStatistics(), columnIndex, pageIndex, converter);
readValues += headerV1.getNum_values();
if (offsetIndex != null) {
long rowCount = 1 + offsetIndex.getLastRowIndex(pageIndex, totalChunkValues) - offsetIndex.getFirstRowIndex(pageIndex);
writer.writeDataPage(toIntWithCheck(headerV1.getNum_values()),
pageHeader.getUncompressed_page_size(),
BytesInput.from(pageLoad),
statistics,
toIntWithCheck(rowCount),
converter.getEncoding(headerV1.getRepetition_level_encoding()),
converter.getEncoding(headerV1.getDefinition_level_encoding()),
converter.getEncoding(headerV1.getEncoding()));
} else {
writer.writeDataPage(toIntWithCheck(headerV1.getNum_values()),
pageHeader.getUncompressed_page_size(),
BytesInput.from(pageLoad),
statistics,
converter.getEncoding(headerV1.getRepetition_level_encoding()),
converter.getEncoding(headerV1.getDefinition_level_encoding()),
converter.getEncoding(headerV1.getEncoding()));
}
pageIndex++;
break;
case DATA_PAGE_V2:
DataPageHeaderV2 headerV2 = pageHeader.data_page_header_v2;
int rlLength = headerV2.getRepetition_levels_byte_length();
BytesInput rlLevels = readBlockAllocate(rlLength, reader);
int dlLength = headerV2.getDefinition_levels_byte_length();
BytesInput dlLevels = readBlockAllocate(dlLength, reader);
int payLoadLength = pageHeader.getCompressed_page_size() - rlLength - dlLength;
int rawDataLength = pageHeader.getUncompressed_page_size() - rlLength - dlLength;
pageLoad = translatePageLoad(reader, headerV2.is_compressed, compressor, decompressor, payLoadLength, rawDataLength);
statistics = convertStatistics(createdBy, chunk.getPrimitiveType(), headerV2.getStatistics(), columnIndex, pageIndex, converter);
readValues += headerV2.getNum_values();
writer.writeDataPageV2(headerV2.getNum_rows(),
headerV2.getNum_nulls(),
headerV2.getNum_values(),
rlLevels,
dlLevels,
converter.getEncoding(headerV2.getEncoding()),
BytesInput.from(pageLoad),
rawDataLength,
statistics);
pageIndex++;
break;
default:
LOG.debug("skipping page of type {} of size {}", pageHeader.getType(), compressedPageSize);
break;
}
}
}
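/**
 * Rebuilds page statistics, preferring the column index (null counts and per-page min/max) when
 * it is available and falling back to the statistics stored in the page header; returns null
 * when neither source exists.
 */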
private Statistics convertStatistics(String createdBy, PrimitiveType type, org.apache.parquet.format.Statistics pageStatistics,
ColumnIndex columnIndex, int pageIndex, ParquetMetadataConverter converter) throws IOException {
if (columnIndex != null) {
if (columnIndex.getNullPages() == null) {
throw new IOException("columnIndex has a null 'nullPages' list, which indicates corrupted data for type: " + type.getName());
}
if (pageIndex >= columnIndex.getNullPages().size()) {
throw new IOException("Found more pages in the column than in the columnIndex: page index " + pageIndex + " is out of range for " + columnIndex.getNullPages().size() + " pages");
}
org.apache.parquet.column.statistics.Statistics.Builder statsBuilder = org.apache.parquet.column.statistics.Statistics.getBuilderForReading(type);
statsBuilder.withNumNulls(columnIndex.getNullCounts().get(pageIndex));
if (!columnIndex.getNullPages().get(pageIndex)) {
statsBuilder.withMin(columnIndex.getMinValues().get(pageIndex).array().clone());
statsBuilder.withMax(columnIndex.getMaxValues().get(pageIndex).array().clone());
}
return statsBuilder.build();
} else if (pageStatistics != null) {
return converter.fromParquetStatistics(createdBy, pageStatistics, type);
} else {
return null;
}
}
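/**
 * Reads a page payload of {@code payloadLength} bytes, decompresses it to {@code rawDataLength}
 * bytes if it was compressed, and returns the bytes re-compressed with the target codec.
 */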
private byte[] translatePageLoad(TransParquetFileReader reader, boolean isCompressed, CompressionCodecFactory.BytesInputCompressor compressor,
CompressionCodecFactory.BytesInputDecompressor decompressor, int payloadLength, int rawDataLength) throws IOException {
BytesInput data = readBlock(payloadLength, reader);
if (isCompressed) {
data = decompressor.decompress(data, rawDataLength);
}
BytesInput newCompressedData = compressor.compress(data);
return newCompressedData.toByteArray();
}
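/**
 * Reads {@code length} bytes from the current stream position. Small reads reuse the internal
 * page buffer, so the returned {@link BytesInput} is only valid until the next call.
 */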
public BytesInput readBlock(int length, TransParquetFileReader reader) throws IOException {
byte[] data;
if (length > pageBufferSize) {
data = new byte[length];
} else {
data = pageBuffer;
}
reader.blockRead(data, 0, length);
return BytesInput.from(data, 0, length);
}
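/**
 * Reads {@code length} bytes into a freshly allocated array, for callers that need to keep the
 * data (e.g. the repetition/definition levels of v2 data pages).
 */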
public BytesInput readBlockAllocate(int length, TransParquetFileReader reader) throws IOException {
byte[] data = new byte[length];
reader.blockRead(data, 0, length);
return BytesInput.from(data, 0, length);
}
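/**
 * Narrows a long to an int, failing fast instead of silently overflowing.
 */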
private int toIntWithCheck(long size) {
if ((int)size != size) {
throw new ParquetEncodingException("size is bigger than " + Integer.MAX_VALUE + " bytes: " + size);
}
return (int)size;
}
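/**
 * No-op converters: {@link ColumnReadStoreImpl} requires a {@link GroupConverter}, but
 * processBlocks only needs the total value count, so converted values are discarded.
 */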
private static final class DummyGroupConverter extends GroupConverter {
@Override public void start() {}
@Override public void end() {}
@Override public Converter getConverter(int fieldIndex) { return new DummyConverter(); }
}
private static final class DummyConverter extends PrimitiveConverter {
@Override public GroupConverter asGroupConverter() { return new DummyGroupConverter(); }
}
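/**
 * Thin extension of {@link ParquetFileReader} that exposes the underlying
 * {@link SeekableInputStream}, so page headers and raw page bytes can be read at arbitrary offsets.
 */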
public static final class TransParquetFileReader extends ParquetFileReader {
public TransParquetFileReader(InputFile file, ParquetReadOptions options) throws IOException {
super(file, options);
}
public void setStreamPosition(long newPos) throws IOException {
f.seek(newPos);
}
public void blockRead(byte[] data, int start, int len) throws IOException {
f.readFully(data, start, len);
}
public PageHeader readPageHeader() throws IOException {
return Util.readPageHeader(f);
}
public long getPos() throws IOException {
return f.getPos();
}
public SeekableInputStream getStream() {
return f;
}
}
}