blob: a246a52c73dfa3100a87eeaa3ef820c598e32016 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.hadoop;
import static org.apache.parquet.format.Util.writeFileCryptoMetaData;
import static org.apache.parquet.format.Util.writeFileMetaData;
import static org.apache.parquet.format.converter.ParquetMetadataConverter.MAX_STATS_SIZE;
import static org.apache.parquet.hadoop.ParquetWriter.DEFAULT_BLOCK_SIZE;
import static org.apache.parquet.hadoop.ParquetWriter.MAX_PADDING_SIZE_DEFAULT;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.zip.CRC32;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.Preconditions;
import org.apache.parquet.Version;
import org.apache.parquet.bytes.BytesInput;
import org.apache.parquet.bytes.BytesUtils;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.EncodingStats;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.page.DictionaryPage;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.column.values.bloomfilter.BloomFilter;
import org.apache.parquet.crypto.AesCipher;
import org.apache.parquet.crypto.ColumnEncryptionProperties;
import org.apache.parquet.crypto.FileEncryptionProperties;
import org.apache.parquet.crypto.InternalColumnEncryptionSetup;
import org.apache.parquet.crypto.InternalFileEncryptor;
import org.apache.parquet.crypto.ModuleCipherFactory;
import org.apache.parquet.crypto.ModuleCipherFactory.ModuleType;
import org.apache.parquet.crypto.ParquetCryptoRuntimeException;
import org.apache.parquet.hadoop.ParquetOutputFormat.JobSummaryLevel;
import org.apache.parquet.hadoop.metadata.ColumnPath;
import org.apache.parquet.format.BlockCipher;
import org.apache.parquet.format.Util;
import org.apache.parquet.format.converter.ParquetMetadataConverter;
import org.apache.parquet.hadoop.metadata.BlockMetaData;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.metadata.StrictKeyValueMetadataMergeStrategy;
import org.apache.parquet.hadoop.metadata.FileMetaData;
import org.apache.parquet.hadoop.metadata.GlobalMetaData;
import org.apache.parquet.hadoop.metadata.KeyValueMetadataMergeStrategy;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.hadoop.util.HadoopOutputFile;
import org.apache.parquet.hadoop.util.HadoopStreams;
import org.apache.parquet.internal.column.columnindex.ColumnIndex;
import org.apache.parquet.internal.column.columnindex.ColumnIndexBuilder;
import org.apache.parquet.internal.column.columnindex.OffsetIndex;
import org.apache.parquet.internal.column.columnindex.OffsetIndexBuilder;
import org.apache.parquet.internal.hadoop.metadata.IndexReference;
import org.apache.parquet.io.InputFile;
import org.apache.parquet.io.OutputFile;
import org.apache.parquet.io.SeekableInputStream;
import org.apache.parquet.io.ParquetEncodingException;
import org.apache.parquet.io.PositionOutputStream;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.TypeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Internal implementation of the Parquet file writer as a block container
*/
public class ParquetFileWriter {
// Logger used for the debug traces emitted by the write methods
private static final Logger LOG = LoggerFactory.getLogger(ParquetFileWriter.class);
// Converter used to write the page headers; created in the constructors with the statistics truncate length
private final ParquetMetadataConverter metadataConverter;
public static final String PARQUET_METADATA_FILE = "_metadata";
// Magic bytes written by start() unless the file encryptor reports an encrypted footer
public static final String MAGIC_STR = "PAR1";
public static final byte[] MAGIC = MAGIC_STR.getBytes(StandardCharsets.US_ASCII);
// Magic bytes written by start() when the file encryptor reports an encrypted footer
public static final String EF_MAGIC_STR = "PARE";
public static final byte[] EFMAGIC = EF_MAGIC_STR.getBytes(StandardCharsets.US_ASCII);
public static final String PARQUET_COMMON_METADATA_FILE = "_common_metadata";
public static final int CURRENT_VERSION = 1;
/**
 * File creation modes.
 */
public enum Mode {
  /** The output is obtained with {@code OutputFile.create}. */
  CREATE,
  /** The output is obtained with {@code OutputFile.createOrOverwrite}. */
  OVERWRITE;
}
// Stream the file is written to; created in the constructors
protected final PositionOutputStream out;
// Schema of the data written to this file
private final MessageType schema;
// Strategy invoked at the beginning of every row group (see startBlock)
private final AlignmentStrategy alignment;
// Truncate length handed to the column index builder created in startColumn
private final int columnIndexTruncateLength;
// file data
private List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
// The column/offset indexes per blocks per column chunks
private final List<List<ColumnIndex>> columnIndexes = new ArrayList<>();
private final List<List<OffsetIndex>> offsetIndexes = new ArrayList<>();
// The Bloom filters
private final List<Map<String, BloomFilter>> bloomFilters = new ArrayList<>();
// The file encryptor
// (null when the writer was created without encryption properties)
private final InternalFileEncryptor fileEncryptor;
// row group data
private BlockMetaData currentBlock; // appended to by endColumn
// The column/offset indexes for the actual block
private List<ColumnIndex> currentColumnIndexes;
private List<OffsetIndex> currentOffsetIndexes;
// The Bloom filter for the actual block
// (keyed by the column name / dot-joined column path)
private Map<String, BloomFilter> currentBloomFilters;
// row group data set at the start of a row group
private long currentRecordCount; // set in startBlock
// column chunk data accumulated as pages are written
private EncodingStats.Builder encodingStatsBuilder;
private Set<Encoding> currentEncodings;
// Both lengths include the page header sizes and are reset in startColumn and endColumn
private long uncompressedLength;
private long compressedLength;
private Statistics currentStatistics; // accumulated in writePage(s)
private ColumnIndexBuilder columnIndexBuilder;
private OffsetIndexBuilder offsetIndexBuilder;
// column chunk data set at the start of a column
private CompressionCodecName currentChunkCodec; // set in startColumn
private ColumnPath currentChunkPath; // set in startColumn
private PrimitiveType currentChunkType; // set in startColumn
private long currentChunkValueCount; // set in startColumn
// -1 until the first data page of the chunk has been written
private long currentChunkFirstDataPage; // set in startColumn & page writes
private long currentChunkDictionaryPageOffset; // set in writeDictionaryPage
// set when end is called
private ParquetMetadata footer = null;
// Reused to compute page checksums; null when page checksums are disabled
private final CRC32 crc;
private boolean pageWriteChecksumEnabled;
/**
 * State machine capturing the order in which the writer methods have to be called.
 * Every constant overrides only the transitions that are legal from it; any other
 * transition ends up in the default implementation, which throws an {@link IOException}.
 */
private enum STATE {
  NOT_STARTED {
    @Override
    STATE start() {
      return STARTED;
    }
  },
  STARTED {
    @Override
    STATE startBlock() {
      return BLOCK;
    }

    @Override
    STATE end() {
      return ENDED;
    }
  },
  BLOCK {
    @Override
    STATE startColumn() {
      return COLUMN;
    }

    @Override
    STATE endBlock() {
      return STARTED;
    }
  },
  COLUMN {
    @Override
    STATE endColumn() {
      return BLOCK;
    }

    @Override
    STATE write() {
      // writing pages keeps the writer inside the current column
      return this;
    }
  },
  ENDED;

  // Default transitions: everything is illegal unless a constant overrides it.

  STATE start() throws IOException {
    return error();
  }

  STATE startBlock() throws IOException {
    return error();
  }

  STATE startColumn() throws IOException {
    return error();
  }

  STATE write() throws IOException {
    return error();
  }

  STATE endColumn() throws IOException {
    return error();
  }

  STATE endBlock() throws IOException {
    return error();
  }

  STATE end() throws IOException {
    return error();
  }

