/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.parquet;
import static java.lang.Math.ceil;
import static java.lang.Math.max;
import static java.lang.Math.min;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.stream.Collectors;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.common.util.DrillVersionInfo;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.store.StorageStrategy;
import org.apache.drill.exec.store.parquet.compression.DrillCompressionCodecFactory;
import org.apache.drill.exec.planner.physical.WriterPrel;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.store.EventBasedRecordWriter;
import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
import org.apache.drill.exec.store.ParquetOutputRecordWriter;
import org.apache.drill.exec.util.DecimalUtility;
import org.apache.drill.exec.vector.BitVector;
import org.apache.drill.exec.vector.complex.BaseRepeatedValueVector;
import org.apache.drill.exec.vector.complex.reader.FieldReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.ColumnWriteStore;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.ParquetProperties.WriterVersion;
import org.apache.parquet.column.impl.ColumnWriteStoreV1;
import org.apache.parquet.column.impl.ColumnWriteStoreV2;
import org.apache.parquet.column.values.factory.DefaultV1ValuesWriterFactory;
import org.apache.parquet.column.values.factory.DefaultV2ValuesWriterFactory;
import org.apache.parquet.column.values.factory.ValuesWriterFactory;
import org.apache.parquet.compression.CompressionCodecFactory;
import org.apache.parquet.hadoop.ParquetColumnChunkPageWriteStore;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.io.ColumnIOFactory;
import org.apache.parquet.io.MessageColumnIO;
import org.apache.parquet.io.api.RecordConsumer;
import org.apache.parquet.schema.DecimalMetadata;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Type.Repetition;
import org.apache.parquet.schema.Types.ListBuilder;
public class ParquetRecordWriter extends ParquetOutputRecordWriter {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetRecordWriter.class);
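  // Tuning constants used below:
  // - MINIMUM_BUFFER_SIZE is the floor for the per-column block buffer computed in newSchema().
  // - MINIMUM/MAXIMUM_RECORD_COUNT_FOR_CHECK bound how often checkBlockSizeReached() polls the
  //   buffered size, since that check is relatively expensive.
  // - BLOCKSIZE_MULTIPLE is the 64 KiB granularity the block size is rounded up to when
  //   useSingleFSBlock is enabled.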
private static final int MINIMUM_BUFFER_SIZE = 64 * 1024;
private static final int MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
private static final int MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
private static final int BLOCKSIZE_MULTIPLE = 64 * 1024;
/**
* Name of nested group for Parquet's {@code MAP} type.
* @see <a href="https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#maps">MAP logical type</a>
*/
private static final String GROUP_KEY_VALUE_NAME = "key_value";
public static final String DRILL_VERSION_PROPERTY = "drill.version";
public static final String WRITER_VERSION_PROPERTY = "drill-writer.version";
private final StorageStrategy storageStrategy;
private ParquetFileWriter parquetFileWriter;
private MessageType schema;
private Map<String, String> extraMetaData = new HashMap<>();
private int blockSize;
private int pageSize;
private int dictionaryPageSize;
private boolean enableDictionary = false;
private boolean useSingleFSBlock = false;
private CompressionCodecName codec = CompressionCodecName.SNAPPY;
private WriterVersion writerVersion = WriterVersion.PARQUET_1_0;
private CompressionCodecFactory codecFactory;
private long recordCount = 0;
private long recordCountForNextMemCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
private ColumnWriteStore store;
private ParquetColumnChunkPageWriteStore pageStore;
private RecordConsumer consumer;
private BatchSchema batchSchema;
private Configuration conf;
private FileSystem fs;
private String location;
private List<Path> cleanUpLocations;
private String prefix;
private int index = 0;
private OperatorContext oContext;
private List<String> partitionColumns;
private boolean hasPartitions;
private PrimitiveTypeName logicalTypeForDecimals;
private boolean usePrimitiveTypesForDecimals;
  /** Used to ensure that an empty Parquet file is written if no rows were provided. */
private boolean empty = true;
public ParquetRecordWriter(FragmentContext context, ParquetWriter writer) throws OutOfMemoryException {
this.oContext = context.newOperatorContext(writer);
this.codecFactory = DrillCompressionCodecFactory.createDirectCodecFactory(
writer.getFormatPlugin().getFsConf(),
new ParquetDirectByteBufferAllocator(oContext.getAllocator()),
pageSize
);
this.partitionColumns = writer.getPartitionColumns();
this.hasPartitions = partitionColumns != null && partitionColumns.size() > 0;
this.extraMetaData.put(DRILL_VERSION_PROPERTY, DrillVersionInfo.getVersion());
this.extraMetaData.put(WRITER_VERSION_PROPERTY, String.valueOf(ParquetWriter.WRITER_VERSION));
this.storageStrategy = writer.getStorageStrategy() == null ? StorageStrategy.DEFAULT : writer.getStorageStrategy();
this.cleanUpLocations = new ArrayList<>();
this.conf = new Configuration(writer.getFormatPlugin().getFsConf());
}
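  /**
   * Initializes the writer from the supplied writer options. Besides the target {@code location}
   * and file name {@code prefix}, the options carry the settings read below: block size, page
   * size, dictionary page size, compression codec, logical type for decimals, dictionary
   * encoding, single-FS-block mode, primitive decimal types and the Parquet writer version.
   */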
@Override
public void init(Map<String, String> writerOptions) throws IOException {
this.location = writerOptions.get("location");
this.prefix = writerOptions.get("prefix");
fs = FileSystem.get(conf);
blockSize = Integer.parseInt(writerOptions.get(ExecConstants.PARQUET_BLOCK_SIZE));
pageSize = Integer.parseInt(writerOptions.get(ExecConstants.PARQUET_PAGE_SIZE));
    dictionaryPageSize = Integer.parseInt(writerOptions.get(ExecConstants.PARQUET_DICT_PAGE_SIZE));
String codecName = writerOptions.get(ExecConstants.PARQUET_WRITER_COMPRESSION_TYPE).toLowerCase();
    switch (codecName) {
case "none":
case "uncompressed":
codec = CompressionCodecName.UNCOMPRESSED;
break;
case "brotli":
codec = CompressionCodecName.BROTLI;
break;
case "gzip":
codec = CompressionCodecName.GZIP;
break;
case "lz4":
codec = CompressionCodecName.LZ4;
break;
case "lzo":
codec = CompressionCodecName.LZO;
break;
case "snappy":
codec = CompressionCodecName.SNAPPY;
break;
case "zstd":
codec = CompressionCodecName.ZSTD;
break;
default:
throw new UnsupportedOperationException(String.format("Unknown compression type: %s", codecName));
}
String logicalTypeNameForDecimals = writerOptions.get(ExecConstants.PARQUET_WRITER_LOGICAL_TYPE_FOR_DECIMALS).toLowerCase();
switch (logicalTypeNameForDecimals) {
case "fixed_len_byte_array":
logicalTypeForDecimals = PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
break;
case "binary":
logicalTypeForDecimals = PrimitiveTypeName.BINARY;
break;
default:
        throw new UnsupportedOperationException(
            String.format(
                "Unsupported logical type for decimals: %s\n" +
                "Supported types: ['fixed_len_byte_array', 'binary']", logicalTypeNameForDecimals));
}
enableDictionary = Boolean.parseBoolean(writerOptions.get(ExecConstants.PARQUET_WRITER_ENABLE_DICTIONARY_ENCODING));
useSingleFSBlock = Boolean.parseBoolean(writerOptions.get(ExecConstants.PARQUET_WRITER_USE_SINGLE_FS_BLOCK));
usePrimitiveTypesForDecimals = Boolean.parseBoolean(writerOptions.get(ExecConstants.PARQUET_WRITER_USE_PRIMITIVE_TYPES_FOR_DECIMALS));
writerVersion = WriterVersion.fromString(
writerOptions.get(ExecConstants.PARQUET_WRITER_FORMAT_VERSION)
);
if (useSingleFSBlock) {
// Round up blockSize to multiple of 64K.
blockSize = (int)ceil((double)blockSize/BLOCKSIZE_MULTIPLE) * BLOCKSIZE_MULTIPLE;
}
}
private boolean containsComplexVectors(BatchSchema schema) {
for (MaterializedField field : schema) {
MinorType type = field.getType().getMinorType();
switch (type) {
case MAP:
case DICT:
case LIST:
return true;
default:
}
}
return false;
}
@Override
public void updateSchema(VectorAccessible batch) throws IOException {
if (this.batchSchema == null || !this.batchSchema.equals(batch.getSchema()) || containsComplexVectors(this.batchSchema)) {
if (this.batchSchema != null) {
flush(false);
}
this.batchSchema = batch.getSchema();
newSchema();
}
TypedFieldId fieldId = batch.getValueVectorId(SchemaPath.getSimplePath(WriterPrel.PARTITION_COMPARATOR_FIELD));
if (fieldId != null) {
VectorWrapper w = batch.getValueAccessorById(BitVector.class, fieldId.getFieldIds());
setPartitionVector((BitVector) w.getValueVector());
}
}
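  /**
   * Builds the Parquet {@link MessageType} from the current batch schema (skipping unsupported
   * fields) and recreates the page write store, column write store and record consumer that
   * buffer data for the next output file. Called for the first batch, on schema changes and
   * after each flush that rolls over to a new file.
   */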
private void newSchema() {
List<Type> types = new ArrayList<>();
for (MaterializedField field : batchSchema) {
pruneUnsupported(field);
if (!supportsField(field)) {
continue;
}
types.add(getType(field));
}
schema = new MessageType("root", types);
    // We don't want this number to be too small; ideally the block is divided equally across the
    // columns, although in practice the columns are unlikely to be the same size. The resulting
    // buffer size stays within int range because the underlying layer allocates plain byte arrays.
int initialBlockBufferSize = this.schema.getColumns().size() > 0 ?
max(MINIMUM_BUFFER_SIZE, blockSize / this.schema.getColumns().size() / 5) : MINIMUM_BUFFER_SIZE;
    // We don't want this number to be too small either. Ideally it is slightly bigger than the
    // page size, but not bigger than the block buffer.
int initialPageBufferSize = max(MINIMUM_BUFFER_SIZE, min(pageSize + pageSize / 10, initialBlockBufferSize));
ValuesWriterFactory valWriterFactory = writerVersion == WriterVersion.PARQUET_1_0
? new DefaultV1ValuesWriterFactory()
: new DefaultV2ValuesWriterFactory();
ParquetProperties parquetProperties = ParquetProperties.builder()
.withPageSize(pageSize)
.withDictionaryEncoding(enableDictionary)
.withDictionaryPageSize(initialPageBufferSize)
.withAllocator(new ParquetDirectByteBufferAllocator(oContext))
.withValuesWriterFactory(valWriterFactory)
.withWriterVersion(writerVersion)
.build();
    // TODO: Replace ParquetColumnChunkPageWriteStore with ColumnChunkPageWriteStore from the parquet library
    // once DRILL-7906 (PARQUET-1006) is resolved
pageStore = new ParquetColumnChunkPageWriteStore(codecFactory.getCompressor(codec), schema,
parquetProperties.getInitialSlabSize(), pageSize, parquetProperties.getAllocator(),
parquetProperties.getColumnIndexTruncateLength(), parquetProperties.getPageWriteChecksumEnabled());
store = writerVersion == WriterVersion.PARQUET_1_0
? new ColumnWriteStoreV1(schema, pageStore, parquetProperties)
: new ColumnWriteStoreV2(schema, pageStore, parquetProperties);
MessageColumnIO columnIO = new ColumnIOFactory(false).getColumnIO(this.schema);
consumer = columnIO.getRecordWriter(store);
setUp(schema, consumer);
}
  /**
   * Recursively prunes childless MAP fields from the field tree, proceeding depth
   * first so that fields rendered childless by the removal of their descendants
   * are eventually removed as well.
   *
   * @param field a top-level field
   */
private void pruneUnsupported(MaterializedField field) {
for (MaterializedField child: field.getChildren()) {
pruneUnsupported(child);
if (!supportsField(child)) {
field.removeChild(child);
}
}
}
@Override
public boolean supportsField(MaterializedField field) {
return super.supportsField(field)
&& (field.getType().getMinorType() != MinorType.MAP || field.getChildCount() > 0);
}
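  /**
   * Maps a Drill primitive field to a Parquet primitive type. For decimals the physical type is
   * the configured {@code logicalTypeForDecimals} (FIXED_LEN_BYTE_ARRAY or BINARY); when
   * {@code usePrimitiveTypesForDecimals} is enabled, small precisions fall back to INT32 or
   * INT64 instead (per the Parquet spec the cut-offs are typically precision 9 and 18, as
   * reported by {@link ParquetTypeHelper#getMaxPrecisionForPrimitiveType}).
   */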
@Override
protected PrimitiveType getPrimitiveType(MaterializedField field) {
MinorType minorType = field.getType().getMinorType();
String name = field.getName();
int length = ParquetTypeHelper.getLengthForMinorType(minorType);
PrimitiveTypeName primitiveTypeName = ParquetTypeHelper.getPrimitiveTypeNameForMinorType(minorType);
if (Types.isDecimalType(minorType)) {
primitiveTypeName = logicalTypeForDecimals;
if (usePrimitiveTypesForDecimals) {
if (field.getPrecision() <= ParquetTypeHelper.getMaxPrecisionForPrimitiveType(PrimitiveTypeName.INT32)) {
primitiveTypeName = PrimitiveTypeName.INT32;
} else if (field.getPrecision() <= ParquetTypeHelper.getMaxPrecisionForPrimitiveType(PrimitiveTypeName.INT64)) {
primitiveTypeName = PrimitiveTypeName.INT64;
}
}
length = DecimalUtility.getMaxBytesSizeForPrecision(field.getPrecision());
}
Repetition repetition = ParquetTypeHelper.getRepetitionForDataMode(field.getDataMode());
OriginalType originalType = ParquetTypeHelper.getOriginalTypeForMinorType(minorType);
DecimalMetadata decimalMetadata = ParquetTypeHelper.getDecimalMetadataForField(field);
return new PrimitiveType(repetition, primitiveTypeName, length, name, originalType, decimalMetadata, null);
}
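  /**
   * Converts a Drill field to its Parquet type, handling the complex cases (MAP, DICT, LIST,
   * NULL) explicitly and delegating everything else to {@link #getPrimitiveType}. As an
   * illustration only (the actual key and value types depend on the Drill field's children),
   * a non-repeated DICT roughly becomes:
   * <pre>
   *   optional group dict_field (MAP) {
   *     repeated group key_value {
   *       required int32 key;
   *       optional binary value (UTF8);
   *     }
   *   }
   * </pre>
   */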
private Type getType(MaterializedField field) {
MinorType minorType = field.getType().getMinorType();
DataMode dataMode = field.getType().getMode();
switch (minorType) {
case MAP:
List<Type> types = getChildrenTypes(field);
return new GroupType(dataMode == DataMode.REPEATED ? Repetition.REPEATED : Repetition.OPTIONAL, field.getName(), types);
case DICT:
        // RepeatedDictVector stores a DictVector as its data vector, so in the REPEATED case we
        // take the first child in order to access the map's key and value fields
MaterializedField dictField = dataMode != DataMode.REPEATED
? field : ((List<MaterializedField>) field.getChildren()).get(0);
List<Type> keyValueTypes = getChildrenTypes(dictField);
GroupType keyValueGroup = new GroupType(Repetition.REPEATED, GROUP_KEY_VALUE_NAME, keyValueTypes);
if (dataMode == DataMode.REPEATED) {
// Parquet's MAP repetition must be either optional or required, so nest it inside Parquet's LIST type
GroupType elementType = org.apache.parquet.schema.Types.buildGroup(Repetition.OPTIONAL)
.as(OriginalType.MAP)
.addField(keyValueGroup)
.named(LIST);
GroupType listGroup = new GroupType(Repetition.REPEATED, LIST, elementType);
return org.apache.parquet.schema.Types.buildGroup(Repetition.OPTIONAL)
.as(OriginalType.LIST)
.addField(listGroup)
.named(field.getName());
} else {
return org.apache.parquet.schema.Types.buildGroup(Repetition.OPTIONAL)
.as(OriginalType.MAP)
.addField(keyValueGroup)
.named(field.getName());
}
case LIST:
MaterializedField elementField = getDataField(field);
ListBuilder<GroupType> listBuilder = org.apache.parquet.schema.Types
.list(dataMode == DataMode.OPTIONAL ? Repetition.OPTIONAL : Repetition.REQUIRED);
addElementType(listBuilder, elementField);
GroupType listType = listBuilder.named(field.getName());
return listType;
case NULL:
MaterializedField newField = field.withType(
TypeProtos.MajorType.newBuilder().setMinorType(MinorType.INT).setMode(DataMode.OPTIONAL).build());
return getPrimitiveType(newField);
default:
return getPrimitiveType(field);
}
}
  /**
   * Converts the child fields of a map field into their Parquet types.
   *
   * @param field map field whose children are converted
   * @return converted child field types
   */
private List<Type> getChildrenTypes(MaterializedField field) {
return field.getChildren().stream()
.map(this::getType)
.collect(Collectors.toList());
}
/**
   * For a list or repeated type, the possible child fields are {@link BaseRepeatedValueVector#DATA_VECTOR_NAME}
   * and {@link BaseRepeatedValueVector#OFFSETS_VECTOR_NAME}. This method finds the data field.
*
* @param field parent repeated field
* @return child data field
*/
private MaterializedField getDataField(MaterializedField field) {
return field.getChildren().stream()
.filter(child -> BaseRepeatedValueVector.DATA_VECTOR_NAME.equals(child.getName()))
.findAny()
.orElseThrow(() -> new NoSuchElementException(String.format(
"Failed to get elementField '%s' from list: %s",
BaseRepeatedValueVector.DATA_VECTOR_NAME, field.getChildren())));
}
/**
* Adds element type to {@code listBuilder} based on Drill's
* {@code elementField}.
*
* @param listBuilder list schema builder
* @param elementField Drill's type of list elements
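   *
   * <p>As an illustrative sketch only (the element type is chosen arbitrarily), a required list
   * of nullable INT32 elements ends up in the standard three-level Parquet form:
   * <pre>
   *   required group list_field (LIST) {
   *     repeated group list {
   *       optional int32 element;
   *     }
   *   }
   * </pre>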
*/
private void addElementType(ListBuilder<GroupType> listBuilder, MaterializedField elementField) {
if (elementField.getDataMode() == DataMode.REPEATED) {
ListBuilder<GroupType> inner = org.apache.parquet.schema.Types.requiredList();
if (elementField.getType().getMinorType() == MinorType.MAP) {
GroupType mapGroupType = new GroupType(Repetition.REQUIRED, ELEMENT, getChildrenTypes(elementField));
inner.element(mapGroupType);
} else {
MaterializedField child2 = getDataField(elementField);
addElementType(inner, child2);
}
listBuilder.setElementType(inner.named(ELEMENT));
} else {
Type element = getType(elementField);
      // The element may carry the internal name '$data$'; rename it to 'element'
      // to conform to the Parquet list schema
if (element.isPrimitive()) {
PrimitiveType primitiveElement = element.asPrimitiveType();
element = new PrimitiveType(
primitiveElement.getRepetition(),
primitiveElement.getPrimitiveTypeName(),
ELEMENT,
primitiveElement.getOriginalType()
);
} else {
GroupType groupElement = element.asGroupType();
element = new GroupType(groupElement.getRepetition(),
ELEMENT, groupElement.getFields());
}
listBuilder.element(element);
}
}
@Override
public void checkForNewPartition(int index) {
if (!hasPartitions) {
return;
}
try {
boolean newPartition = newPartition(index);
if (newPartition) {
flush(false);
newSchema();
}
} catch (Exception e) {
throw new DrillRuntimeException(e);
}
}
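  /**
   * Flushes buffered records to the current Parquet file. When {@code cleanUp} is true and no
   * records were ever written, an empty file is still produced for a non-empty schema. In all
   * cases the page and column write stores are released and the output file index is advanced.
   */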
private void flush(boolean cleanUp) throws IOException {
try {
if (recordCount > 0) {
flushParquetFileWriter();
} else if (cleanUp && empty && schema != null && schema.getFieldCount() > 0) {
        // Write an empty parquet file if:
        // 1) this is a cleanup - no additional records can be written
        // 2) no file has been written so far
        // 3) the schema is set
        // 4) the schema is not empty
createParquetFileWriter();
flushParquetFileWriter();
}
} finally {
AutoCloseables.closeSilently(pageStore);
      // ColumnWriteStore doesn't implement AutoCloseable, so it has to be closed
      // manually and safely here.
try {
store.close();
} catch (Exception e) {
logger.warn("Error closing {}", store, e);
}
codecFactory.release();
store = null;
pageStore = null;
index++;
}
}
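  /**
   * Rolls over to a new file once the buffered size exceeds the configured block size. Polling
   * the buffered size is relatively expensive, so the next check is scheduled from the observed
   * bytes per record. A purely illustrative run of the numbers (not taken from real defaults):
   * with a 128 MiB block and 10,000 buffered records occupying 1 MB, the record size is about
   * 100 bytes, the projected halfway point is about 676,000 records, and the cap of
   * recordCount + MAXIMUM_RECORD_COUNT_FOR_CHECK wins, so the next check happens at 20,000 records.
   */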
private void checkBlockSizeReached() throws IOException {
if (recordCount >= recordCountForNextMemCheck) { // checking the memory size is relatively expensive, so let's not do it for every record.
long memSize = store.getBufferedSize();
if (memSize > blockSize) {
logger.debug("Reached block size " + blockSize);
flush(false);
newSchema();
recordCountForNextMemCheck = min(max(MINIMUM_RECORD_COUNT_FOR_CHECK, recordCount / 2), MAXIMUM_RECORD_COUNT_FOR_CHECK);
} else {
float recordSize = (float) memSize / recordCount;
recordCountForNextMemCheck = min(
max(MINIMUM_RECORD_COUNT_FOR_CHECK, (recordCount + (long)(blockSize / recordSize)) / 2), // will check halfway
recordCount + MAXIMUM_RECORD_COUNT_FOR_CHECK // will not look more than max records ahead
);
}
}
}
@Override
public FieldConverter getNewMapConverter(int fieldId, String fieldName, FieldReader reader) {
return new MapParquetConverter(fieldId, fieldName, reader);
}
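  /**
   * Writes a Drill MAP as a Parquet group by driving the {@link RecordConsumer} protocol: the
   * map field is opened with startField/startGroup, each child is written by its own nested
   * converter, and the group and field are closed again. Nothing is emitted for a map without
   * children.
   */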
public class MapParquetConverter extends FieldConverter {
List<FieldConverter> converters = new ArrayList<>();
public MapParquetConverter(int fieldId, String fieldName, FieldReader reader) {
super(fieldId, fieldName, reader);
int i = 0;
for (String name : reader) {
FieldConverter converter = EventBasedRecordWriter.getConverter(ParquetRecordWriter.this, i++, name, reader.reader(name));
converters.add(converter);
}
}
@Override
public void writeField() throws IOException {
if (!converters.isEmpty()) {
consumer.startField(fieldName, fieldId);
consumer.startGroup();
for (FieldConverter converter : converters) {
converter.writeField();
}
consumer.endGroup();
consumer.endField(fieldName, fieldId);
}
}
}
@Override
public FieldConverter getNewRepeatedMapConverter(int fieldId, String fieldName, FieldReader reader) {
return new RepeatedMapParquetConverter(fieldId, fieldName, reader);
}
public class RepeatedMapParquetConverter extends FieldConverter {
List<FieldConverter> converters = new ArrayList<>();
public RepeatedMapParquetConverter(int fieldId, String fieldName, FieldReader reader) {
super(fieldId, fieldName, reader);
int i = 0;
for (String name : reader) {
FieldConverter converter = EventBasedRecordWriter.getConverter(ParquetRecordWriter.this, i++, name, reader.reader(name));
converters.add(converter);
}
}
@Override
public void writeField() throws IOException {
if (reader.size() == 0) {
return;
}
consumer.startField(fieldName, fieldId);
while (reader.next()) {
consumer.startGroup();
for (FieldConverter converter : converters) {
converter.writeField();
}
consumer.endGroup();
}
consumer.endField(fieldName, fieldId);
}
@Override
public void writeListField() throws IOException {
if (reader.size() == 0) {
return;
}
consumer.startField(LIST, ZERO_IDX);
while (reader.next()) {
consumer.startGroup();
consumer.startField(ELEMENT, ZERO_IDX);
consumer.startGroup();
for (FieldConverter converter : converters) {
converter.writeField();
}
consumer.endGroup();
consumer.endField(ELEMENT, ZERO_IDX);
consumer.endGroup();
}
consumer.endField(LIST, ZERO_IDX);
}
}
@Override
public FieldConverter getNewRepeatedListConverter(int fieldId, String fieldName, FieldReader reader) {
return new RepeatedListParquetConverter(fieldId, fieldName, reader);
}
public class RepeatedListParquetConverter extends FieldConverter {
private final FieldConverter converter;
RepeatedListParquetConverter(int fieldId, String fieldName, FieldReader reader) {
super(fieldId, fieldName, reader);
converter = EventBasedRecordWriter.getConverter(ParquetRecordWriter.this, 0, "", reader.reader());
}
@Override
public void writeField() throws IOException {
consumer.startField(fieldName, fieldId);
consumer.startField(LIST, ZERO_IDX);
while (reader.next()) {
consumer.startGroup();
consumer.startField(ELEMENT, ZERO_IDX);
converter.writeListField();
consumer.endField(ELEMENT, ZERO_IDX);
consumer.endGroup();
}
consumer.endField(LIST, ZERO_IDX);
consumer.endField(fieldName, fieldId);
}
}
@Override
public FieldConverter getNewDictConverter(int fieldId, String fieldName, FieldReader reader) {
return new DictParquetConverter(fieldId, fieldName, reader);
}
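  /**
   * Writes a Drill DICT as a Parquet MAP: the field is opened as a single group, the repeated
   * {@code key_value} field is started, and each map entry is emitted as its own group whose
   * key and value are written by the child converters.
   */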
public class DictParquetConverter extends FieldConverter {
List<FieldConverter> converters = new ArrayList<>();
public DictParquetConverter(int fieldId, String fieldName, FieldReader reader) {
super(fieldId, fieldName, reader);
int i = 0;
for (String name : reader) {
FieldConverter converter = EventBasedRecordWriter.getConverter(
ParquetRecordWriter.this, i++, name, reader.reader(name));
converters.add(converter);
}
}
@Override
public void writeField() throws IOException {
if (reader.size() == 0) {
return;
}
consumer.startField(fieldName, fieldId);
consumer.startGroup();
consumer.startField(GROUP_KEY_VALUE_NAME, 0);
while (reader.next()) {
consumer.startGroup();
for (FieldConverter converter : converters) {
converter.writeField();
}
consumer.endGroup();
}
consumer.endField(GROUP_KEY_VALUE_NAME, 0);
consumer.endGroup();
consumer.endField(fieldName, fieldId);
}
}
@Override
public FieldConverter getNewRepeatedDictConverter(int fieldId, String fieldName, FieldReader reader) {
return new RepeatedDictParquetConverter(fieldId, fieldName, reader);
}
public class RepeatedDictParquetConverter extends FieldConverter {
private final FieldConverter dictConverter;
public RepeatedDictParquetConverter(int fieldId, String fieldName, FieldReader reader) {
super(fieldId, fieldName, reader);
dictConverter = new DictParquetConverter(0, ELEMENT, reader.reader());
}
@Override
public void writeField() throws IOException {
if (reader.size() == 0) {
return;
}
consumer.startField(fieldName, fieldId);
consumer.startGroup();
consumer.startField(LIST, 0);
while (reader.next()) {
consumer.startGroup();
dictConverter.writeField();
consumer.endGroup();
}
consumer.endField(LIST, 0);
consumer.endGroup();
consumer.endField(fieldName, fieldId);
}
}
@Override
public void startRecord() throws IOException {
if (CollectionUtils.isEmpty(schema.getFields())) {
return;
}
consumer.startMessage();
}
@Override
public void endRecord() throws IOException {
if (CollectionUtils.isEmpty(schema.getFields())) {
return;
}
consumer.endMessage();
// we wait until there is at least one record before creating the parquet file
if (parquetFileWriter == null) {
createParquetFileWriter();
}
empty = false;
recordCount++;
checkBlockSizeReached();
}
@Override
public void abort() throws IOException {
List<String> errors = new ArrayList<>();
for (Path location : cleanUpLocations) {
try {
if (fs.exists(location)) {
fs.delete(location, true);
logger.info("Aborting writer. Location [{}] on file system [{}] is deleted.",
location.toUri().getPath(), fs.getUri());
}
} catch (IOException e) {
errors.add(location.toUri().getPath());
logger.error("Failed to delete location [{}] on file system [{}].",
location, fs.getUri(), e);
}
}
if (!errors.isEmpty()) {
throw new IOException(String.format("Failed to delete the following locations %s on file system [%s]" +
" during aborting writer", errors, fs.getUri()));
}
}
@Override
public void cleanup() throws IOException {
flush(true);
}
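  /**
   * Creates the next output file, named {@code <prefix>_<index>.parquet}; the index is advanced
   * by {@link #flush(boolean)}, so each flushed block lands in its own file.
   */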
private void createParquetFileWriter() throws IOException {
Path path = new Path(location, prefix + "_" + index + ".parquet");
    // To ensure that our writer is the first to create the output file, we create an empty file first and fail if the file already exists
Path firstCreatedPath = storageStrategy.createFileAndApply(fs, path);
    // Since the parquet writer supports partitions, several output files may be created.
    // If this writer was the one to create the table folder, we store only the folder and delete it with its content in case of abort;
    // if the table location existed before, we store only the files created by this writer and delete them in case of abort.
addCleanUpLocation(fs, firstCreatedPath);
    // Since ParquetFileWriter will overwrite the empty output file (append is not supported),
    // we need to re-apply the file permissions
if (useSingleFSBlock) {
// Passing blockSize creates files with this blockSize instead of filesystem default blockSize.
// Currently, this is supported only by filesystems included in
// BLOCK_FS_SCHEMES (ParquetFileWriter.java in parquet-mr), which includes HDFS.
// For other filesystems, it uses default blockSize configured for the file system.
parquetFileWriter = new ParquetFileWriter(conf, schema, path, ParquetFileWriter.Mode.OVERWRITE, blockSize, 0);
} else {
parquetFileWriter = new ParquetFileWriter(conf, schema, path, ParquetFileWriter.Mode.OVERWRITE);
}
storageStrategy.applyToFile(fs, path);
parquetFileWriter.start();
}
private void flushParquetFileWriter() throws IOException {
parquetFileWriter.startBlock(recordCount);
consumer.flush();
store.flush();
pageStore.flushToFileWriter(parquetFileWriter);
recordCount = 0;
parquetFileWriter.endBlock();
    // we write a single block per file
parquetFileWriter.end(extraMetaData);
parquetFileWriter = null;
}
  /**
   * Adds the passed location to the list of locations to be cleaned up in case of abort.
   * A location is added only if:
   * <ul>
   * <li>no locations have been added before, or</li>
   * <li>the first added location is a file.</li>
   * </ul>
   *
   * If the first added location is a folder, no further locations (which can only be files) are added,
   * since this writer was the one to create the main folder where the files are located;
   * on abort we delete this folder together with its content.
   *
   * If the first location is a file, further files are added, since this writer did not create the main folder
   * and on abort only the created files must be deleted, not the whole folder.
   *
   * @param fs file system where the location is created
   * @param location passed location
   * @throws IOException in case of errors while checking whether the passed location is a file
   */
private void addCleanUpLocation(FileSystem fs, Path location) throws IOException {
if (cleanUpLocations.isEmpty() || fs.isFile(cleanUpLocations.get(0))) {
cleanUpLocations.add(location);
}
}
}