blob: b8f707d21c34acb15c76d0c9801de7b786b9cfde [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.parquet.hadoop;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.zip.CRC32;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.page.DictionaryPage;
import org.apache.parquet.column.page.PageWriteStore;
import org.apache.parquet.column.page.PageWriter;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder;
import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder;
import org.apache.parquet.io.ParquetEncodingException;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.bytes.ByteBufferAllocator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This is a copy of ColumnChunkPageWriteStore from the Parquet library, except for the OutputStream that is used here.
* Using CapacityByteArrayOutputStream makes it possible to use different ByteBuffer allocators.
* This class will no longer be needed once PARQUET-1006 is resolved.
*/
public class ParquetColumnChunkPageWriteStore implements PageWriteStore, Closeable {
  private static final Logger logger = LoggerFactory.getLogger(ParquetColumnChunkPageWriteStore.class);

  private static final ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();

  /** One page writer per column of {@link #schema}. */
  private final Map<ColumnDescriptor, ColumnChunkPageWriter> writers = new HashMap<>();
  private final MessageType schema;

  /**
   * Creates a store holding one page writer for every column of the given schema.
   *
   * @param compressor compressor applied to page and dictionary bytes
   * @param schema schema whose columns get a page writer
   * @param initialSlabSize initial slab size passed to each writer's {@link CapacityByteArrayOutputStream}
   * @param maxCapacityHint capacity hint passed to each writer's {@link CapacityByteArrayOutputStream}
   * @param allocator allocator passed to each writer's {@link CapacityByteArrayOutputStream}
   * @param pageWriteChecksumEnabled whether a CRC32 of the compressed bytes is written into V1 page headers
   * @param columnIndexTruncateLength truncate length passed to each column's {@link ColumnIndexBuilder}
   */
  public ParquetColumnChunkPageWriteStore(BytesCompressor compressor,
                                          MessageType schema,
                                          int initialSlabSize,
                                          int maxCapacityHint,
                                          ByteBufferAllocator allocator,
                                          boolean pageWriteChecksumEnabled,
                                          int columnIndexTruncateLength) {
    this.schema = schema;
    for (ColumnDescriptor path : schema.getColumns()) {
      writers.put(path, new ColumnChunkPageWriter(path, compressor, initialSlabSize,
          maxCapacityHint, allocator, pageWriteChecksumEnabled, columnIndexTruncateLength));
    }
  }

  /**
   * Returns the page writer registered for the given column.
   *
   * @param path the column descriptor
   * @return the writer for the column, or {@code null} if the column was not part of the schema
   *         this store was created with
   */
  @Override
  public PageWriter getPageWriter(ColumnDescriptor path) {
    return writers.get(path);
  }

  /**
   * Writes the column chunks in the corresponding row group, in schema column order.
   *
   * @param writer the parquet file writer
   * @throws IOException if the file can not be created
   */
  public void flushToFileWriter(ParquetFileWriter writer) throws IOException {
    for (ColumnDescriptor path : schema.getColumns()) {
      ColumnChunkPageWriter pageWriter = writers.get(path);
      pageWriter.writeToFileWriter(writer);
    }
  }

  /**
   * Closes every column's page writer, which in turn closes its buffer.
   */
  @Override
  public void close() {
    for (ColumnChunkPageWriter pageWriter : writers.values()) {
      pageWriter.close();
    }
  }

  /**
   * Page writer for a single column: compresses pages, appends their headers and bytes to an
   * in-memory buffer and accumulates the sizes, statistics and indexes of the column chunk.
   */
  private static final class ColumnChunkPageWriter implements PageWriter, Closeable {

    private final ColumnDescriptor path;
    private final BytesCompressor compressor;

    /** Accumulates the headers and (compressed) bytes of all data pages of this column chunk. */
    private final CapacityByteArrayOutputStream buf;
    private DictionaryPage dictionaryPage;

    private long uncompressedLength;
    private long compressedLength;
    private long totalValueCount;
    private int pageCount;

    // repetition and definition level encodings are used only for v1 pages and don't change
    private final Set<Encoding> rlEncodings = new HashSet<>();
    private final Set<Encoding> dlEncodings = new HashSet<>();
    private final List<Encoding> dataEncodings = new ArrayList<>();

    private ColumnIndexBuilder columnIndexBuilder;
    private OffsetIndexBuilder offsetIndexBuilder;
    private Statistics totalStatistics;

    /** CRC32 instance reused for every page; {@code null} when page checksums are disabled. */
    private final CRC32 crc;
    private final boolean pageWriteChecksumEnabled;

    private ColumnChunkPageWriter(ColumnDescriptor path,
                                  BytesCompressor compressor,
                                  int initialSlabSize,
                                  int maxCapacityHint,
                                  ByteBufferAllocator allocator,
                                  boolean pageWriteChecksumEnabled,
                                  int columnIndexTruncateLength) {
      this.path = path;
      this.compressor = compressor;
      this.buf = new CapacityByteArrayOutputStream(initialSlabSize, maxCapacityHint, allocator);
      this.totalStatistics = Statistics.createStats(this.path.getPrimitiveType());
      this.columnIndexBuilder = ColumnIndexBuilder.getBuilder(path.getPrimitiveType(), columnIndexTruncateLength);
      this.offsetIndexBuilder = OffsetIndexBuilder.getBuilder();
      this.pageWriteChecksumEnabled = pageWriteChecksumEnabled;
      this.crc = pageWriteChecksumEnabled ? new CRC32() : null;
    }

    /**
     * Writes a V1 data page without a row count. Because the row count is unknown, the
     * column and offset index builders are replaced by no-op ones for this column chunk.
     */
    @Override
    public void writePage(BytesInput bytesInput, int valueCount, Statistics<?> statistics, Encoding rlEncoding,
                          Encoding dlEncoding, Encoding valuesEncoding) throws IOException {
      // Setting the builders to the no-op ones so no column/offset indexes will be written for this column chunk
      columnIndexBuilder = ColumnIndexBuilder.getNoOpBuilder();
      offsetIndexBuilder = OffsetIndexBuilder.getNoOpBuilder();

      writePage(bytesInput, valueCount, -1, statistics, rlEncoding, dlEncoding, valuesEncoding);
    }

    /**
     * Compresses the given bytes and appends them to the buffer as a V1 data page
     * (header followed by compressed bytes), updating the chunk totals and indexes.
     *
     * @throws ParquetEncodingException if the uncompressed or compressed page, or the page
     *         including its header, is larger than {@link Integer#MAX_VALUE} bytes
     * @throws IOException if compressing or writing to the buffer fails
     */
    @Override
    public void writePage(BytesInput bytes, int valueCount, int rowCount, Statistics statistics,
                          Encoding rlEncoding, Encoding dlEncoding, Encoding valuesEncoding) throws IOException {
      long uncompressedSize = bytes.size();
      if (uncompressedSize > Integer.MAX_VALUE) {
        throw new ParquetEncodingException(
            "Cannot write page larger than Integer.MAX_VALUE bytes: " +
                uncompressedSize);
      }

      BytesInput compressedBytes = compressor.compress(bytes);
      long compressedSize = compressedBytes.size();
      if (compressedSize > Integer.MAX_VALUE) {
        throw new ParquetEncodingException(
            "Cannot write compressed page larger than Integer.MAX_VALUE bytes: "
                + compressedSize);
      }

      // The header goes straight into the chunk buffer, which already holds all previous pages,
      // so the header size has to be measured as the growth of the buffer.
      long headerStart = buf.size();
      if (pageWriteChecksumEnabled) {
        crc.reset();
        crc.update(compressedBytes.toByteArray());
        parquetMetadataConverter.writeDataPageV1Header((int) uncompressedSize, (int) compressedSize,
            valueCount, rlEncoding, dlEncoding, valuesEncoding, (int) crc.getValue(), buf);
      } else {
        parquetMetadataConverter.writeDataPageV1Header((int) uncompressedSize, (int) compressedSize,
            valueCount, rlEncoding, dlEncoding, valuesEncoding, buf);
      }
      long headerSize = buf.size() - headerStart;

      this.uncompressedLength += uncompressedSize;
      this.compressedLength += compressedSize;
      this.totalValueCount += valueCount;
      this.pageCount += 1;

      addStatistics(statistics);

      // The offset index expects the size of this page only: its header plus its compressed bytes.
      offsetIndexBuilder.add(toIntWithCheck(headerSize + compressedSize), rowCount);

      compressedBytes.writeAllTo(buf);
      rlEncodings.add(rlEncoding);
      dlEncodings.add(dlEncoding);
      dataEncodings.add(valuesEncoding);
    }

    /**
     * Merges the page statistics into the chunk statistics and feeds them to the column index builder.
     */
    private void addStatistics(Statistics statistics) {
      // Copying the statistics if it is not initialized yet so we have the correct typed one
      if (totalStatistics == null) {
        totalStatistics = statistics.copy();
      } else {
        totalStatistics.mergeStatistics(statistics);
      }

      columnIndexBuilder.add(statistics);
    }

    /**
     * Appends a V2 data page to the buffer: header, repetition levels, definition levels and
     * the compressed data. Only the data section is compressed; no checksum is written.
     *
     * @throws ParquetEncodingException if any of the involved sizes is larger than
     *         {@link Integer#MAX_VALUE} bytes
     * @throws IOException if compressing or writing to the buffer fails
     */
    @Override
    public void writePageV2(int rowCount,
                            int nullCount,
                            int valueCount,
                            BytesInput repetitionLevels,
                            BytesInput definitionLevels,
                            Encoding dataEncoding,
                            BytesInput data,
                            Statistics<?> statistics) throws IOException {
      int rlByteLength = toIntWithCheck(repetitionLevels.size());
      int dlByteLength = toIntWithCheck(definitionLevels.size());
      int uncompressedSize = toIntWithCheck(
          data.size() + repetitionLevels.size() + definitionLevels.size()
      );

      BytesInput compressedData = compressor.compress(data);
      int compressedSize = toIntWithCheck(
          compressedData.size() + repetitionLevels.size() + definitionLevels.size()
      );

      // See writePage: the header size is the growth of the shared chunk buffer.
      long headerStart = buf.size();
      parquetMetadataConverter.writeDataPageV2Header(
          uncompressedSize, compressedSize,
          valueCount, nullCount, rowCount,
          statistics,
          dataEncoding,
          rlByteLength,
          dlByteLength,
          buf);
      long headerSize = buf.size() - headerStart;

      this.uncompressedLength += uncompressedSize;
      this.compressedLength += compressedSize;
      this.totalValueCount += valueCount;
      this.pageCount += 1;

      addStatistics(statistics);

      // The offset index expects the size of this page only: its header plus its body.
      offsetIndexBuilder.add(toIntWithCheck(headerSize + compressedSize), rowCount);

      repetitionLevels.writeAllTo(buf);
      definitionLevels.writeAllTo(buf);
      compressedData.writeAllTo(buf);

      dataEncodings.add(dataEncoding);
    }

    /**
     * Narrows a size to {@code int}.
     *
     * @throws ParquetEncodingException if the size is larger than {@link Integer#MAX_VALUE}
     */
    private int toIntWithCheck(long size) {
      if (size > Integer.MAX_VALUE) {
        throw new ParquetEncodingException(
            "Cannot write page larger than " + Integer.MAX_VALUE + " bytes: " +
                size);
      }
      return (int)size;
    }

    /**
     * @return the number of bytes currently held in the page buffer
     */
    @Override
    public long getMemSize() {
      return buf.size();
    }

    /**
     * Writes a number of pages within corresponding column chunk
     * and then clears the collected encodings and the page counter.
     *
     * @param writer the parquet file writer
     * @throws IOException if the file can not be created
     */
    public void writeToFileWriter(ParquetFileWriter writer) throws IOException {
      writer.writeColumnChunk(path, totalValueCount, compressor.getCodecName(),
          dictionaryPage, BytesInput.from(buf), uncompressedLength, compressedLength, totalStatistics,
          columnIndexBuilder, offsetIndexBuilder, rlEncodings, dlEncodings, dataEncodings);
      if (logger.isDebugEnabled()) {
        // String.format is kept because the message relies on the "%,d" grouping format.
        logger.debug(
            String.format(
                "written %,dB for %s: %,d values, %,dB raw, %,dB comp, %d pages, encodings: %s",
                buf.size(), path, totalValueCount, uncompressedLength, compressedLength, pageCount, new HashSet<>(dataEncodings))
                + (dictionaryPage != null ? String.format(
                ", dic { %,d entries, %,dB raw, %,dB comp}",
                // the stored dictionary page holds the compressed bytes, so their size is the "comp" size
                dictionaryPage.getDictionarySize(), dictionaryPage.getUncompressedSize(), dictionaryPage.getBytes().size())
                : "")
        );
      }
      rlEncodings.clear();
      dlEncodings.clear();
      dataEncodings.clear();
      pageCount = 0;
    }

    /**
     * @return the capacity currently reported by the page buffer
     */
    @Override
    public long allocatedSize() {
      return buf.getCapacity();
    }

    /**
     * Compresses the given dictionary page and keeps a copy of it until the chunk is written.
     *
     * @throws ParquetEncodingException if a dictionary page was already written for this chunk,
     *         or if the uncompressed dictionary is larger than {@link Integer#MAX_VALUE} bytes
     * @throws IOException if compressing or copying the dictionary bytes fails
     */
    @Override
    public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException {
      if (this.dictionaryPage != null) {
        throw new ParquetEncodingException("Only one dictionary page is allowed");
      }
      BytesInput dictionaryBytes = dictionaryPage.getBytes();
      // checked narrowing instead of a silent truncating cast
      int uncompressedSize = toIntWithCheck(dictionaryBytes.size());
      BytesInput compressedBytes = compressor.compress(dictionaryBytes);
      this.dictionaryPage = new DictionaryPage(BytesInput.copy(compressedBytes), uncompressedSize,
          dictionaryPage.getDictionarySize(), dictionaryPage.getEncoding());
    }

    /**
     * @return the memory usage description of the page buffer, labelled with the given prefix
     */
    @Override
    public String memUsageString(String prefix) {
      return buf.memUsageString(prefix + " ColumnChunkPageWriter");
    }

    /**
     * Closes the underlying page buffer.
     */
    @Override
    public void close() {
      buf.close();
    }
  }
}