  /** Always throws; the return type only lets callers write {@code return error();}. */
  private STATE error() throws IOException {
    throw new IOException("The file being written is in an invalid state. Probably caused by an error thrown previously. Current state: " + this.name());
  }
}
// Current step of the call-order state machine; every write-path method replaces it with the next state
private STATE state = STATE.NOT_STARTED;
/**
 * Creates a writer in {@link Mode#CREATE} mode, using the default block size and the
 * default maximum padding.
 *
 * @param configuration Hadoop configuration
 * @param schema the schema of the data
 * @param file the file to write to
 * @throws IOException if the file can not be created
 * @deprecated will be removed in 2.0.0
 */
@Deprecated
public ParquetFileWriter(Configuration configuration, MessageType schema, Path file)
    throws IOException {
  this(
      HadoopOutputFile.fromPath(file, configuration),
      schema,
      Mode.CREATE,
      DEFAULT_BLOCK_SIZE,
      MAX_PADDING_SIZE_DEFAULT);
}
/**
 * Creates a writer with the given creation mode, using the default block size and the
 * default maximum padding.
 *
 * @param configuration Hadoop configuration
 * @param schema the schema of the data
 * @param file the file to write to
 * @param mode file creation mode
 * @throws IOException if the file can not be created
 * @deprecated will be removed in 2.0.0
 */
@Deprecated
public ParquetFileWriter(Configuration configuration, MessageType schema, Path file, Mode mode)
    throws IOException {
  this(
      HadoopOutputFile.fromPath(file, configuration),
      schema,
      mode,
      DEFAULT_BLOCK_SIZE,
      MAX_PADDING_SIZE_DEFAULT);
}
/**
 * Creates a writer for a Hadoop path with an explicit row group size and maximum padding.
 *
 * @param configuration Hadoop configuration
 * @param schema the schema of the data
 * @param file the file to write to
 * @param mode file creation mode
 * @param rowGroupSize the row group size
 * @param maxPaddingSize the maximum padding
 * @throws IOException if the file can not be created
 * @deprecated will be removed in 2.0.0
 */
@Deprecated
public ParquetFileWriter(
    Configuration configuration,
    MessageType schema,
    Path file,
    Mode mode,
    long rowGroupSize,
    int maxPaddingSize) throws IOException {
  this(
      HadoopOutputFile.fromPath(file, configuration),
      schema,
      mode,
      rowGroupSize,
      maxPaddingSize);
}
/**
 * Creates a writer using the {@link ParquetProperties} defaults for the column index
 * truncate length, the statistics truncate length and the page checksum switch.
 *
 * @param file OutputFile to create or overwrite
 * @param schema the schema of the data
 * @param mode file creation mode
 * @param rowGroupSize the row group size
 * @param maxPaddingSize the maximum padding
 * @throws IOException if the file can not be created
 * @deprecated will be removed in 2.0.0
 */
@Deprecated
public ParquetFileWriter(
    OutputFile file,
    MessageType schema,
    Mode mode,
    long rowGroupSize,
    int maxPaddingSize) throws IOException {
  this(
      file,
      schema,
      mode,
      rowGroupSize,
      maxPaddingSize,
      ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH,
      ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH,
      ParquetProperties.DEFAULT_PAGE_WRITE_CHECKSUM_ENABLED);
}
/**
 * Creates a writer without encryption (delegates with {@code null} encryption properties).
 *
 * @param file OutputFile to create or overwrite
 * @param schema the schema of the data
 * @param mode file creation mode
 * @param rowGroupSize the row group size
 * @param maxPaddingSize the maximum padding
 * @param columnIndexTruncateLength the length which the min/max values in column indexes tried to be truncated to
 * @param statisticsTruncateLength the length which the min/max values in row groups tried to be truncated to
 * @param pageWriteChecksumEnabled whether to write out page level checksums
 * @throws IOException if the file can not be created
 */
public ParquetFileWriter(
    OutputFile file,
    MessageType schema,
    Mode mode,
    long rowGroupSize,
    int maxPaddingSize,
    int columnIndexTruncateLength,
    int statisticsTruncateLength,
    boolean pageWriteChecksumEnabled) throws IOException {
  this(
      file,
      schema,
      mode,
      rowGroupSize,
      maxPaddingSize,
      columnIndexTruncateLength,
      statisticsTruncateLength,
      pageWriteChecksumEnabled,
      null);
}
/**
 * Creates the output stream and initializes the writer, optionally with encryption.
 * <p>
 * The schema and the encrypted column list are validated before the output file is
 * created, so an invalid configuration neither leaves an unclosed stream behind nor
 * overwrites an existing file.
 *
 * @param file OutputFile to create or overwrite
 * @param schema the schema of the data
 * @param mode file creation mode
 * @param rowGroupSize the row group size
 * @param maxPaddingSize the maximum padding
 * @param columnIndexTruncateLength the length which the min/max values in column indexes tried to be truncated to
 * @param statisticsTruncateLength the length which the min/max values in row groups tried to be truncated to
 * @param pageWriteChecksumEnabled whether to write out page level checksums
 * @param encryptionProperties file encryption properties, or null to write without a file encryptor
 * @throws IOException if the file can not be created
 * @throws ParquetCryptoRuntimeException if an encrypted column is not part of the schema
 */
public ParquetFileWriter(OutputFile file, MessageType schema, Mode mode,
    long rowGroupSize, int maxPaddingSize, int columnIndexTruncateLength,
    int statisticsTruncateLength, boolean pageWriteChecksumEnabled,
    FileEncryptionProperties encryptionProperties)
    throws IOException {
  TypeUtil.checkValidWriteSchema(schema);
  // Pure validation, done up front: failing after the stream has been opened would leak it
  checkEncryptedColumnsInSchema(schema, encryptionProperties);
  this.schema = schema;

  long blockSize = rowGroupSize;
  if (file.supportsBlockSize()) {
    blockSize = Math.max(file.defaultBlockSize(), rowGroupSize);
    this.alignment = PaddingAlignment.get(blockSize, rowGroupSize, maxPaddingSize);
  } else {
    this.alignment = NoAlignment.get(rowGroupSize);
  }

  if (mode == Mode.OVERWRITE) {
    this.out = file.createOrOverwrite(blockSize);
  } else {
    this.out = file.create(blockSize);
  }

  this.encodingStatsBuilder = new EncodingStats.Builder();
  this.columnIndexTruncateLength = columnIndexTruncateLength;
  this.pageWriteChecksumEnabled = pageWriteChecksumEnabled;
  this.crc = pageWriteChecksumEnabled ? new CRC32() : null;
  this.metadataConverter = new ParquetMetadataConverter(statisticsTruncateLength);

  if (null == encryptionProperties) {
    this.fileEncryptor = null;
  } else {
    this.fileEncryptor = new InternalFileEncryptor(encryptionProperties);
  }
}

/**
 * Verifies that every explicitly encrypted column is in the file schema.
 *
 * @param schema the schema of the data
 * @param encryptionProperties file encryption properties, may be null
 * @throws ParquetCryptoRuntimeException if an encrypted column is not part of the schema
 */
private static void checkEncryptedColumnsInSchema(MessageType schema,
    FileEncryptionProperties encryptionProperties) {
  if (null == encryptionProperties) {
    return;
  }
  Map<ColumnPath, ColumnEncryptionProperties> columnEncryptionProperties =
      encryptionProperties.getEncryptedColumns();
  if (null == columnEncryptionProperties) {
    // if null, every column in file schema will be encrypted with footer key
    return;
  }
  for (ColumnPath columnPath : columnEncryptionProperties.keySet()) {
    String[] path = columnPath.toArray();
    if (!schema.containsPath(path)) {
      throw new ParquetCryptoRuntimeException("Encrypted column " + Arrays.toString(path) + " not in file schema");
    }
  }
}
/**
* FOR TESTING ONLY. This supports testing block padding behavior on the local FS.
*
* @param configuration Hadoop configuration
* @param schema the schema of the data
* @param file the file to write to
* @param rowAndBlockSize the row group size
* @param maxPaddingSize the maximum padding
* @throws IOException if the file can not be created
*/
ParquetFileWriter(Configuration configuration, MessageType schema,
Path file, long rowAndBlockSize, int maxPaddingSize)
throws IOException {
FileSystem fs = file.getFileSystem(configuration);
this.schema = schema;
this.alignment = PaddingAlignment.get(
rowAndBlockSize, rowAndBlockSize, maxPaddingSize);
this.out = HadoopStreams.wrap(
fs.create(file, true, 8192, fs.getDefaultReplication(file), rowAndBlockSize));
this.encodingStatsBuilder = new EncodingStats.Builder();
// no truncation is needed for testing
this.columnIndexTruncateLength = Integer.MAX_VALUE;
this.pageWriteChecksumEnabled = ParquetOutputFormat.getPageWriteChecksumEnabled(configuration);
this.crc = pageWriteChecksumEnabled ? new CRC32() : null;
this.metadataConverter = new ParquetMetadataConverter(ParquetProperties.DEFAULT_STATISTICS_TRUNCATE_LENGTH);
this.fileEncryptor = null;
}
/**
 * Starts the file by writing the leading magic bytes: {@link #EFMAGIC} when a file
 * encryptor is present and reports an encrypted footer, {@link #MAGIC} otherwise.
 *
 * @throws IOException if there is an error while writing
 */
public void start() throws IOException {
  state = state.start();
  LOG.debug("{}: start", out.getPos());
  final boolean encryptedFooter = null != fileEncryptor && fileEncryptor.isFooterEncrypted();
  out.write(encryptedFooter ? EFMAGIC : MAGIC);
}
/**
 * @return the file encryptor of this writer, or null when the writer was created
 *         without encryption properties
 */
InternalFileEncryptor getEncryptor() {
  return this.fileEncryptor;
}
/**
 * Starts a block (row group): applies the alignment strategy to the output and
 * creates fresh per-block metadata, index lists and Bloom filter map.
 *
 * @param recordCount the record count in this block
 * @throws IOException if there is an error while writing
 */
public void startBlock(long recordCount) throws IOException {
  state = state.startBlock();
  LOG.debug("{}: start block", out.getPos());
  // TODO: add a magic delimiter (out.write(MAGIC)) at the start of the block
  alignment.alignForRowGroup(out);

  currentRecordCount = recordCount;
  currentBlock = new BlockMetaData();
  currentBloomFilters = new HashMap<>();
  currentColumnIndexes = new ArrayList<>();
  currentOffsetIndexes = new ArrayList<>();
}
/**
 * Starts a column inside a block and resets all the per-chunk state accumulated
 * while the pages of the column chunk are written.
 *
 * @param descriptor the column descriptor
 * @param valueCount the value count in this column
 * @param compressionCodecName a compression codec name
 * @throws IOException if there is an error while writing
 */
public void startColumn(ColumnDescriptor descriptor,
    long valueCount,
    CompressionCodecName compressionCodecName) throws IOException {
  state = state.startColumn();
  encodingStatsBuilder.clear();
  currentEncodings = new HashSet<Encoding>();
  currentChunkPath = ColumnPath.get(descriptor.getPath());
  currentChunkType = descriptor.getPrimitiveType();
  currentChunkCodec = compressionCodecName;
  currentChunkValueCount = valueCount;
  // -1 marks "no data page written yet" for this chunk
  currentChunkFirstDataPage = -1;
  // Only writeDictionaryPage sets this offset; without the reset a chunk that has no
  // dictionary page would report the offset of the previous chunk's dictionary page
  currentChunkDictionaryPageOffset = 0;
  compressedLength = 0;
  uncompressedLength = 0;
  // The statistics will be copied from the first one added at writeDataPage(s) so we have the correct typed one
  currentStatistics = null;
  columnIndexBuilder = ColumnIndexBuilder.getBuilder(currentChunkType, columnIndexTruncateLength);
  offsetIndexBuilder = OffsetIndexBuilder.getBuilder();
}
/**
 * Writes a dictionary page without header encryption.
 *
 * @param dictionaryPage the dictionary page
 * @throws IOException if there is an error while writing
 */
public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException {
  final BlockCipher.Encryptor noHeaderEncryptor = null;
  final byte[] noAad = null;
  writeDictionaryPage(dictionaryPage, noHeaderEncryptor, noAad);
}
/**
 * Writes a dictionary page: the page header (with a CRC32 checksum of the page bytes
 * when page checksums are enabled) followed by the page bytes.
 *
 * @param dictionaryPage the dictionary page
 * @param headerBlockEncryptor encryptor handed to the page header writer, may be null
 * @param AAD the AAD handed to the page header writer, may be null
 * @throws IOException if there is an error while writing
 */
public void writeDictionaryPage(DictionaryPage dictionaryPage,
    BlockCipher.Encryptor headerBlockEncryptor, byte[] AAD) throws IOException {
  state = state.write();
  LOG.debug("{}: write dictionary page: {} values", out.getPos(), dictionaryPage.getDictionarySize());
  currentChunkDictionaryPageOffset = out.getPos();
  int uncompressedSize = dictionaryPage.getUncompressedSize();
  // The header stores the size as an int: use the checked conversion instead of a
  // plain cast so an oversized page fails instead of being silently truncated
  int compressedPageSize = toIntWithCheck(dictionaryPage.getBytes().size());
  if (pageWriteChecksumEnabled) {
    crc.reset();
    crc.update(dictionaryPage.getBytes().toByteArray());
    metadataConverter.writeDictionaryPageHeader(
        uncompressedSize,
        compressedPageSize,
        dictionaryPage.getDictionarySize(),
        dictionaryPage.getEncoding(),
        (int) crc.getValue(),
        out,
        headerBlockEncryptor,
        AAD);
  } else {
    metadataConverter.writeDictionaryPageHeader(
        uncompressedSize,
        compressedPageSize,
        dictionaryPage.getDictionarySize(),
        dictionaryPage.getEncoding(),
        out,
        headerBlockEncryptor,
        AAD);
  }
  // Whatever was written since the page offset is the header; it counts in both lengths
  long headerSize = out.getPos() - currentChunkDictionaryPageOffset;
  this.uncompressedLength += uncompressedSize + headerSize;
  this.compressedLength += compressedPageSize + headerSize;
  LOG.debug("{}: write dictionary page content {}", out.getPos(), compressedPageSize);
  dictionaryPage.getBytes().writeAllTo(out); // for encrypted column, dictionary page bytes are already encrypted
  encodingStatsBuilder.addDictEncoding(dictionaryPage.getEncoding());
  currentEncodings.add(dictionaryPage.getEncoding());
}
/**
 * Writes a single page without statistics. Column and offset indexes are disabled
 * for the current column chunk and no page checksum is written by this overload.
 *
 * @param valueCount count of values
 * @param uncompressedPageSize the size of the data once uncompressed
 * @param bytes the compressed data for the page without header
 * @param rlEncoding encoding of the repetition level
 * @param dlEncoding encoding of the definition level
 * @param valuesEncoding encoding of values
 * @throws IOException if there is an error while writing
 * @deprecated this method does not support writing statistics or column indexes; Use
 * {@link #writeDataPage(int, int, BytesInput, Statistics, long, Encoding, Encoding, Encoding)} instead
 */
@Deprecated
public void writeDataPage(
    int valueCount, int uncompressedPageSize,
    BytesInput bytes,
    Encoding rlEncoding,
    Encoding dlEncoding,
    Encoding valuesEncoding) throws IOException {
  state = state.write();
  // We are unable to build indexes without rowCount so skip them for this column
  offsetIndexBuilder = OffsetIndexBuilder.getNoOpBuilder();
  columnIndexBuilder = ColumnIndexBuilder.getNoOpBuilder();
  long beforeHeader = out.getPos();
  LOG.debug("{}: write data page: {} values", beforeHeader, valueCount);
  // Checked conversion: a plain cast would silently truncate an oversized page
  int compressedPageSize = toIntWithCheck(bytes.size());
  metadataConverter.writeDataPageV1Header(
      uncompressedPageSize, compressedPageSize,
      valueCount,
      rlEncoding,
      dlEncoding,
      valuesEncoding,
      out);
  long headerSize = out.getPos() - beforeHeader;
  this.uncompressedLength += uncompressedPageSize + headerSize;
  this.compressedLength += compressedPageSize + headerSize;
  LOG.debug("{}: write data page content {}", out.getPos(), compressedPageSize);
  bytes.writeAllTo(out);
  encodingStatsBuilder.addDataEncoding(valuesEncoding);
  currentEncodings.add(rlEncoding);
  currentEncodings.add(dlEncoding);
  currentEncodings.add(valuesEncoding);
  if (currentChunkFirstDataPage < 0) {
    currentChunkFirstDataPage = beforeHeader;
  }
}
/**
 * Writes a single page together with its statistics. As no row count is available,
 * the column and offset index builders of the current column chunk are replaced with
 * no-op builders before the page is written.
 *
 * @param valueCount count of values
 * @param uncompressedPageSize the size of the data once uncompressed
 * @param bytes the compressed data for the page without header
 * @param statistics statistics for the page
 * @param rlEncoding encoding of the repetition level
 * @param dlEncoding encoding of the definition level
 * @param valuesEncoding encoding of values
 * @throws IOException if there is an error while writing
 * @deprecated this method does not support writing column indexes; Use
 * {@link #writeDataPage(int, int, BytesInput, Statistics, long, Encoding, Encoding, Encoding)} instead
 */
@Deprecated
public void writeDataPage(
    int valueCount,
    int uncompressedPageSize,
    BytesInput bytes,
    Statistics statistics,
    Encoding rlEncoding,
    Encoding dlEncoding,
    Encoding valuesEncoding) throws IOException {
  // We are unable to build indexes without rowCount so skip them for this column
  this.offsetIndexBuilder = OffsetIndexBuilder.getNoOpBuilder();
  this.columnIndexBuilder = ColumnIndexBuilder.getNoOpBuilder();
  innerWriteDataPage(
      valueCount, uncompressedPageSize, bytes, statistics, rlEncoding, dlEncoding, valuesEncoding);
}
/**
 * Writes a single page and records it in the offset index of the current column chunk.
 *
 * @param valueCount count of values
 * @param uncompressedPageSize the size of the data once uncompressed
 * @param bytes the compressed data for the page without header
 * @param statistics the statistics of the page
 * @param rowCount the number of rows in the page
 * @param rlEncoding encoding of the repetition level
 * @param dlEncoding encoding of the definition level
 * @param valuesEncoding encoding of values
 * @throws IOException if any I/O error occurs during writing the file
 */
public void writeDataPage(
    int valueCount, int uncompressedPageSize,
    BytesInput bytes,
    Statistics statistics,
    long rowCount,
    Encoding rlEncoding,
    Encoding dlEncoding,
    Encoding valuesEncoding) throws IOException {
  long beforeHeader = out.getPos();
  innerWriteDataPage(valueCount, uncompressedPageSize, bytes, statistics, rlEncoding, dlEncoding, valuesEncoding);
  // Header plus page bytes; checked conversion so an oversized page is reported
  // instead of ending up as a truncated size in the offset index
  offsetIndexBuilder.add(toIntWithCheck(out.getPos() - beforeHeader), rowCount);
}
/**
 * Writes a v1 data page: the page header (with a CRC32 checksum of the page bytes
 * when page checksums are enabled) followed by the page bytes, then folds the page
 * statistics and encodings into the state of the current column chunk.
 *
 * @param valueCount count of values
 * @param uncompressedPageSize the size of the data once uncompressed
 * @param bytes the compressed data for the page without header
 * @param statistics the statistics of the page
 * @param rlEncoding encoding of the repetition level
 * @param dlEncoding encoding of the definition level
 * @param valuesEncoding encoding of values
 * @throws IOException if there is an error while writing
 */
private void innerWriteDataPage(
    int valueCount, int uncompressedPageSize,
    BytesInput bytes,
    Statistics statistics,
    Encoding rlEncoding,
    Encoding dlEncoding,
    Encoding valuesEncoding) throws IOException {
  state = state.write();
  long beforeHeader = out.getPos();
  if (currentChunkFirstDataPage < 0) {
    currentChunkFirstDataPage = beforeHeader;
  }
  LOG.debug("{}: write data page: {} values", beforeHeader, valueCount);
  // Checked conversion: a plain cast would silently truncate an oversized page
  int compressedPageSize = toIntWithCheck(bytes.size());
  if (pageWriteChecksumEnabled) {
    crc.reset();
    crc.update(bytes.toByteArray());
    metadataConverter.writeDataPageV1Header(
        uncompressedPageSize, compressedPageSize,
        valueCount,
        rlEncoding,
        dlEncoding,
        valuesEncoding,
        (int) crc.getValue(),
        out);
  } else {
    metadataConverter.writeDataPageV1Header(
        uncompressedPageSize, compressedPageSize,
        valueCount,
        rlEncoding,
        dlEncoding,
        valuesEncoding,
        out);
  }
  // The header size counts in both the compressed and the uncompressed chunk length
  long headerSize = out.getPos() - beforeHeader;
  this.uncompressedLength += uncompressedPageSize + headerSize;
  this.compressedLength += compressedPageSize + headerSize;
  LOG.debug("{}: write data page content {}", out.getPos(), compressedPageSize);
  bytes.writeAllTo(out);
  // Copying the statistics if it is not initialized yet so we have the correct typed one
  if (currentStatistics == null) {
    currentStatistics = statistics.copy();
  } else {
    currentStatistics.mergeStatistics(statistics);
  }
  columnIndexBuilder.add(statistics);
  encodingStatsBuilder.addDataEncoding(valuesEncoding);
  currentEncodings.add(rlEncoding);
  currentEncodings.add(dlEncoding);
  currentEncodings.add(valuesEncoding);
}
/**
 * Registers a Bloom filter for the current block so that it will be written out.
 * This is only used in unit test.
 *
 * @param column the column name
 * @param bloomFilter the bloom filter of column values
 */
void addBloomFilter(String column, BloomFilter bloomFilter) {
  this.currentBloomFilters.put(column, bloomFilter);
}
/**
 * Writes a single v2 data page: the page header followed by the repetition levels,
 * the definition levels and the compressed data, in that order.
 *
 * @param rowCount count of rows
 * @param nullCount count of nulls
 * @param valueCount count of values
 * @param repetitionLevels repetition level bytes
 * @param definitionLevels definition level bytes
 * @param dataEncoding encoding for data
 * @param compressedData compressed data bytes
 * @param uncompressedDataSize the size of uncompressed data
 * @param statistics the statistics of the page
 * @throws IOException if any I/O error occurs during writing the file
 */
public void writeDataPageV2(int rowCount, int nullCount, int valueCount,
    BytesInput repetitionLevels,
    BytesInput definitionLevels,
    Encoding dataEncoding,
    BytesInput compressedData,
    int uncompressedDataSize,
    Statistics<?> statistics) throws IOException {
  state = state.write();
  int rlByteLength = toIntWithCheck(repetitionLevels.size());
  int dlByteLength = toIntWithCheck(definitionLevels.size());
  // Both page sizes include the (never compressed) level bytes
  int compressedSize = toIntWithCheck(
      compressedData.size() + repetitionLevels.size() + definitionLevels.size()
  );
  int uncompressedSize = toIntWithCheck(
      uncompressedDataSize + repetitionLevels.size() + definitionLevels.size()
  );
  long beforeHeader = out.getPos();
  if (currentChunkFirstDataPage < 0) {
    currentChunkFirstDataPage = beforeHeader;
  }
  metadataConverter.writeDataPageV2Header(
      uncompressedSize, compressedSize,
      valueCount, nullCount, rowCount,
      dataEncoding,
      rlByteLength,
      dlByteLength,
      out);
  long headersSize = out.getPos() - beforeHeader;
  this.uncompressedLength += uncompressedSize + headersSize;
  this.compressedLength += compressedSize + headersSize;
  // Copying the statistics if it is not initialized yet so we have the correct typed one
  if (currentStatistics == null) {
    currentStatistics = statistics.copy();
  } else {
    currentStatistics.mergeStatistics(statistics);
  }
  columnIndexBuilder.add(statistics);
  currentEncodings.add(dataEncoding);
  encodingStatsBuilder.addDataEncoding(dataEncoding);
  BytesInput.concat(repetitionLevels, definitionLevels, compressedData)
      .writeAllTo(out);
  // Header plus page bytes may exceed the int range even when the page alone fits,
  // so use the same checked conversion as for the sizes above
  offsetIndexBuilder.add(toIntWithCheck(out.getPos() - beforeHeader), rowCount);
}
/**
 * Writes a column chunk at once, without page header encryption: delegates to the
 * full overload with a null header encryptor, zero ordinals and a null file AAD.
 *
 * @param descriptor the descriptor of the column
 * @param valueCount the value count in this column
 * @param compressionCodecName the name of the compression codec used for compressing the pages
 * @param dictionaryPage the dictionary page for this column chunk (might be null)
 * @param bytes the encoded pages including page headers to be written as is
 * @param uncompressedTotalPageSize total uncompressed size (without page headers)
 * @param compressedTotalPageSize total compressed size (without page headers)
 * @param totalStats accumulated statistics for the column chunk
 * @param columnIndexBuilder the builder object for the column index
 * @param offsetIndexBuilder the builder object for the offset index
 * @param bloomFilter the bloom filter for this column
 * @param rlEncodings the RL encodings used in this column chunk
 * @param dlEncodings the DL encodings used in this column chunk
 * @param dataEncodings the data encodings used in this column chunk
 * @throws IOException if there is an error while writing
 */
void writeColumnChunk(
    ColumnDescriptor descriptor,
    long valueCount,
    CompressionCodecName compressionCodecName,
    DictionaryPage dictionaryPage,
    BytesInput bytes,
    long uncompressedTotalPageSize,
    long compressedTotalPageSize,
    Statistics<?> totalStats,
    ColumnIndexBuilder columnIndexBuilder,
    OffsetIndexBuilder offsetIndexBuilder,
    BloomFilter bloomFilter,
    Set<Encoding> rlEncodings,
    Set<Encoding> dlEncodings,
    List<Encoding> dataEncodings) throws IOException {
  final BlockCipher.Encryptor noHeaderEncryptor = null;
  final byte[] noFileAad = null;
  writeColumnChunk(
      descriptor,
      valueCount,
      compressionCodecName,
      dictionaryPage,
      bytes,
      uncompressedTotalPageSize,
      compressedTotalPageSize,
      totalStats,
      columnIndexBuilder,
      offsetIndexBuilder,
      bloomFilter,
      rlEncodings,
      dlEncodings,
      dataEncodings,
      noHeaderEncryptor,
      0,
      0,
      noFileAad);
}
/**
 * Writes a column chunk at once: starts the column, writes the optional dictionary
 * page, registers the Bloom filter when applicable, writes the already encoded pages
 * as is and ends the column.
 *
 * @param descriptor the descriptor of the column
 * @param valueCount the value count in this column
 * @param compressionCodecName the name of the compression codec used for compressing the pages
 * @param dictionaryPage the dictionary page for this column chunk (might be null)
 * @param bytes the encoded pages including page headers to be written as is
 * @param uncompressedTotalPageSize total uncompressed size (without page headers)
 * @param compressedTotalPageSize total compressed size (without page headers)
 * @param totalStats accumulated statistics for the column chunk
 * @param columnIndexBuilder the builder object for the column index
 * @param offsetIndexBuilder the builder object for the offset index
 * @param bloomFilter the bloom filter for this column (might be null)
 * @param rlEncodings the RL encodings used in this column chunk
 * @param dlEncodings the DL encodings used in this column chunk
 * @param dataEncodings the data encodings used in this column chunk
 * @param headerBlockEncryptor encryptor for the dictionary page header (null if not encrypted)
 * @param rowGroupOrdinal ordinal of the row group, used to build the dictionary page header AAD
 * @param columnOrdinal ordinal of the column, used to build the dictionary page header AAD
 * @param fileAAD the file AAD, used to build the dictionary page header AAD
 * @throws IOException if there is an error while writing
 */
void writeColumnChunk(
    ColumnDescriptor descriptor,
    long valueCount,
    CompressionCodecName compressionCodecName,
    DictionaryPage dictionaryPage,
    BytesInput bytes,
    long uncompressedTotalPageSize,
    long compressedTotalPageSize,
    Statistics<?> totalStats,
    ColumnIndexBuilder columnIndexBuilder,
    OffsetIndexBuilder offsetIndexBuilder,
    BloomFilter bloomFilter,
    Set<Encoding> rlEncodings,
    Set<Encoding> dlEncodings,
    List<Encoding> dataEncodings,
    BlockCipher.Encryptor headerBlockEncryptor,
    int rowGroupOrdinal,
    int columnOrdinal,
    byte[] fileAAD) throws IOException {
  startColumn(descriptor, valueCount, compressionCodecName);
  state = state.write();

  if (dictionaryPage != null) {
    // The AAD is only needed when the page header gets encrypted
    final byte[] dictionaryHeaderAad = (null == headerBlockEncryptor)
        ? null
        : AesCipher.createModuleAAD(
            fileAAD, ModuleType.DictionaryPageHeader, rowGroupOrdinal, columnOrdinal, -1);
    writeDictionaryPage(dictionaryPage, headerBlockEncryptor, dictionaryHeaderAad);
  }

  // write bloom filter if one of data pages is not dictionary encoded
  if (bloomFilter != null
      && dataEncodings.stream().anyMatch(encoding -> encoding != Encoding.RLE_DICTIONARY)) {
    currentBloomFilters.put(String.join(".", descriptor.getPath()), bloomFilter);
  }

  LOG.debug("{}: write data pages", out.getPos());
  // bytes holds headers and pages, so the difference is the total size of the headers
  final long pageHeadersSize = bytes.size() - compressedTotalPageSize;
  this.uncompressedLength += uncompressedTotalPageSize + pageHeadersSize;
  this.compressedLength += compressedTotalPageSize + pageHeadersSize;
  LOG.debug("{}: write data pages content", out.getPos());
  currentChunkFirstDataPage = out.getPos();
  bytes.writeAllTo(out);

  encodingStatsBuilder.addDataEncodings(dataEncodings);
  if (rlEncodings.isEmpty()) {
    encodingStatsBuilder.withV2Pages();
  }
  currentEncodings.addAll(rlEncodings);
  currentEncodings.addAll(dlEncodings);
  currentEncodings.addAll(dataEncodings);
  currentStatistics = totalStats;

  // Replace the builders created by startColumn with the ones filled by the caller
  this.columnIndexBuilder = columnIndexBuilder;
  this.offsetIndexBuilder = offsetIndexBuilder;
  endColumn();
}
/**
 * Ends a column (once all rep, def and data have been written): builds the column
 * and offset indexes of the chunk, adds the chunk metadata to the current block and
 * clears the per-chunk accumulators.
 *
 * @throws IOException if there is an error while writing
 */
public void endColumn() throws IOException {
  state = state.endColumn();
  LOG.debug("{}: end column", out.getPos());

  // A null entry is stored instead of the column index when its min/max values are too large
  final boolean minMaxTooLarge =
      columnIndexBuilder.getMinMaxSize() > columnIndexBuilder.getPageCount() * MAX_STATS_SIZE;
  if (minMaxTooLarge) {
    currentColumnIndexes.add(null);
  } else {
    currentColumnIndexes.add(columnIndexBuilder.build());
  }
  currentOffsetIndexes.add(offsetIndexBuilder.build(currentChunkFirstDataPage));

  final ColumnChunkMetaData chunkMetaData = ColumnChunkMetaData.get(
      currentChunkPath,
      currentChunkType,
      currentChunkCodec,
      encodingStatsBuilder.build(),
      currentEncodings,
      currentStatistics,
      currentChunkFirstDataPage,
      currentChunkDictionaryPageOffset,
      currentChunkValueCount,
      compressedLength,
      uncompressedLength);
  currentBlock.addColumn(chunkMetaData);
  currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + uncompressedLength);

