blob: 8bd1a8488553d3203dfe7db4073d422c6d3096b3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.io.storage.row.parquet;
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.SqlTimeTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.MapTypeInfo;
import org.apache.flink.api.java.typeutils.ObjectArrayTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.OriginalType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.List;
import static org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit;
/**
* Schema converter converts Parquet schema to and from Flink internal types.
*
* <p>Reference org.apache.flink.formats.parquet.utils.ParquetSchemaConverter to support timestamp of INT64 8 bytes.
*/
public class ParquetSchemaConverter {
  private static final Logger LOGGER = LoggerFactory.getLogger(ParquetSchemaConverter.class);

  // Field name used for the value entry when writing a Parquet MAP group (see convertField).
  public static final String MAP_VALUE = "value";
  // Name given to the repeated field of a LIST written in non-legacy mode (see convertField).
  public static final String LIST_ARRAY_TYPE = "array";
  // Field name of the element inside a LIST group.
  public static final String LIST_ELEMENT = "element";
  // Name of the middle repeated group of a LIST written in legacy mode.
  public static final String LIST_GROUP_NAME = "list";
  // Name of the root Parquet message type produced when no field name is given.
  public static final String MESSAGE_ROOT = "root";
  /**
   * Converts a Parquet schema to the Flink internal type system.
   *
   * @param type Parquet message (root) schema
   * @return Flink type information; in practice a {@code RowTypeInfo} built from the
   *     message's top-level fields (see {@link #convertFields(List)})
   */
  public static TypeInformation<?> fromParquetType(MessageType type) {
    return convertFields(type.getFields());
  }
  /**
   * Converts Flink internal type information to a Parquet schema.
   *
   * @param typeInformation Flink type information; expected to be a {@code RowTypeInfo}
   *     so that the result is a {@link MessageType} (field name {@code null} selects the
   *     message-root path in {@code convertField})
   * @param legacyMode whether to emit the standard LIST/MAP layout or the
   *     backward-compatible layout
   * @return Parquet message schema
   */
  public static MessageType toParquetType(
      TypeInformation<?> typeInformation, boolean legacyMode) {
    return (MessageType)
        convertField(null, typeInformation, Type.Repetition.OPTIONAL, legacyMode);
  }
public static TypeInformation<?> convertFields(List<Type> parquetFields) {
List<TypeInformation<?>> types = new ArrayList<>();
List<String> names = new ArrayList<>();
for (Type field : parquetFields) {
TypeInformation<?> subType = convertParquetTypeToTypeInfo(field);
if (subType != null) {
types.add(subType);
names.add(field.getName());
} else {
LOGGER.error(
"Parquet field {} in schema type {} can not be converted to Flink Internal Type",
field.getName(),
field.getOriginalType().name());
}
}
return new RowTypeInfo(
types.toArray(new TypeInformation<?>[0]), names.toArray(new String[0]));
}
public static TypeInformation<?> convertParquetTypeToTypeInfo(final Type fieldType) {
TypeInformation<?> typeInfo;
if (fieldType.isPrimitive()) {
OriginalType originalType = fieldType.getOriginalType();
PrimitiveType primitiveType = fieldType.asPrimitiveType();
switch (primitiveType.getPrimitiveTypeName()) {
case BINARY:
if (originalType != null) {
switch (originalType) {
case DECIMAL:
typeInfo = BasicTypeInfo.BIG_DEC_TYPE_INFO;
break;
case UTF8:
case ENUM:
case JSON:
case BSON:
typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
break;
default:
throw new UnsupportedOperationException(
"Unsupported original type : "
+ originalType.name()
+ " for primitive type BINARY");
}
} else {
typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
}
break;
case BOOLEAN:
typeInfo = BasicTypeInfo.BOOLEAN_TYPE_INFO;
break;
case INT32:
if (originalType != null) {
switch (originalType) {
case TIME_MICROS:
case TIME_MILLIS:
typeInfo = SqlTimeTypeInfo.TIME;
break;
case TIMESTAMP_MICROS:
case TIMESTAMP_MILLIS:
typeInfo = SqlTimeTypeInfo.TIMESTAMP;
break;
case DATE:
typeInfo = SqlTimeTypeInfo.DATE;
break;
case UINT_8:
case UINT_16:
case UINT_32:
typeInfo = BasicTypeInfo.INT_TYPE_INFO;
break;
case INT_8:
typeInfo = org.apache.flink.api.common.typeinfo.Types.BYTE;
break;
case INT_16:
typeInfo = org.apache.flink.api.common.typeinfo.Types.SHORT;
break;
case INT_32:
typeInfo = BasicTypeInfo.INT_TYPE_INFO;
break;
default:
throw new UnsupportedOperationException(
"Unsupported original type : "
+ originalType.name()
+ " for primitive type INT32");
}
} else {
typeInfo = BasicTypeInfo.INT_TYPE_INFO;
}
break;
case INT64:
if (originalType != null) {
switch (originalType) {
case TIME_MICROS:
typeInfo = SqlTimeTypeInfo.TIME;
break;
case TIMESTAMP_MICROS:
case TIMESTAMP_MILLIS:
typeInfo = SqlTimeTypeInfo.TIMESTAMP;
break;
case INT_64:
case DECIMAL:
typeInfo = BasicTypeInfo.LONG_TYPE_INFO;
break;
default:
throw new UnsupportedOperationException(
"Unsupported original type : "
+ originalType.name()
+ " for primitive type INT64");
}
} else {
typeInfo = BasicTypeInfo.LONG_TYPE_INFO;
}
break;
case INT96:
// It stores a timestamp type data, we read it as millisecond
typeInfo = SqlTimeTypeInfo.TIMESTAMP;
break;
case FLOAT:
typeInfo = BasicTypeInfo.FLOAT_TYPE_INFO;
break;
case DOUBLE:
typeInfo = BasicTypeInfo.DOUBLE_TYPE_INFO;
break;
case FIXED_LEN_BYTE_ARRAY:
if (originalType != null) {
switch (originalType) {
case DECIMAL:
typeInfo = BasicTypeInfo.BIG_DEC_TYPE_INFO;
break;
default:
throw new UnsupportedOperationException(
"Unsupported original type : "
+ originalType.name()
+ " for primitive type FIXED_LEN_BYTE_ARRAY");
}
} else {
typeInfo = BasicTypeInfo.BIG_DEC_TYPE_INFO;
}
break;
default:
throw new UnsupportedOperationException("Unsupported schema: " + fieldType);
}
} else {
GroupType parquetGroupType = fieldType.asGroupType();
OriginalType originalType = parquetGroupType.getOriginalType();
if (originalType != null) {
switch (originalType) {
case LIST:
if (parquetGroupType.getFieldCount() != 1) {
throw new UnsupportedOperationException(
"Invalid list type " + parquetGroupType);
}
Type repeatedType = parquetGroupType.getType(0);
if (!repeatedType.isRepetition(Type.Repetition.REPEATED)) {
throw new UnsupportedOperationException(
"Invalid list type " + parquetGroupType);
}
if (repeatedType.isPrimitive()) {
typeInfo = convertParquetPrimitiveListToFlinkArray(repeatedType);
} else {
// Backward-compatibility element group name can be any string
// (element/array/other)
GroupType elementType = repeatedType.asGroupType();
// If the repeated field is a group with multiple fields, then its type
// is the element
// type and elements are required.
if (elementType.getFieldCount() > 1) {
for (Type type : elementType.getFields()) {
if (!type.isRepetition(Type.Repetition.REQUIRED)) {
throw new UnsupportedOperationException(
String.format(
"List field [%s] in List [%s] has to be required. ",
type.toString(), fieldType.getName()));
}
}
typeInfo =
ObjectArrayTypeInfo.getInfoFor(
convertParquetTypeToTypeInfo(elementType));
} else {
Type internalType = elementType.getType(0);
if (internalType.isPrimitive()) {
typeInfo =
convertParquetPrimitiveListToFlinkArray(internalType);
} else {
// No need to do special process for group named array and tuple
GroupType tupleGroup = internalType.asGroupType();
if (tupleGroup.getFieldCount() == 1
&& tupleGroup
.getFields()
.get(0)
.isRepetition(Type.Repetition.REQUIRED)) {
typeInfo =
ObjectArrayTypeInfo.getInfoFor(
convertParquetTypeToTypeInfo(internalType));
} else {
throw new UnsupportedOperationException(
String.format(
"Unrecgonized List schema [%s] according to Parquet"
+ " standard",
parquetGroupType.toString()));
}
}
}
}
break;
case MAP_KEY_VALUE:
case MAP:
// The outer-most level must be a group annotated with MAP
// that contains a single field named key_value
if (parquetGroupType.getFieldCount() != 1
|| parquetGroupType.getType(0).isPrimitive()) {
throw new UnsupportedOperationException(
"Invalid map type " + parquetGroupType);
}
// The middle level must be a repeated group with a key field for map keys
// and, optionally, a value field for map values. But we can't enforce two
// strict condition here
// the schema generated by Parquet lib doesn't contain LogicalType
// ! mapKeyValType.getOriginalType().equals(OriginalType.MAP_KEY_VALUE)
GroupType mapKeyValType = parquetGroupType.getType(0).asGroupType();
if (!mapKeyValType.isRepetition(Type.Repetition.REPEATED)
|| mapKeyValType.getFieldCount() != 2) {
throw new UnsupportedOperationException(
"The middle level of Map should be single field named key_value. Invalid map type "
+ parquetGroupType);
}
Type keyType = mapKeyValType.getType(0);
// The key field encodes the map's key type. This field must have repetition
// required and
// must always be present.
if (!keyType.isPrimitive()
|| !keyType.isRepetition(Type.Repetition.REQUIRED)
|| !keyType.asPrimitiveType()
.getPrimitiveTypeName()
.equals(PrimitiveType.PrimitiveTypeName.BINARY)
|| !keyType.getOriginalType().equals(OriginalType.UTF8)) {
throw new IllegalArgumentException(
"Map key type must be required binary (UTF8): " + keyType);
}
Type valueType = mapKeyValType.getType(1);
return new MapTypeInfo<>(
BasicTypeInfo.STRING_TYPE_INFO,
convertParquetTypeToTypeInfo(valueType));
default:
throw new UnsupportedOperationException("Unsupported schema: " + fieldType);
}
} else {
// if no original type than it is a record
return convertFields(parquetGroupType.getFields());
}
}
return typeInfo;
}
private static TypeInformation<?> convertParquetPrimitiveListToFlinkArray(Type type) {
// Backward-compatibility element group doesn't exist also allowed
TypeInformation<?> flinkType = convertParquetTypeToTypeInfo(type);
if (flinkType.isBasicType()) {
return BasicArrayTypeInfo.getInfoFor(
Array.newInstance(flinkType.getTypeClass(), 0).getClass());
} else {
// flinkType here can be either SqlTimeTypeInfo or BasicTypeInfo.BIG_DEC_TYPE_INFO,
// So it should be converted to ObjectArrayTypeInfo
return ObjectArrayTypeInfo.getInfoFor(flinkType);
}
}
  /**
   * Converts a single Flink {@link TypeInformation} into a Parquet {@link Type}.
   *
   * @param fieldName name of the resulting Parquet field; {@code null} selects the
   *     message-root path, producing a {@link MessageType} named {@link #MESSAGE_ROOT}
   * @param typeInfo Flink type to convert
   * @param inheritRepetition repetition forced by the enclosing container, or {@code null}
   *     to default to OPTIONAL
   * @param legacyMode layout switch for LIST types (see the array branches below)
   * @return the converted Parquet type
   * @throws UnsupportedOperationException for maps with non-String keys and unknown
   *     SqlTimeTypeInfo variants
   */
  private static Type convertField(
      String fieldName,
      TypeInformation<?> typeInfo,
      Type.Repetition inheritRepetition,
      boolean legacyMode) {
    // NOTE(review): fieldType stays null (and is returned) if a BasicTypeInfo matches
    // none of the branches below — confirm all BasicTypeInfo constants are covered.
    Type fieldType = null;
    Type.Repetition repetition =
        inheritRepetition == null ? Type.Repetition.OPTIONAL : inheritRepetition;
    if (typeInfo instanceof BasicTypeInfo) {
      BasicTypeInfo basicTypeInfo = (BasicTypeInfo) typeInfo;
      if (basicTypeInfo.equals(BasicTypeInfo.BIG_DEC_TYPE_INFO)
          || basicTypeInfo.equals(BasicTypeInfo.BIG_INT_TYPE_INFO)) {
        // Both BigDecimal and BigInteger are stored as DECIMAL-annotated binary.
        fieldType =
            Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, repetition)
                .as(OriginalType.DECIMAL)
                .named(fieldName);
      } else if (basicTypeInfo.equals(BasicTypeInfo.INT_TYPE_INFO)) {
        fieldType =
            Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
                .as(OriginalType.INT_32)
                .named(fieldName);
      } else if (basicTypeInfo.equals(BasicTypeInfo.DOUBLE_TYPE_INFO)) {
        fieldType =
            Types.primitive(PrimitiveType.PrimitiveTypeName.DOUBLE, repetition)
                .named(fieldName);
      } else if (basicTypeInfo.equals(BasicTypeInfo.FLOAT_TYPE_INFO)) {
        fieldType =
            Types.primitive(PrimitiveType.PrimitiveTypeName.FLOAT, repetition)
                .named(fieldName);
      } else if (basicTypeInfo.equals(BasicTypeInfo.LONG_TYPE_INFO)) {
        fieldType =
            Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition)
                .as(OriginalType.INT_64)
                .named(fieldName);
      } else if (basicTypeInfo.equals(BasicTypeInfo.SHORT_TYPE_INFO)) {
        fieldType =
            Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
                .as(OriginalType.INT_16)
                .named(fieldName);
      } else if (basicTypeInfo.equals(BasicTypeInfo.BYTE_TYPE_INFO)) {
        fieldType =
            Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
                .as(OriginalType.INT_8)
                .named(fieldName);
      } else if (basicTypeInfo.equals(BasicTypeInfo.CHAR_TYPE_INFO)) {
        // A single char is written as a UTF8 string, same as CHAR below.
        fieldType =
            Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, repetition)
                .as(OriginalType.UTF8)
                .named(fieldName);
      } else if (basicTypeInfo.equals(BasicTypeInfo.BOOLEAN_TYPE_INFO)) {
        fieldType =
            Types.primitive(PrimitiveType.PrimitiveTypeName.BOOLEAN, repetition)
                .named(fieldName);
      } else if (basicTypeInfo.equals(BasicTypeInfo.DATE_TYPE_INFO)
          || basicTypeInfo.equals(BasicTypeInfo.STRING_TYPE_INFO)) {
        // NOTE(review): DATE_TYPE_INFO is serialized as a UTF8 string here, not as a
        // Parquet DATE — confirm this matches the reader's expectation.
        fieldType =
            Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, repetition)
                .as(OriginalType.UTF8)
                .named(fieldName);
      }
    } else if (typeInfo instanceof MapTypeInfo) {
      MapTypeInfo mapTypeInfo = (MapTypeInfo) typeInfo;
      // Only String keys are supported; values are always written OPTIONAL.
      if (mapTypeInfo.getKeyTypeInfo().equals(BasicTypeInfo.STRING_TYPE_INFO)) {
        fieldType =
            Types.map(repetition)
                .value(
                    convertField(
                        MAP_VALUE,
                        mapTypeInfo.getValueTypeInfo(),
                        Type.Repetition.OPTIONAL,
                        legacyMode))
                .named(fieldName);
      } else {
        throw new UnsupportedOperationException(
            String.format(
                "Can not convert Flink MapTypeInfo %s to Parquet"
                    + " Map type as key has to be String",
                typeInfo));
      }
    } else if (typeInfo instanceof ObjectArrayTypeInfo) {
      ObjectArrayTypeInfo objectArrayTypeInfo = (ObjectArrayTypeInfo) typeInfo;
      // Convert the component once, then re-wrap its fields in a repeated group named
      // "element" so the result matches the standard LIST layout.
      GroupType componentGroup =
          (GroupType)
              convertField(
                  LIST_ELEMENT,
                  objectArrayTypeInfo.getComponentInfo(),
                  Type.Repetition.REQUIRED,
                  legacyMode);
      GroupType elementGroup = Types.repeatedGroup().named(LIST_ELEMENT);
      elementGroup = elementGroup.withNewFields(componentGroup.getFields());
      fieldType =
          Types.buildGroup(repetition)
              .addField(elementGroup)
              .as(OriginalType.LIST)
              .named(fieldName);
    } else if (typeInfo instanceof BasicArrayTypeInfo) {
      BasicArrayTypeInfo basicArrayType = (BasicArrayTypeInfo) typeInfo;
      if (legacyMode) {
        // Add extra layer of Group according to Parquet's standard:
        // group <name> (LIST) { repeated group list { required <element-type> element; } }
        Type listGroup =
            Types.repeatedGroup()
                .addField(
                    convertField(
                        LIST_ELEMENT,
                        basicArrayType.getComponentInfo(),
                        Type.Repetition.REQUIRED,
                        legacyMode))
                .named(LIST_GROUP_NAME);
        fieldType =
            Types.buildGroup(repetition)
                .addField(listGroup)
                .as(OriginalType.LIST)
                .named(fieldName);
      } else {
        // 2-level layout: the repeated primitive itself (named "array") is the element.
        PrimitiveType primitiveTyp =
            convertField(
                    fieldName,
                    basicArrayType.getComponentInfo(),
                    Type.Repetition.REQUIRED,
                    legacyMode)
                .asPrimitiveType();
        fieldType =
            Types.buildGroup(repetition)
                .repeated(primitiveTyp.getPrimitiveTypeName())
                .as(primitiveTyp.getOriginalType())
                .named(LIST_ARRAY_TYPE)
                .as(OriginalType.LIST)
                .named(fieldName);
      }
    } else if (typeInfo instanceof SqlTimeTypeInfo) {
      if (typeInfo.equals(SqlTimeTypeInfo.DATE)) {
        fieldType =
            Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
                .as(OriginalType.DATE)
                .named(fieldName);
      } else if (typeInfo.equals(SqlTimeTypeInfo.TIME)) {
        fieldType =
            Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
                .as(OriginalType.TIME_MILLIS)
                .named(fieldName);
      } else if (typeInfo.equals(SqlTimeTypeInfo.TIMESTAMP)) {
        // Timestamps are written as INT64 millis (see class javadoc).
        fieldType =
            Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition)
                .as(OriginalType.TIMESTAMP_MILLIS)
                .named(fieldName);
      } else {
        throw new UnsupportedOperationException(
            "Unsupported SqlTimeTypeInfo " + typeInfo.toString());
      }
    } else {
      // Fallback: treat as a row. NOTE(review): a non-Row composite type would fail this
      // cast with a ClassCastException — confirm callers only pass the types above or rows.
      RowTypeInfo rowTypeInfo = (RowTypeInfo) typeInfo;
      List<Type> types = new ArrayList<>();
      String[] fieldNames = rowTypeInfo.getFieldNames();
      TypeInformation<?>[] fieldTypes = rowTypeInfo.getFieldTypes();
      for (int i = 0; i < rowTypeInfo.getArity(); i++) {
        types.add(convertField(fieldNames[i], fieldTypes[i], repetition, legacyMode));
      }
      if (fieldName == null) {
        // Top-level call: produce the message root.
        fieldType = new MessageType(MESSAGE_ROOT, types);
      } else {
        fieldType = new GroupType(repetition, fieldName, types);
      }
    }
    return fieldType;
  }
