/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.storage.thirdparty.parquet;
import static parquet.Log.DEBUG;
import static parquet.format.Util.writeFileMetaData;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import parquet.Log;
import parquet.Version;
import parquet.bytes.BytesInput;
import parquet.bytes.BytesUtils;
import parquet.column.ColumnDescriptor;
import parquet.column.page.DictionaryPage;
import parquet.column.statistics.Statistics;
import parquet.format.converter.ParquetMetadataConverter;
import parquet.hadoop.Footer;
import parquet.hadoop.metadata.BlockMetaData;
import parquet.hadoop.metadata.ColumnChunkMetaData;
import parquet.hadoop.metadata.ColumnPath;
import parquet.hadoop.metadata.CompressionCodecName;
import parquet.hadoop.metadata.FileMetaData;
import parquet.hadoop.metadata.GlobalMetaData;
import parquet.hadoop.metadata.ParquetMetadata;
import parquet.io.ParquetEncodingException;
import parquet.schema.MessageType;
import parquet.schema.PrimitiveType.PrimitiveTypeName;
/**
* Internal implementation of the Parquet file writer as a block container
*
* @author Julien Le Dem
*
*/
public class ParquetFileWriter {
private static final Log LOG = Log.getLog(ParquetFileWriter.class);
public static final String PARQUET_METADATA_FILE = "_metadata";
public static final byte[] MAGIC = "PAR1".getBytes(Charset.forName("ASCII"));
public static final int CURRENT_VERSION = 1;
private static final ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter();
private final MessageType schema;
private final FSDataOutputStream out;
private BlockMetaData currentBlock;
private ColumnChunkMetaData currentColumn;
private long currentRecordCount;
private List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
private long uncompressedLength;
private long compressedLength;
private Set<parquet.column.Encoding> currentEncodings;
private CompressionCodecName currentChunkCodec;
private ColumnPath currentChunkPath;
private PrimitiveTypeName currentChunkType;
private long currentChunkFirstDataPage;
private long currentChunkDictionaryPageOffset;
private long currentChunkValueCount;
private Statistics currentStatistics;
/**
* Captures the order in which methods should be called
*
* @author Julien Le Dem
*
*/
private enum STATE {
NOT_STARTED {
STATE start() {
return STARTED;
}
},
STARTED {
STATE startBlock() {
return BLOCK;
}
STATE end() {
return ENDED;
}
},
BLOCK {
STATE startColumn() {
return COLUMN;
}
STATE endBlock() {
return STARTED;
}
},
COLUMN {
STATE endColumn() {
return BLOCK;
}
STATE write() {
return this;
}
},
ENDED;
STATE start() throws IOException { return error(); }
STATE startBlock() throws IOException { return error(); }
STATE startColumn() throws IOException { return error(); }
STATE write() throws IOException { return error(); }
STATE endColumn() throws IOException { return error(); }
STATE endBlock() throws IOException { return error(); }
STATE end() throws IOException { return error(); }
private final STATE error() throws IOException {
throw new IOException("The file being written is in an invalid state. Probably caused by an error thrown previously. Current state: " + this.name());
}
}
private STATE state = STATE.NOT_STARTED;
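  // Illustrative call sequence accepted by the state machine above: a minimal
  // sketch, not taken from any caller in this codebase; the schema, descriptor,
  // page bytes, statistics and encodings are assumed to come from the
  // surrounding write path.
  //
  //   ParquetFileWriter w = new ParquetFileWriter(conf, schema, file);
  //   w.start();
  //   w.startBlock(recordCount);
  //   w.startColumn(descriptor, valueCount, codecName);
  //   w.writeDataPage(valueCount, uncompressedSize, bytes, stats,
  //       rlEncoding, dlEncoding, valuesEncoding);
  //   w.endColumn();
  //   w.endBlock();
  //   w.end(extraMetaData);
  //
  // Any call made out of order lands in error() and throws an IOException
  // naming the current state.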
  /**
   * Creates a ParquetFileWriter writing to the given file.
   * @param configuration the Hadoop configuration used to resolve the FileSystem
   * @param schema the schema of the data
   * @param file the file to write to
   * @throws IOException if the file can not be created
   */
public ParquetFileWriter(Configuration configuration, MessageType schema, Path file) throws IOException {
super();
this.schema = schema;
FileSystem fs = file.getFileSystem(configuration);
this.out = fs.create(file, false);
}
/**
* start the file
* @throws IOException
*/
public void start() throws IOException {
state = state.start();
if (DEBUG) LOG.debug(out.getPos() + ": start");
out.write(MAGIC);
}
/**
* start a block
* @param recordCount the record count in this block
* @throws IOException
*/
public void startBlock(long recordCount) throws IOException {
state = state.startBlock();
if (DEBUG) LOG.debug(out.getPos() + ": start block");
// out.write(MAGIC); // TODO: add a magic delimiter
currentBlock = new BlockMetaData();
currentRecordCount = recordCount;
}
  /**
   * start a column inside a block
   * @param descriptor the column descriptor
   * @param valueCount the value count in this column
   * @param compressionCodecName the codec used to compress this column's pages
   * @throws IOException
   */
public void startColumn(ColumnDescriptor descriptor,
long valueCount,
CompressionCodecName compressionCodecName) throws IOException {
state = state.startColumn();
if (DEBUG) LOG.debug(out.getPos() + ": start column: " + descriptor + " count=" + valueCount);
currentEncodings = new HashSet<parquet.column.Encoding>();
currentChunkPath = ColumnPath.get(descriptor.getPath());
currentChunkType = descriptor.getType();
currentChunkCodec = compressionCodecName;
currentChunkValueCount = valueCount;
currentChunkFirstDataPage = out.getPos();
compressedLength = 0;
uncompressedLength = 0;
// need to know what type of stats to initialize to
// better way to do this?
currentStatistics = Statistics.getStatsBasedOnType(currentChunkType);
}
  /**
   * writes a dictionary page
   * @param dictionaryPage the dictionary page
   * @throws IOException
   */
public void writeDictionaryPage(DictionaryPage dictionaryPage) throws IOException {
state = state.write();
if (DEBUG) LOG.debug(out.getPos() + ": write dictionary page: " + dictionaryPage.getDictionarySize() + " values");
currentChunkDictionaryPageOffset = out.getPos();
int uncompressedSize = dictionaryPage.getUncompressedSize();
int compressedPageSize = (int)dictionaryPage.getBytes().size(); // TODO: fix casts
metadataConverter.writeDictionaryPageHeader(
uncompressedSize,
compressedPageSize,
dictionaryPage.getDictionarySize(),
dictionaryPage.getEncoding(),
out);
long headerSize = out.getPos() - currentChunkDictionaryPageOffset;
this.uncompressedLength += uncompressedSize + headerSize;
this.compressedLength += compressedPageSize + headerSize;
if (DEBUG) LOG.debug(out.getPos() + ": write dictionary page content " + compressedPageSize);
dictionaryPage.getBytes().writeAllTo(out);
currentEncodings.add(dictionaryPage.getEncoding());
}
/**
* writes a single page
* @param valueCount count of values
* @param uncompressedPageSize the size of the data once uncompressed
* @param bytes the compressed data for the page without header
* @param rlEncoding encoding of the repetition level
* @param dlEncoding encoding of the definition level
* @param valuesEncoding encoding of values
*/
@Deprecated
public void writeDataPage(
int valueCount, int uncompressedPageSize,
BytesInput bytes,
parquet.column.Encoding rlEncoding,
parquet.column.Encoding dlEncoding,
parquet.column.Encoding valuesEncoding) throws IOException {
state = state.write();
long beforeHeader = out.getPos();
if (DEBUG) LOG.debug(beforeHeader + ": write data page: " + valueCount + " values");
int compressedPageSize = (int)bytes.size();
metadataConverter.writeDataPageHeader(
uncompressedPageSize, compressedPageSize,
valueCount,
rlEncoding,
dlEncoding,
valuesEncoding,
out);
long headerSize = out.getPos() - beforeHeader;
this.uncompressedLength += uncompressedPageSize + headerSize;
this.compressedLength += compressedPageSize + headerSize;
if (DEBUG) LOG.debug(out.getPos() + ": write data page content " + compressedPageSize);
bytes.writeAllTo(out);
currentEncodings.add(rlEncoding);
currentEncodings.add(dlEncoding);
currentEncodings.add(valuesEncoding);
}
  /**
   * writes a single page
   * @param valueCount count of values
   * @param uncompressedPageSize the size of the data once uncompressed
   * @param bytes the compressed data for the page without header
   * @param statistics the statistics of the page
   * @param rlEncoding encoding of the repetition level
   * @param dlEncoding encoding of the definition level
   * @param valuesEncoding encoding of values
   */
public void writeDataPage(
int valueCount, int uncompressedPageSize,
BytesInput bytes,
Statistics statistics,
parquet.column.Encoding rlEncoding,
parquet.column.Encoding dlEncoding,
parquet.column.Encoding valuesEncoding) throws IOException {
state = state.write();
long beforeHeader = out.getPos();
if (DEBUG) LOG.debug(beforeHeader + ": write data page: " + valueCount + " values");
int compressedPageSize = (int)bytes.size();
metadataConverter.writeDataPageHeader(
uncompressedPageSize, compressedPageSize,
valueCount,
statistics,
rlEncoding,
dlEncoding,
valuesEncoding,
out);
long headerSize = out.getPos() - beforeHeader;
this.uncompressedLength += uncompressedPageSize + headerSize;
this.compressedLength += compressedPageSize + headerSize;
if (DEBUG) LOG.debug(out.getPos() + ": write data page content " + compressedPageSize);
bytes.writeAllTo(out);
currentStatistics.mergeStatistics(statistics);
currentEncodings.add(rlEncoding);
currentEncodings.add(dlEncoding);
currentEncodings.add(valuesEncoding);
}
  /**
   * writes a number of pages at once
   * @param bytes bytes to be written including page headers
   * @param uncompressedTotalPageSize total uncompressed size (without page headers)
   * @param compressedTotalPageSize total compressed size (without page headers)
   * @param totalStats the merged statistics for all the pages being written
   * @param encodings the encodings used in these pages
   * @throws IOException
   */
void writeDataPages(BytesInput bytes,
long uncompressedTotalPageSize,
long compressedTotalPageSize,
Statistics totalStats,
List<parquet.column.Encoding> encodings) throws IOException {
state = state.write();
if (DEBUG) LOG.debug(out.getPos() + ": write data pages");
long headersSize = bytes.size() - compressedTotalPageSize;
this.uncompressedLength += uncompressedTotalPageSize + headersSize;
this.compressedLength += compressedTotalPageSize + headersSize;
if (DEBUG) LOG.debug(out.getPos() + ": write data pages content");
bytes.writeAllTo(out);
currentEncodings.addAll(encodings);
currentStatistics = totalStats;
}
/**
* end a column (once all rep, def and data have been written)
* @throws IOException
*/
public void endColumn() throws IOException {
state = state.endColumn();
if (DEBUG) LOG.debug(out.getPos() + ": end column");
currentBlock.addColumn(ColumnChunkMetaData.get(
currentChunkPath,
currentChunkType,
currentChunkCodec,
currentEncodings,
currentStatistics,
currentChunkFirstDataPage,
currentChunkDictionaryPageOffset,
currentChunkValueCount,
compressedLength,
uncompressedLength));
if (DEBUG) LOG.info("ended Column chumk: " + currentColumn);
currentColumn = null;
this.currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + uncompressedLength);
this.uncompressedLength = 0;
this.compressedLength = 0;
}
/**
* ends a block once all column chunks have been written
* @throws IOException
*/
public void endBlock() throws IOException {
state = state.endBlock();
if (DEBUG) LOG.debug(out.getPos() + ": end block");
currentBlock.setRowCount(currentRecordCount);
blocks.add(currentBlock);
currentBlock = null;
}
/**
* ends a file once all blocks have been written.
* closes the file.
* @param extraMetaData the extra meta data to write in the footer
* @throws IOException
*/
public void end(Map<String, String> extraMetaData) throws IOException {
state = state.end();
if (DEBUG) LOG.debug(out.getPos() + ": end");
ParquetMetadata footer = new ParquetMetadata(new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks);
serializeFooter(footer, out);
out.close();
}
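  // On-disk footer layout produced below, as defined by the Parquet format:
  // the Thrift-serialized FileMetaData, then its byte length as a 4-byte
  // little-endian int, then the same "PAR1" magic that opens the file.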
private static void serializeFooter(ParquetMetadata footer, FSDataOutputStream out) throws IOException {
long footerIndex = out.getPos();
parquet.format.FileMetaData parquetMetadata = new ParquetMetadataConverter().toParquetMetadata(CURRENT_VERSION, footer);
writeFileMetaData(parquetMetadata, out);
if (DEBUG) LOG.debug(out.getPos() + ": footer length = " + (out.getPos() - footerIndex));
BytesUtils.writeIntLittleEndian(out, (int)(out.getPos() - footerIndex));
out.write(MAGIC);
}
/**
* writes a _metadata file
* @param configuration the configuration to use to get the FileSystem
* @param outputPath the directory to write the _metadata file to
* @param footers the list of footers to merge
* @throws IOException
*/
public static void writeMetadataFile(Configuration configuration, Path outputPath, List<Footer> footers) throws IOException {
Path metaDataPath = new Path(outputPath, PARQUET_METADATA_FILE);
FileSystem fs = outputPath.getFileSystem(configuration);
outputPath = outputPath.makeQualified(fs);
FSDataOutputStream metadata = fs.create(metaDataPath);
metadata.write(MAGIC);
ParquetMetadata metadataFooter = mergeFooters(outputPath, footers);
serializeFooter(metadataFooter, metadata);
metadata.close();
}
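  // A minimal usage sketch for the summary file above (hypothetical names;
  // how the footers are obtained is up to the caller, typically by reading
  // the footers of the job's output files):
  //
  //   List<Footer> footers = ...; // one Footer per part file under outputPath
  //   ParquetFileWriter.writeMetadataFile(conf, outputPath, footers);
  //
  // The resulting "_metadata" file starts with the magic and carries the
  // merged footers, with each block's path rewritten relative to outputPath
  // (see mergeFooters below).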
private static ParquetMetadata mergeFooters(Path root, List<Footer> footers) {
String rootPath = root.toString();
GlobalMetaData fileMetaData = null;
List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
for (Footer footer : footers) {
String path = footer.getFile().toString();
if (!path.startsWith(rootPath)) {
throw new ParquetEncodingException(path + " invalid: all the files must be contained in the root " + root);
}
path = path.substring(rootPath.length());
while (path.startsWith("/")) {
path = path.substring(1);
}
fileMetaData = mergeInto(footer.getParquetMetadata().getFileMetaData(), fileMetaData);
for (BlockMetaData block : footer.getParquetMetadata().getBlocks()) {
block.setPath(path);
blocks.add(block);
}
}
return new ParquetMetadata(fileMetaData.merge(), blocks);
}
/**
* @return the current position in the underlying file
* @throws IOException
*/
public long getPos() throws IOException {
return out.getPos();
}
/**
* Will merge the metadata of all the footers together
   * @param footers the list of file footers to merge
* @return the global meta data for all the footers
*/
static GlobalMetaData getGlobalMetaData(List<Footer> footers) {
GlobalMetaData fileMetaData = null;
for (Footer footer : footers) {
ParquetMetadata currentMetadata = footer.getParquetMetadata();
fileMetaData = mergeInto(currentMetadata.getFileMetaData(), fileMetaData);
}
return fileMetaData;
}
/**
* Will return the result of merging toMerge into mergedMetadata
   * @param toMerge the metadata to merge
* @param mergedMetadata the reference metadata to merge into
* @return the result of the merge
*/
static GlobalMetaData mergeInto(
FileMetaData toMerge,
GlobalMetaData mergedMetadata) {
MessageType schema = null;
Map<String, Set<String>> newKeyValues = new HashMap<String, Set<String>>();
Set<String> createdBy = new HashSet<String>();
if (mergedMetadata != null) {
schema = mergedMetadata.getSchema();
newKeyValues.putAll(mergedMetadata.getKeyValueMetaData());
createdBy.addAll(mergedMetadata.getCreatedBy());
}
if ((schema == null && toMerge.getSchema() != null)
|| (schema != null && !schema.equals(toMerge.getSchema()))) {
schema = mergeInto(toMerge.getSchema(), schema);
}
for (Entry<String, String> entry : toMerge.getKeyValueMetaData().entrySet()) {
Set<String> values = newKeyValues.get(entry.getKey());
if (values == null) {
values = new HashSet<String>();
newKeyValues.put(entry.getKey(), values);
}
values.add(entry.getValue());
}
createdBy.add(toMerge.getCreatedBy());
return new GlobalMetaData(
schema,
newKeyValues,
createdBy);
}
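  // For example (hypothetical values): merging a FileMetaData whose key-value
  // metadata holds "writer" -> "a" into a GlobalMetaData already holding
  // "writer" -> {"b"} yields "writer" -> {"a", "b"}: every distinct value seen
  // for a key is retained rather than overwritten.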
/**
   * Will return the result of merging toMerge into mergedSchema
* @param toMerge the schema to merge into mergedSchema
* @param mergedSchema the schema to append the fields to
* @return the resulting schema
*/
static MessageType mergeInto(MessageType toMerge, MessageType mergedSchema) {
if (mergedSchema == null) {
return toMerge;
}
return mergedSchema.union(toMerge);
}
}