  uncompressedLength = 0;
  compressedLength = 0;
  columnIndexBuilder = null;
  offsetIndexBuilder = null;
}
/**
 * Ends a block once all column chunks have been written: completes the block
 * metadata and moves it, with its indexes and Bloom filters, to the file level lists.
 *
 * @throws IOException if there is an error while writing
 * @throws ParquetEncodingException if the block was started with zero records
 */
public void endBlock() throws IOException {
  if (currentRecordCount == 0) {
    throw new ParquetEncodingException("End block with zero record");
  }
  state = state.endBlock();
  LOG.debug("{}: end block", out.getPos());

  final BlockMetaData finishedBlock = currentBlock;
  finishedBlock.setRowCount(currentRecordCount);
  finishedBlock.setOrdinal(blocks.size());
  blocks.add(finishedBlock);
  columnIndexes.add(currentColumnIndexes);
  offsetIndexes.add(currentOffsetIndexes);
  bloomFilters.add(currentBloomFilters);

  // Nothing of the finished block may leak into the next one
  currentBlock = null;
  currentColumnIndexes = null;
  currentOffsetIndexes = null;
  currentBloomFilters = null;
}
/**
 * Appends the contents of the given file to this file. The reader opened on the
 * source file is closed once the contents have been appended or the append failed.
 *
 * @param conf a configuration
 * @param file a file path to append the contents of to this file
 * @throws IOException if there is an error while reading or writing
 * @deprecated will be removed in 2.0.0; use {@link #appendFile(InputFile)} instead
 */