public static MessageType convertToParquetMessageType(String name, RowType rowType) {
Type[] types = new Type[rowType.getFieldCount()];
for (int i = 0; i < rowType.getFieldCount(); i++) {
String fieldName = rowType.getFieldNames().get(i);
LogicalType fieldType = rowType.getTypeAt(i);
types[i] = convertToParquetType(fieldName, fieldType, fieldType.isNullable() ? Type.Repetition.OPTIONAL : Type.Repetition.REQUIRED);
}
return new MessageType(name, types);
}
  /**
   * Converts a single Flink {@link LogicalType} into a Parquet {@link Type}.
   *
   * @param name name of the resulting Parquet field
   * @param type the Flink logical type
   * @param repetition repetition to apply to the resulting field
   * @return the converted Parquet type
   * @throws UnsupportedOperationException for logical types without a mapping
   */
  private static Type convertToParquetType(
      String name, LogicalType type, Type.Repetition repetition) {
    switch (type.getTypeRoot()) {
      case CHAR:
      case VARCHAR:
        return Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, repetition)
            .as(OriginalType.UTF8)
            .named(name);
      case BOOLEAN:
        return Types.primitive(PrimitiveType.PrimitiveTypeName.BOOLEAN, repetition)
            .named(name);
      case BINARY:
      case VARBINARY:
        return Types.primitive(PrimitiveType.PrimitiveTypeName.BINARY, repetition)
            .named(name);
      case DECIMAL:
        // Decimals use fixed-length byte arrays sized to the minimum bytes that can
        // hold the requested precision.
        int precision = ((DecimalType) type).getPrecision();
        int scale = ((DecimalType) type).getScale();
        int numBytes = computeMinBytesForDecimalPrecision(precision);
        return Types.primitive(
                PrimitiveType.PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, repetition)
            .as(LogicalTypeAnnotation.decimalType(scale, precision))
            .length(numBytes)
            .named(name);
      case TINYINT:
        return Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
            .as(LogicalTypeAnnotation.intType(8, true))
            .named(name);
      case SMALLINT:
        return Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
            .as(LogicalTypeAnnotation.intType(16, true))
            .named(name);
      case INTEGER:
        return Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
            .named(name);
      case BIGINT:
        return Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition)
            .named(name);
      case FLOAT:
        return Types.primitive(PrimitiveType.PrimitiveTypeName.FLOAT, repetition)
            .named(name);
      case DOUBLE:
        return Types.primitive(PrimitiveType.PrimitiveTypeName.DOUBLE, repetition)
            .named(name);
      case DATE:
        return Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
            .as(LogicalTypeAnnotation.dateType())
            .named(name);
      case TIME_WITHOUT_TIME_ZONE:
        return Types.primitive(PrimitiveType.PrimitiveTypeName.INT32, repetition)
            .as(LogicalTypeAnnotation.timeType(true, TimeUnit.MILLIS))
            .named(name);
      case TIMESTAMP_WITHOUT_TIME_ZONE:
        // Precision 3/6 fits INT64 (millis/micros); anything else falls back to INT96.
        TimestampType timestampType = (TimestampType) type;
        if (timestampType.getPrecision() == 3 || timestampType.getPrecision() == 6) {
          TimeUnit timeunit = timestampType.getPrecision() == 3 ? TimeUnit.MILLIS : TimeUnit.MICROS;
          return Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition)
              .as(LogicalTypeAnnotation.timestampType(true, timeunit))
              .named(name);
        } else {
          return Types.primitive(PrimitiveType.PrimitiveTypeName.INT96, repetition)
              .named(name);
        }
      case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
        // Same INT64/INT96 split as above, but annotated as not adjusted to UTC.
        LocalZonedTimestampType localZonedTimestampType = (LocalZonedTimestampType) type;
        if (localZonedTimestampType.getPrecision() == 3 || localZonedTimestampType.getPrecision() == 6) {
          TimeUnit timeunit = localZonedTimestampType.getPrecision() == 3 ? TimeUnit.MILLIS : TimeUnit.MICROS;
          return Types.primitive(PrimitiveType.PrimitiveTypeName.INT64, repetition)
              .as(LogicalTypeAnnotation.timestampType(false, timeunit))
              .named(name);
        } else {
          return Types.primitive(PrimitiveType.PrimitiveTypeName.INT96, repetition)
              .named(name);
        }
      case ARRAY:
        // <list-repetition> group <name> (LIST) {
        //   repeated group list {
        //     <element-repetition> <element-type> element;
        //   }
        // }
        // NOTE(review): the element reuses the array field's repetition rather than
        // deriving it from elementType.isNullable() — confirm this is intended.
        ArrayType arrayType = (ArrayType) type;
        LogicalType elementType = arrayType.getElementType();
        return Types
            .buildGroup(repetition).as(OriginalType.LIST)
            .addField(
                Types.repeatedGroup()
                    .addField(convertToParquetType("element", elementType, repetition))
                    .named("list"))
            .named(name);
      case MAP:
        // <map-repetition> group <name> (MAP) {
        //   repeated group key_value {
        //     required <key-type> key;
        //     <value-repetition> <value-type> value;
        //   }
        // }
        MapType mapType = (MapType) type;
        LogicalType keyType = mapType.getKeyType();
        LogicalType valueType = mapType.getValueType();
        return Types
            .buildGroup(repetition).as(OriginalType.MAP)
            .addField(
                Types
                    .repeatedGroup()
                    // Keys are always REQUIRED per the Parquet MAP spec.
                    .addField(convertToParquetType("key", keyType, Type.Repetition.REQUIRED))
                    .addField(convertToParquetType("value", valueType, repetition))
                    .named("key_value"))
            .named(name);
      case ROW:
        // Nested rows become plain (unannotated) groups; each sub-field inherits the
        // parent's repetition.
        RowType rowType = (RowType) type;
        Types.GroupBuilder<GroupType> builder = Types.buildGroup(repetition);
        rowType.getFields().forEach(field -> builder.addField(convertToParquetType(field.getName(), field.getType(), repetition)));
        return builder.named(name);
      default:
        throw new UnsupportedOperationException("Unsupported type: " + type);
    }
  }
public static int computeMinBytesForDecimalPrecision(int precision) {
int numBytes = 1;
while (Math.pow(2.0, 8 * numBytes - 1) < Math.pow(10.0, precision)) {
numBytes += 1;
}
return numBytes;
}
}