@Deprecated
public void appendFile(Configuration conf, Path file) throws IOException {
  // try-with-resources: the reader was previously never closed
  try (ParquetFileReader reader = ParquetFileReader.open(conf, file)) {
    reader.appendTo(this);
  }
}
/**
 * Appends the contents of the given file to this file. The reader opened on the
 * input file is closed before this method returns.
 *
 * @param file the input file to append the contents of to this file
 * @throws IOException if there is an error while reading or writing
 */
public void appendFile(InputFile file) throws IOException {
  try (ParquetFileReader sourceReader = ParquetFileReader.open(file)) {
    sourceReader.appendTo(this);
  }
}
/**
 * Copies the given row groups from a Hadoop input stream by wrapping the stream and
 * delegating to {@link #appendRowGroups(SeekableInputStream,List,boolean)}.
 *
 * @param file a file stream to read from
 * @param rowGroups row groups to copy
 * @param dropColumns whether to drop columns from the file that are not in this file's schema
 * @throws IOException if there is an error while reading or writing
 * @deprecated will be removed in 2.0.0;
 *             use {@link #appendRowGroups(SeekableInputStream,List,boolean)} instead
 */
@Deprecated
public void appendRowGroups(
    FSDataInputStream file,
    List<BlockMetaData> rowGroups,
    boolean dropColumns) throws IOException {
  final SeekableInputStream wrapped = HadoopStreams.wrap(file);
  appendRowGroups(wrapped, rowGroups, dropColumns);
}
/**
 * Copies the given row groups, in list order, from the input stream into this file.
 *
 * @param file a file stream to read from
 * @param rowGroups row groups to copy
 * @param dropColumns whether to drop columns from the file that are not in this file's schema
 * @throws IOException if there is an error while reading or writing
 */
public void appendRowGroups(SeekableInputStream file,
    List<BlockMetaData> rowGroups,
    boolean dropColumns) throws IOException {
  for (BlockMetaData rowGroup : rowGroups) {
    appendRowGroup(file, rowGroup, dropColumns);
  }
}
/**
 * Copies a single row group from a Hadoop input stream into this file.
 * The stream is wrapped and the call is forwarded to the
 * {@link SeekableInputStream} overload.
 *
 * @param from a file stream to read from
 * @param rowGroup row group to copy
 * @param dropColumns whether to drop columns from the file that are not in this file's schema
 * @throws IOException if there is an error while reading or writing
 * @deprecated will be removed in 2.0.0;
 *             use {@link #appendRowGroup(SeekableInputStream,BlockMetaData,boolean)} instead
 */
@Deprecated
public void appendRowGroup(FSDataInputStream from, BlockMetaData rowGroup,
    boolean dropColumns) throws IOException {
  SeekableInputStream wrapped = HadoopStreams.wrap(from);
  appendRowGroup(wrapped, rowGroup, dropColumns);
}
/**
 * Copies a single row group from the input stream into this file as a new block.
 * Column chunks are written in the order of this file's schema; chunks that are
 * adjacent in the source are copied with a single call to the copy routine.
 * Column indexes and offset indexes are not copied.
 *
 * @param from a file stream to read from
 * @param rowGroup row group to copy
 * @param dropColumns whether to drop columns from the file that are not in this file's schema
 * @throws IOException if there is an error while reading or writing
 * @throws IllegalArgumentException if a schema column is missing from the row group, or if
 *         the row group has extra columns and {@code dropColumns} is false
 */
public void appendRowGroup(SeekableInputStream from, BlockMetaData rowGroup,
    boolean dropColumns) throws IOException {
  startBlock(rowGroup.getRowCount());

  // index the source chunks by their dotted column path
  Map<String, ColumnChunkMetaData> unmatchedByPath = new HashMap<>();
  for (ColumnChunkMetaData sourceChunk : rowGroup.getColumns()) {
    unmatchedByPath.put(sourceChunk.getPath().toDotString(), sourceChunk);
  }

  // pick the chunks in target-schema order; every schema column must be present
  List<ColumnChunkMetaData> orderedChunks = new ArrayList<>();
  for (ColumnDescriptor descriptor : schema.getColumns()) {
    String dotPath = ColumnPath.get(descriptor.getPath()).toDotString();
    ColumnChunkMetaData match = unmatchedByPath.remove(dotPath);
    if (match == null) {
      throw new IllegalArgumentException(String.format(
          "Missing column '%s', cannot copy row group: %s", dotPath, rowGroup));
    }
    orderedChunks.add(match);
  }

  // whatever is left would be dropped; complain unless that was requested
  if (!(dropColumns || unmatchedByPath.isEmpty())) {
    throw new IllegalArgumentException(String.format(
        "Columns cannot be copied (missing from target schema): %s",
        String.join(", ", unmatchedByPath.keySet())));
  }

  // copy the data for all chunks, batching contiguous source ranges
  long rangeStart = -1;
  long rangeLength = 0;
  long uncompressedTotal = 0L;
  final int chunkCount = orderedChunks.size();
  for (int idx = 0; idx < chunkCount; idx++) {
    ColumnChunkMetaData chunk = orderedChunks.get(idx);

    // position of this chunk in the new file: current position plus the pending bytes
    long newChunkStart = out.getPos() + rangeLength;

    if (rangeStart < 0) {
      // no pending range, so it begins at this chunk
      rangeStart = chunk.getStartingPos();
    }
    rangeLength += chunk.getTotalSize();

    boolean lastChunk = (idx + 1) == chunkCount;
    if (lastChunk
        || orderedChunks.get(idx + 1).getStartingPos() != (rangeStart + rangeLength)) {
      // the next chunk is not contiguous (or there is none): flush the pending range
      copy(from, out, rangeStart, rangeLength);
      rangeStart = -1;
      rangeLength = 0;
    }

    // TODO: column/offset indexes are not copied
    // (it would require seeking to the end of the file for each row groups)
    currentColumnIndexes.add(null);
    currentOffsetIndexes.add(null);

    Offsets offsets = Offsets.getOffsets(from, chunk, newChunkStart);
    ColumnChunkMetaData relocated = ColumnChunkMetaData.get(
        chunk.getPath(),
        chunk.getPrimitiveType(),
        chunk.getCodec(),
        chunk.getEncodingStats(),
        chunk.getEncodings(),
        chunk.getStatistics(),
        offsets.firstDataPageOffset,
        offsets.dictionaryPageOffset,
        chunk.getValueCount(),
        chunk.getTotalSize(),
        chunk.getTotalUncompressedSize());
    currentBlock.addColumn(relocated);

    uncompressedTotal += chunk.getTotalUncompressedSize();
  }

  currentBlock.setTotalByteSize(uncompressedTotal);
  endBlock();
}
/**
 * Copies one column chunk from the input stream into the current block and registers
 * its bloom filter, column index and offset index with that block.
 *
 * @param descriptor the descriptor for the target column
 * @param from a file stream to read from
 * @param chunk the column chunk to be copied
 * @param bloomFilter the bloomFilter for this chunk
 * @param columnIndex the column index for this chunk
 * @param offsetIndex the offset index for this chunk
 * @throws IOException if there is an error while reading or writing
 */
public void appendColumnChunk(ColumnDescriptor descriptor, SeekableInputStream from, ColumnChunkMetaData chunk,
    BloomFilter bloomFilter, ColumnIndex columnIndex, OffsetIndex offsetIndex) throws IOException {
  long sourceStart = chunk.getStartingPos();
  long byteCount = chunk.getTotalSize();
  // remember where the chunk lands in this file before any bytes are written
  long newChunkStart = out.getPos();

  copy(from, out, sourceStart, byteCount);

  String dotPath = String.join(".", descriptor.getPath());
  currentBloomFilters.put(dotPath, bloomFilter);
  currentColumnIndexes.add(columnIndex);
  currentOffsetIndexes.add(offsetIndex);

  Offsets offsets = Offsets.getOffsets(from, chunk, newChunkStart);
  ColumnChunkMetaData relocated = ColumnChunkMetaData.get(
      chunk.getPath(),
      chunk.getPrimitiveType(),
      chunk.getCodec(),
      chunk.getEncodingStats(),
      chunk.getEncodings(),
      chunk.getStatistics(),
      offsets.firstDataPageOffset,
      offsets.dictionaryPageOffset,
      chunk.getValueCount(),
      chunk.getTotalSize(),
      chunk.getTotalUncompressedSize());
  currentBlock.addColumn(relocated);

  long updatedByteSize = currentBlock.getTotalByteSize() + chunk.getTotalUncompressedSize();
  currentBlock.setTotalByteSize(updatedByteSize);
}
// Per-thread scratch buffers (8192 bytes each) for the copy function.
private static final ThreadLocal<byte[]> COPY_BUFFER = ThreadLocal.withInitial(() -> new byte[8192]);

/**
 * Copies a byte range from a seekable input stream to an output stream.
 * The scratch buffer is thread-local, so concurrent calls do not share a buffer.
 *
 * @param from a {@link SeekableInputStream}
 * @param to any {@link PositionOutputStream}
 * @param start where in the from stream to start copying
 * @param length the number of bytes to copy
 * @throws IOException if there is an error while reading or writing
 * @throws IllegalArgumentException if the input ends before {@code length} bytes were read
 */
private static void copy(SeekableInputStream from, PositionOutputStream to,
    long start, long length) throws IOException {
  LOG.debug("Copying {} bytes at {} to {}", length, start, to.getPos());
  from.seek(start);
  long bytesCopied = 0;
  byte[] buffer = COPY_BUFFER.get();
  while (bytesCopied < length) {
    long bytesLeft = length - bytesCopied;
    // never request more than what is left of the range
    int bytesRead = from.read(buffer, 0,
        (buffer.length < bytesLeft ? buffer.length : (int) bytesLeft));
    if (bytesRead < 0) {
      // parenthesized so the message reports the absolute offset rather than
      // the two numbers concatenated as strings
      throw new IllegalArgumentException(
          "Unexpected end of input file at " + (start + bytesCopied));
    }
    to.write(buffer, 0, bytesRead);
    bytesCopied += bytesRead;
  }
}
/**
 * Ends a file once all blocks have been written: writes the column indexes,
 * offset indexes and bloom filters, then the footer, and closes the file.
 * The output stream is closed even if serialization fails, so a failed call
 * does not leak the stream.
 *
 * @param extraMetaData the extra meta data to write in the footer
 * @throws IOException if there is an error while writing
 */
public void end(Map<String, String> extraMetaData) throws IOException {
  // the state transition happens first so an invalid-state call leaves the stream untouched
  state = state.end();
  // try-with-resources closes the stream on every path and keeps the original
  // exception primary if close() fails as well
  try (PositionOutputStream closedOnExit = out) {
    serializeColumnIndexes(columnIndexes, blocks, out, fileEncryptor);
    serializeOffsetIndexes(offsetIndexes, blocks, out, fileEncryptor);
    serializeBloomFilters(bloomFilters, blocks, out, fileEncryptor);
    LOG.debug("{}: end", out.getPos());
    this.footer = new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks);
    serializeFooter(footer, out, fileEncryptor);
  }
}
/**
 * Writes the column index of each column chunk to the output stream and stores the
 * location (offset and length) of the written index on the chunk's metadata.
 * Chunks whose converted column index is null are skipped. When a file encryptor is
 * given and the column is encrypted, the index is written with the column's
 * metadata encryptor.
 *
 * @param columnIndexes the column indexes, one list per block, one entry per column chunk
 * @param blocks the blocks whose column chunks are updated with index references
 * @param out the stream to write to
 * @param fileEncryptor the file encryptor, or null if the file is not encrypted
 * @throws IOException if there is an error while writing
 */
private static void serializeColumnIndexes(
    List<List<ColumnIndex>> columnIndexes,
    List<BlockMetaData> blocks,
    PositionOutputStream out,
    InternalFileEncryptor fileEncryptor) throws IOException {
  LOG.debug("{}: column indexes", out.getPos());
  final int blockCount = blocks.size();
  for (int blockIdx = 0; blockIdx < blockCount; blockIdx++) {
    BlockMetaData block = blocks.get(blockIdx);
    List<ColumnChunkMetaData> chunks = block.getColumns();
    List<ColumnIndex> indexesOfBlock = columnIndexes.get(blockIdx);
    final int chunkCount = chunks.size();
    for (int chunkIdx = 0; chunkIdx < chunkCount; chunkIdx++) {
      ColumnChunkMetaData chunk = chunks.get(chunkIdx);
      org.apache.parquet.format.ColumnIndex converted = ParquetMetadataConverter
          .toParquetColumnIndex(chunk.getPrimitiveType(), indexesOfBlock.get(chunkIdx));
      if (converted != null) {
        BlockCipher.Encryptor encryptor = null;
        byte[] aad = null;
        if (fileEncryptor != null) {
          InternalColumnEncryptionSetup setup =
              fileEncryptor.getColumnSetup(chunk.getPath(), false, chunkIdx);
          if (setup.isEncrypted()) {
            encryptor = setup.getMetaDataEncryptor();
            aad = AesCipher.createModuleAAD(fileEncryptor.getFileAAD(), ModuleType.ColumnIndex,
                block.getOrdinal(), setup.getOrdinal(), -1);
          }
        }
        long indexStart = out.getPos();
        Util.writeColumnIndex(converted, out, encryptor, aad);
        int indexLength = (int) (out.getPos() - indexStart);
        chunk.setColumnIndexReference(new IndexReference(indexStart, indexLength));
      }
    }
  }
}
/**
 * Narrows a size to an int, failing if the value does not fit.
 *
 * @param size the size to convert
 * @return {@code size} as an int
 * @throws ParquetEncodingException if {@code size} cannot be represented as an int
 */
private int toIntWithCheck(long size) {
  final int narrowed = (int) size;
  if (narrowed == size) {
    return narrowed;
  }
  throw new ParquetEncodingException("Cannot write page larger than " + Integer.MAX_VALUE + " bytes: " + size);
}
/**
 * Writes the offset index of each column chunk to the output stream and stores the
 * location (offset and length) of the written index on the chunk's metadata.
 * Chunks without an offset index are skipped. When a file encryptor is given and the
 * column is encrypted, the index is written with the column's metadata encryptor.
 *
 * @param offsetIndexes the offset indexes, one list per block, one entry per column chunk
 * @param blocks the blocks whose column chunks are updated with index references
 * @param out the stream to write to
 * @param fileEncryptor the file encryptor, or null if the file is not encrypted
 * @throws IOException if there is an error while writing
 */
private static void serializeOffsetIndexes(
    List<List<OffsetIndex>> offsetIndexes,
    List<BlockMetaData> blocks,
    PositionOutputStream out,
    InternalFileEncryptor fileEncryptor) throws IOException {
  LOG.debug("{}: offset indexes", out.getPos());
  final int blockCount = blocks.size();
  for (int blockIdx = 0; blockIdx < blockCount; blockIdx++) {
    BlockMetaData block = blocks.get(blockIdx);
    List<ColumnChunkMetaData> chunks = block.getColumns();
    List<OffsetIndex> indexesOfBlock = offsetIndexes.get(blockIdx);
    final int chunkCount = chunks.size();
    for (int chunkIdx = 0; chunkIdx < chunkCount; chunkIdx++) {
      OffsetIndex current = indexesOfBlock.get(chunkIdx);
      if (current != null) {
        ColumnChunkMetaData chunk = chunks.get(chunkIdx);
        BlockCipher.Encryptor encryptor = null;
        byte[] aad = null;
        if (fileEncryptor != null) {
          InternalColumnEncryptionSetup setup =
              fileEncryptor.getColumnSetup(chunk.getPath(), false, chunkIdx);
          if (setup.isEncrypted()) {
            encryptor = setup.getMetaDataEncryptor();
            aad = AesCipher.createModuleAAD(fileEncryptor.getFileAAD(), ModuleType.OffsetIndex,
                block.getOrdinal(), setup.getOrdinal(), -1);
          }
        }
        long indexStart = out.getPos();
        Util.writeOffsetIndex(ParquetMetadataConverter.toParquetOffsetIndex(current), out, encryptor, aad);
        int indexLength = (int) (out.getPos() - indexStart);
        chunk.setOffsetIndexReference(new IndexReference(indexStart, indexLength));
      }
    }
  }
}
/**
 * Writes the bloom filter (header followed by bitset) of each column chunk that has one
 * and stores the offset of the written filter on the chunk's metadata. Filters are looked
 * up by the chunk's dotted column path; blocks with no filters are skipped. When a file
 * encryptor is given and the column is encrypted, header and bitset are encrypted with
 * the column's metadata encryptor, each with its own AAD.
 *
 * @param bloomFilters the bloom filters per block, keyed by dotted column path
 * @param blocks the blocks whose column chunks are updated with bloom filter offsets
 * @param out the stream to write to
 * @param fileEncryptor the file encryptor, or null if the file is not encrypted
 * @throws IOException if there is an error while writing
 */
private static void serializeBloomFilters(
    List<Map<String, BloomFilter>> bloomFilters,
    List<BlockMetaData> blocks,
    PositionOutputStream out,
    InternalFileEncryptor fileEncryptor) throws IOException {
  LOG.debug("{}: bloom filters", out.getPos());
  final int blockCount = blocks.size();
  for (int blockIdx = 0; blockIdx < blockCount; blockIdx++) {
    BlockMetaData block = blocks.get(blockIdx);
    List<ColumnChunkMetaData> chunks = block.getColumns();
    Map<String, BloomFilter> filtersOfBlock = bloomFilters.get(blockIdx);
    if (filtersOfBlock.isEmpty()) {
      continue;
    }
    final int chunkCount = chunks.size();
    for (int chunkIdx = 0; chunkIdx < chunkCount; chunkIdx++) {
      ColumnChunkMetaData chunk = chunks.get(chunkIdx);
      BloomFilter filter = filtersOfBlock.get(chunk.getPath().toDotString());
      if (filter != null) {
        // the filter starts at the current position
        long filterStart = out.getPos();
        chunk.setBloomFilterOffset(filterStart);

        BlockCipher.Encryptor encryptor = null;
        byte[] headerAAD = null;
        byte[] bitsetAAD = null;
        if (fileEncryptor != null) {
          InternalColumnEncryptionSetup setup =
              fileEncryptor.getColumnSetup(chunk.getPath(), false, chunkIdx);
          if (setup.isEncrypted()) {
            encryptor = setup.getMetaDataEncryptor();
            int columnOrdinal = setup.getOrdinal();
            headerAAD = AesCipher.createModuleAAD(fileEncryptor.getFileAAD(), ModuleType.BloomFilterHeader,
                block.getOrdinal(), columnOrdinal, -1);
            bitsetAAD = AesCipher.createModuleAAD(fileEncryptor.getFileAAD(), ModuleType.BloomFilterBitset,
                block.getOrdinal(), columnOrdinal, -1);
          }
        }

        Util.writeBloomFilterHeader(ParquetMetadataConverter.toBloomFilterHeader(filter), out,
            encryptor, headerAAD);

        // the bitset is buffered in memory so it can be encrypted as a whole
        ByteArrayOutputStream bitsetBuffer = new ByteArrayOutputStream();
        filter.writeTo(bitsetBuffer);
        byte[] bitsetBytes = bitsetBuffer.toByteArray();
        if (encryptor != null) {
          bitsetBytes = encryptor.encrypt(bitsetBytes, bitsetAAD);
        }
        out.write(bitsetBytes);
      }
    }
  }
}
/**
 * Writes the file footer to the output stream in one of three layouts, selected by
 * the file encryptor:
 * <ul>
 * <li>no encryptor: footer metadata, its length as a little-endian int, then MAGIC</li>
 * <li>encryptor with plaintext footer: footer metadata, a signature (nonce and GCM tag),
 * the combined length as a little-endian int, then MAGIC</li>
 * <li>encryptor with encrypted footer: file crypto metadata, the encrypted footer metadata,
 * the combined length as a little-endian int, then EFMAGIC</li>
 * </ul>
 *
 * @param footer the footer to serialize
 * @param out the stream to write to
 * @param fileEncryptor the file encryptor, or null if the file is not encrypted
 * @throws IOException if there is an error while writing
 */
private static void serializeFooter(ParquetMetadata footer, PositionOutputStream out,
InternalFileEncryptor fileEncryptor) throws IOException {
ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter();
// Unencrypted file
if (null == fileEncryptor) {
// remember where the footer starts so its length can be written after it
long footerIndex = out.getPos();
org.apache.parquet.format.FileMetaData parquetMetadata = metadataConverter.toParquetMetadata(CURRENT_VERSION, footer);
writeFileMetaData(parquetMetadata, out);
LOG.debug("{}: footer length = {}" , out.getPos(), (out.getPos() - footerIndex));
BytesUtils.writeIntLittleEndian(out, (int) (out.getPos() - footerIndex));
out.write(MAGIC);
return;
}
// both encrypted layouts use the encryptor-aware metadata conversion
org.apache.parquet.format.FileMetaData parquetMetadata =
metadataConverter.toParquetMetadata(CURRENT_VERSION, footer, fileEncryptor);
// Encrypted file with plaintext footer
if (!fileEncryptor.isFooterEncrypted()) {
long footerIndex = out.getPos();
parquetMetadata.setEncryption_algorithm(fileEncryptor.getEncryptionAlgorithm());
// create footer signature (nonce + tag of encrypted footer)
byte[] footerSigningKeyMetaData = fileEncryptor.getFooterSigningKeyMetaData();
if (null != footerSigningKeyMetaData) {
parquetMetadata.setFooter_signing_key_metadata(footerSigningKeyMetaData);
}
// serialize to memory first: the same bytes are both encrypted (for the signature) and written
ByteArrayOutputStream tempOutStream = new ByteArrayOutputStream();
writeFileMetaData(parquetMetadata, tempOutStream);
byte[] serializedFooter = tempOutStream.toByteArray();
byte[] footerAAD = AesCipher.createFooterAAD(fileEncryptor.getFileAAD());
// the encrypted footer is only used as the source of the nonce and tag; it is not written
byte[] encryptedFooter = fileEncryptor.getSignedFooterEncryptor().encrypt(serializedFooter, footerAAD);
byte[] signature = new byte[AesCipher.NONCE_LENGTH + AesCipher.GCM_TAG_LENGTH];
System.arraycopy(encryptedFooter, ModuleCipherFactory.SIZE_LENGTH, signature, 0, AesCipher.NONCE_LENGTH); // copy Nonce
System.arraycopy(encryptedFooter, encryptedFooter.length - AesCipher.GCM_TAG_LENGTH,
signature, AesCipher.NONCE_LENGTH, AesCipher.GCM_TAG_LENGTH); // copy GCM Tag
// the plaintext footer goes out first, followed by its signature
out.write(serializedFooter);
out.write(signature);
LOG.debug("{}: footer and signature length = {}" , out.getPos(), (out.getPos() - footerIndex));
// the written length covers the footer and the signature
BytesUtils.writeIntLittleEndian(out, (int) (out.getPos() - footerIndex));
out.write(MAGIC);
return;
}
// Encrypted file with encrypted footer
long cryptoFooterIndex = out.getPos();
writeFileCryptoMetaData(fileEncryptor.getFileCryptoMetaData(), out);
byte[] footerAAD = AesCipher.createFooterAAD(fileEncryptor.getFileAAD());
writeFileMetaData(parquetMetadata, out, fileEncryptor.getFooterEncryptor(), footerAAD);
// the written length covers the crypto metadata and the encrypted footer
int combinedMetaDataLength = (int)(out.getPos() - cryptoFooterIndex);
LOG.debug("{}: crypto metadata and footer length = {}" , out.getPos(), combinedMetaDataLength);
BytesUtils.writeIntLittleEndian(out, combinedMetaDataLength);
out.write(EFMAGIC);
}
/**
 * Returns the footer of the finished file.
 *
 * @return the footer
 */
public ParquetMetadata getFooter() {
  // the footer is only available once the writer is in the ENDED state
  final boolean finished = (state == STATE.ENDED);
  Preconditions.checkState(finished, "Cannot return unfinished footer.");
  return footer;
}
/**
 * Merges a list of metadata files into a single ParquetMetadata, using a
 * {@link StrictKeyValueMetadataMergeStrategy} for the key/value metadata.
 * Requires that the schemas be compatible, and the extraMetadata be exactly equal.
 *
 * @param files a list of files to merge metadata from
 * @param conf a configuration
 * @return merged parquet metadata for the files
 * @throws IOException if there is an error while writing
 * @deprecated metadata files are not recommended and will be removed in 2.0.0
 */
@Deprecated
public static ParquetMetadata mergeMetadataFiles(List<Path> files, Configuration conf) throws IOException {
  KeyValueMetadataMergeStrategy strictStrategy = new StrictKeyValueMetadataMergeStrategy();
  return mergeMetadataFiles(files, conf, strictStrategy);
}
/**
 * Merges a list of metadata files into a single ParquetMetadata.
 * The footer of every file is read; the file-level metadata is merged and the
 * blocks of all files are collected in list order.
 * Requires that the schemas be compatible.
 *
 * @param files a list of files to merge metadata from; must not be empty
 * @param conf a configuration
 * @param keyValueMetadataMergeStrategy strategy to merge values for same key, if there are multiple
 * @return merged parquet metadata for the files
 * @throws IOException if there is an error while writing
 * @deprecated metadata files are not recommended and will be removed in 2.0.0
 */
@Deprecated
public static ParquetMetadata mergeMetadataFiles(List<Path> files, Configuration conf,
    KeyValueMetadataMergeStrategy keyValueMetadataMergeStrategy) throws IOException {
  Preconditions.checkArgument(!files.isEmpty(), "Cannot merge an empty list of metadata");

  GlobalMetaData accumulated = null;
  List<BlockMetaData> allBlocks = new ArrayList<>();
  for (Path metadataFile : files) {
    ParquetMetadata footer =
        ParquetFileReader.readFooter(conf, metadataFile, ParquetMetadataConverter.NO_FILTER);
    accumulated = mergeInto(footer.getFileMetaData(), accumulated, true);
    allBlocks.addAll(footer.getBlocks());
  }

  // collapse GlobalMetaData into a single FileMetaData, which will throw if they are not compatible
  FileMetaData collapsed = accumulated.merge(keyValueMetadataMergeStrategy);
  return new ParquetMetadata(collapsed, allBlocks);
}
/**
 * Merges a list of metadata files and writes the result as a single metadata file.
 * Requires that the schemas be compatible, and the extraMetaData be exactly equal.
 * This is useful when merging 2 directories of parquet files into a single directory, as long
 * as both directories were written with compatible schemas and equal extraMetaData.
 *
 * @param files a list of files to merge metadata from
 * @param outputPath path to write merged metadata to
 * @param conf a configuration
 * @throws IOException if there is an error while reading or writing
 * @deprecated metadata files are not recommended and will be removed in 2.0.0
 */
@Deprecated
public static void writeMergedMetadataFile(List<Path> files, Path outputPath, Configuration conf) throws IOException {
  ParquetMetadata mergedFooter = mergeMetadataFiles(files, conf);
  FileSystem targetFs = outputPath.getFileSystem(conf);
  writeMetadataFile(outputPath, mergedFooter, targetFs);
}
/**
 * Writes a _metadata and a _common_metadata file by delegating with
 * {@link JobSummaryLevel#ALL}.
 *
 * @param configuration the configuration to use to get the FileSystem
 * @param outputPath the directory to write the _metadata file to
 * @param footers the list of footers to merge
 * @throws IOException if there is an error while writing
 * @deprecated metadata files are not recommended and will be removed in 2.0.0
 */
@Deprecated
public static void writeMetadataFile(Configuration configuration, Path outputPath, List<Footer> footers) throws IOException {
  final JobSummaryLevel everything = JobSummaryLevel.ALL;
  writeMetadataFile(configuration, outputPath, footers, everything);
}
/**
 * Writes a _common_metadata file, and additionally a _metadata file when the
 * {@link JobSummaryLevel} is {@code ALL}. The common metadata file is written after
 * the block list of the merged footer has been cleared.
 *
 * @param configuration the configuration to use to get the FileSystem
 * @param outputPath the directory to write the _metadata file to
 * @param footers the list of footers to merge
 * @param level level of summary to write; must be {@code ALL} or {@code COMMON_ONLY}
 * @throws IOException if there is an error while writing
 * @deprecated metadata files are not recommended and will be removed in 2.0.0
 */
@Deprecated
public static void writeMetadataFile(Configuration configuration, Path outputPath, List<Footer> footers, JobSummaryLevel level) throws IOException {
  final boolean writeAll = (level == JobSummaryLevel.ALL);
  Preconditions.checkArgument(writeAll || level == JobSummaryLevel.COMMON_ONLY,
      "Unsupported level: " + level);

  FileSystem fs = outputPath.getFileSystem(configuration);
  outputPath = outputPath.makeQualified(fs);
  ParquetMetadata mergedFooter = mergeFooters(outputPath, footers);

  if (writeAll) {
    writeMetadataFile(outputPath, mergedFooter, fs, PARQUET_METADATA_FILE);
  }

  // the common metadata file carries no blocks
  mergedFooter.getBlocks().clear();
  writeMetadataFile(outputPath, mergedFooter, fs, PARQUET_COMMON_METADATA_FILE);
}
/**
 * Writes the given footer to the file named {@code parquetMetadataFile} under
 * {@code outputPathRoot}.
 *
 * @deprecated metadata files are not recommended and will be removed in 2.0.0
 */
@Deprecated
private static void writeMetadataFile(Path outputPathRoot, ParquetMetadata metadataFooter, FileSystem fs, String parquetMetadataFile)
    throws IOException {
  writeMetadataFile(new Path(outputPathRoot, parquetMetadataFile), metadataFooter, fs);
}
/**
 * Writes a metadata file at {@code outputPath}: MAGIC followed by the unencrypted
 * serialized footer. The stream is closed even if writing fails, so a failed
 * write does not leak the file handle.
 *
 * @deprecated metadata files are not recommended and will be removed in 2.0.0
 */
@Deprecated
private static void writeMetadataFile(Path outputPath, ParquetMetadata metadataFooter, FileSystem fs)
    throws IOException {
  // try-with-resources closes the stream on both the success and the failure path
  try (PositionOutputStream metadata = HadoopStreams.wrap(fs.create(outputPath))) {
    metadata.write(MAGIC);
    // a null encryptor selects the unencrypted footer layout
    serializeFooter(metadataFooter, metadata, null);
  }
}
/**
 * Merges the metadata of all the footers together, using a
 * {@link StrictKeyValueMetadataMergeStrategy} for the key/value metadata.
 *
 * @param root the directory containing all footers
 * @param footers the list files footers to merge
 * @return the global meta data for all the footers
 */
static ParquetMetadata mergeFooters(Path root, List<Footer> footers) {
  KeyValueMetadataMergeStrategy strictStrategy = new StrictKeyValueMetadataMergeStrategy();
  return mergeFooters(root, footers, strictStrategy);
}
/**
 * Merges the metadata of all the footers together. Every block of every footer gets its
 * path set to the footer file's path relative to {@code root} (leading slashes removed)
 * and is collected into the result.
 *
 * @param root the directory containing all footers
 * @param footers the list files footers to merge
 * @param keyValueMergeStrategy strategy to merge values for a given key (if there are multiple values)
 * @return the global meta data for all the footers
 * @throws ParquetEncodingException if a footer's file path does not start with the root path
 */
static ParquetMetadata mergeFooters(Path root, List<Footer> footers, KeyValueMetadataMergeStrategy keyValueMergeStrategy) {
  final String rootPath = root.toUri().getPath();
  GlobalMetaData accumulated = null;
  List<BlockMetaData> allBlocks = new ArrayList<>();

  for (Footer footer : footers) {
    String footerPath = footer.getFile().toUri().getPath();
    if (!footerPath.startsWith(rootPath)) {
      throw new ParquetEncodingException(footerPath + " invalid: all the files must be contained in the root " + root);
    }

    // make the path relative to the root and strip every leading slash
    String relativePath = footerPath.substring(rootPath.length());
    int firstNonSlash = 0;
    while (firstNonSlash < relativePath.length() && relativePath.charAt(firstNonSlash) == '/') {
      firstNonSlash++;
    }
    relativePath = relativePath.substring(firstNonSlash);

    accumulated = mergeInto(footer.getParquetMetadata().getFileMetaData(), accumulated);
    for (BlockMetaData block : footer.getParquetMetadata().getBlocks()) {
      block.setPath(relativePath);
      allBlocks.add(block);
    }
  }

  return new ParquetMetadata(accumulated.merge(keyValueMergeStrategy), allBlocks);
}
/**
 * Reports the current position of the underlying output stream.
 *
 * @return the current position in the underlying file
 * @throws IOException if there is an error while getting the current stream's position
 */
public long getPos() throws IOException {
  final long position = out.getPos();
  return position;
}
/**
 * Asks the alignment strategy for the size of the next row group, given the
 * current state of the output stream.
 *
 * @return the next row group size as computed by the alignment strategy
 * @throws IOException if the alignment strategy fails to query the output stream
 */
public long getNextRowGroupSize() throws IOException {
  final long nextSize = alignment.nextRowGroupSize(out);
  return nextSize;
}
/**
 * Merges the file metadata of all the footers together, in strict mode.
 *
 * @param footers the list files footers to merge
 * @return the global meta data for all the footers
 */
static GlobalMetaData getGlobalMetaData(List<Footer> footers) {
  final boolean strict = true;
  return getGlobalMetaData(footers, strict);
}
/**
 * Merges the file metadata of all the footers together.
 *
 * @param footers the list files footers to merge
 * @param strict passed through to the metadata merge for each footer
 * @return the global meta data for all the footers, or null if {@code footers} is empty
 */
static GlobalMetaData getGlobalMetaData(List<Footer> footers, boolean strict) {
  GlobalMetaData accumulated = null;
  for (Footer footer : footers) {
    FileMetaData fileMetaData = footer.getParquetMetadata().getFileMetaData();
    accumulated = mergeInto(fileMetaData, accumulated, strict);
  }
  return accumulated;
}
/**
 * Returns the result of merging toMerge into mergedMetadata, in strict mode.
 *
 * @param toMerge the metadata toMerge
 * @param mergedMetadata the reference metadata to merge into
 * @return the result of the merge
 */
static GlobalMetaData mergeInto(
    FileMetaData toMerge,
    GlobalMetaData mergedMetadata) {
  final boolean strict = true;
  return mergeInto(toMerge, mergedMetadata, strict);
}
/**
 * Returns the result of merging toMerge into mergedMetadata. The schemas are merged
 * when they differ, every key/value pair of toMerge is added to the set of values
 * collected for its key, and the createdBy of toMerge is added to the set of creators.
 *
 * @param toMerge the metadata toMerge
 * @param mergedMetadata the reference metadata to merge into, or null to start from scratch
 * @param strict passed through to the schema merge
 * @return the result of the merge
 */
static GlobalMetaData mergeInto(
    FileMetaData toMerge,
    GlobalMetaData mergedMetadata,
    boolean strict) {
  MessageType schema = null;
  Map<String, Set<String>> keyValues = new HashMap<>();
  Set<String> creators = new HashSet<>();

  // start from what has been merged so far, if anything
  if (mergedMetadata != null) {
    schema = mergedMetadata.getSchema();
    keyValues.putAll(mergedMetadata.getKeyValueMetaData());
    creators.addAll(mergedMetadata.getCreatedBy());
  }

  // merge the schemas only when there is something new to take over
  MessageType incomingSchema = toMerge.getSchema();
  boolean schemaDiffers = (schema == null)
      ? incomingSchema != null
      : !schema.equals(incomingSchema);
  if (schemaDiffers) {
    schema = mergeInto(incomingSchema, schema, strict);
  }

  for (Entry<String, String> keyValue : toMerge.getKeyValueMetaData().entrySet()) {
    keyValues
        .computeIfAbsent(keyValue.getKey(), key -> new LinkedHashSet<>())
        .add(keyValue.getValue());
  }
  creators.add(toMerge.getCreatedBy());

  return new GlobalMetaData(schema, keyValues, creators);
}
/**
 * Returns the result of merging toMerge into mergedSchema, in strict mode.
 *
 * @param toMerge the schema to merge into mergedSchema
 * @param mergedSchema the schema to append the fields to
 * @return the resulting schema
 */
static MessageType mergeInto(MessageType toMerge, MessageType mergedSchema) {
  final boolean strict = true;
  return mergeInto(toMerge, mergedSchema, strict);
}
/**
 * Returns the result of merging toMerge into mergedSchema. When there is no merged
 * schema yet, toMerge itself is returned; otherwise the union of the two is returned.
 *
 * @param toMerge the schema to merge into mergedSchema
 * @param mergedSchema the schema to append the fields to
 * @param strict should schema primitive types match
 * @return the resulting schema
 */
static MessageType mergeInto(MessageType toMerge, MessageType mergedSchema, boolean strict) {
  return (mergedSchema != null)
      ? mergedSchema.union(toMerge, strict)
      : toMerge;
}
/**
 * Strategy for aligning row groups in the output stream and for sizing the next row group.
 */
private interface AlignmentStrategy {
/**
 * Lets the strategy write to the stream for alignment purposes; the implementations in
 * this file either write nothing or write zero bytes as padding.
 * NOTE(review): presumably called before a row group is started - confirm against the caller.
 */
void alignForRowGroup(PositionOutputStream out) throws IOException;
/**
 * Returns the size to use for the next row group, given the current state of the stream.
 */
long nextRowGroupSize(PositionOutputStream out) throws IOException;
}
/**
 * Alignment strategy that never writes padding and always reports the configured
 * row group size.
 */
private static class NoAlignment implements AlignmentStrategy {
  private final long rowGroupSize;

  private NoAlignment(long rowGroupSize) {
    this.rowGroupSize = rowGroupSize;
  }

  /** Creates a strategy that reports {@code rowGroupSize} for every row group. */
  public static NoAlignment get(long rowGroupSize) {
    return new NoAlignment(rowGroupSize);
  }

  @Override
  public void alignForRowGroup(PositionOutputStream out) {
    // intentionally empty: no alignment is performed
  }

  @Override
  public long nextRowGroupSize(PositionOutputStream out) {
    return this.rowGroupSize;
  }
}
/**
 * Alignment strategy that pads with zero bytes up to the next DFS block boundary
 * when the number of bytes left in the current DFS block is at most the maximum
 * padding size.
 */
private static class PaddingAlignment implements AlignmentStrategy {
  // source of the zero bytes written as padding, reused for every write
  private static final byte[] zeros = new byte[4096];

  protected final long dfsBlockSize;
  protected final long rowGroupSize;
  protected final int maxPaddingSize;

  private PaddingAlignment(long dfsBlockSize, long rowGroupSize,
      int maxPaddingSize) {
    this.dfsBlockSize = dfsBlockSize;
    this.rowGroupSize = rowGroupSize;
    this.maxPaddingSize = maxPaddingSize;
  }

  /** Creates a padding strategy for the given DFS block size, row group size and padding limit. */
  public static PaddingAlignment get(long dfsBlockSize, long rowGroupSize,
      int maxPaddingSize) {
    return new PaddingAlignment(dfsBlockSize, rowGroupSize, maxPaddingSize);
  }

  /** Fills the rest of the current DFS block with zero bytes if padding is needed. */
  @Override
  public void alignForRowGroup(PositionOutputStream out) throws IOException {
    final long remaining = dfsBlockSize - (out.getPos() % dfsBlockSize);
    if (!isPaddingNeeded(remaining)) {
      return;
    }
    LOG.debug("Adding {} bytes of padding (row group size={}B, block size={}B)", remaining, rowGroupSize, dfsBlockSize);
    long left = remaining;
    while (left > 0) {
      // write a full buffer, or only what is left for the final piece
      int piece = (int) Math.min((long) zeros.length, left);
      out.write(zeros, 0, piece);
      left -= zeros.length;
    }
  }

  /**
   * Returns the configured row group size when padding is disabled or is about to be
   * written; otherwise the row group size capped to the bytes left in the DFS block.
   */
  @Override
  public long nextRowGroupSize(PositionOutputStream out) throws IOException {
    if (maxPaddingSize <= 0) {
      return rowGroupSize;
    }
    final long remaining = dfsBlockSize - (out.getPos() % dfsBlockSize);
    return isPaddingNeeded(remaining)
        ? rowGroupSize
        : Math.min(remaining, rowGroupSize);
  }

  /** Padding is needed when the remaining bytes do not exceed the maximum padding size. */
  protected boolean isPaddingNeeded(long remaining) {
    return !(remaining > maxPaddingSize);
  }
}
}