blob: 6003e19e9b82977b4350562a050a0cb90c989b46 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.avro;
import org.apache.avro.Conversions;
import org.apache.avro.JsonProperties;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Array;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificRecord;
import org.apache.avro.util.Utf8;
import org.apache.commons.compress.utils.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import org.apache.nifi.serialization.record.StandardSchemaIdentifier;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.ChoiceDataType;
import org.apache.nifi.serialization.record.type.DecimalDataType;
import org.apache.nifi.serialization.record.type.MapDataType;
import org.apache.nifi.serialization.record.type.RecordDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.sql.Blob;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
public class AvroTypeUtil {
// Class-wide logger used for conversion-failure diagnostics.
private static final Logger logger = LoggerFactory.getLogger(AvroTypeUtil.class);
// Schema-format identifier stored on a RecordSchema whose schema text is an Avro JSON schema.
public static final String AVRO_SCHEMA_FORMAT = "avro";
// Avro logical-type names (per the Avro specification's "Logical Types" section)
// that this utility recognizes when mapping to/from NiFi RecordFieldTypes.
private static final String LOGICAL_TYPE_DATE = "date";
private static final String LOGICAL_TYPE_TIME_MILLIS = "time-millis";
private static final String LOGICAL_TYPE_TIME_MICROS = "time-micros";
private static final String LOGICAL_TYPE_TIMESTAMP_MILLIS = "timestamp-millis";
private static final String LOGICAL_TYPE_TIMESTAMP_MICROS = "timestamp-micros";
private static final String LOGICAL_TYPE_DECIMAL = "decimal";
/**
 * Converts a NiFi RecordSchema into an Avro Schema. When the RecordSchema already carries
 * its textual form in Avro format, that text is parsed directly; otherwise an Avro schema
 * is synthesized from the record's fields.
 *
 * @param recordSchema the schema to convert; must not be null
 * @return the corresponding Avro Schema
 * @throws IllegalArgumentException if recordSchema is null
 */
public static Schema extractAvroSchema(final RecordSchema recordSchema) {
    if (recordSchema == null) {
        throw new IllegalArgumentException("RecordSchema cannot be null");
    }
    final boolean avroFormatted = recordSchema.getSchemaFormat()
        .map(AVRO_SCHEMA_FORMAT::equals)
        .orElse(false);
    if (avroFormatted) {
        final Optional<String> schemaText = recordSchema.getSchemaText();
        if (schemaText.isPresent()) {
            // The record carries its own Avro schema text; parse it verbatim.
            return new Schema.Parser().parse(schemaText.get());
        }
    }
    // No usable Avro text available: synthesize a schema from the record's fields.
    return buildAvroSchema(recordSchema);
}
/**
 * Synthesizes an Avro record schema named "nifiRecord" (namespace "org.apache.nifi")
 * from the fields of the given RecordSchema.
 */
private static Schema buildAvroSchema(final RecordSchema recordSchema) {
    final List<Field> convertedFields = new ArrayList<>(recordSchema.getFieldCount());
    recordSchema.getFields().forEach(recordField -> convertedFields.add(buildAvroField(recordField, "")));
    return Schema.createRecord("nifiRecord", null, "org.apache.nifi", false, convertedFields);
}
/**
 * Creates an Avro Field for the given RecordField. If the record field's name is not a legal
 * Avro field name, a sanitized name is published instead and the original name is preserved
 * as an Avro alias. All of the record field's own aliases are carried over as well.
 */
private static Field buildAvroField(final RecordField recordField, String fieldNamePrefix) {
    final String recordFieldName = recordField.getFieldName();
    final Schema schema = buildAvroSchema(recordField.getDataType(), recordFieldName, fieldNamePrefix, recordField.isNullable());
    final Field field;
    if (!isValidAvroFieldName(recordFieldName)) {
        // Avro would reject the original name; expose a sanitized name and keep the
        // original reachable via an alias.
        final String sanitizedName = createValidAvroFieldName(recordFieldName);
        field = new Field(sanitizedName, schema, null, recordField.getDefaultValue());
        field.addAlias(recordFieldName);
    } else {
        field = new Field(recordFieldName, schema, null, recordField.getDefaultValue());
    }
    recordField.getAliases().forEach(field::addAlias);
    return field;
}
/**
 * Reports whether the given name is a legal Avro field name: non-empty, beginning with a
 * letter or underscore, and consisting only of letters, digits, and underscores.
 */
private static boolean isValidAvroFieldName(final String fieldName) {
    if (fieldName.isEmpty()) {
        return false;
    }
    final char first = fieldName.charAt(0);
    if (first != '_' && !Character.isLetter(first)) {
        return false;
    }
    // Every remaining character must be a letter, digit, or underscore.
    return fieldName.chars()
        .skip(1)
        .allMatch(c -> c == '_' || Character.isLetterOrDigit(c));
}
/**
 * Produces a legal Avro field name from an arbitrary name: an empty name becomes
 * "UNNAMED_FIELD"; an illegal first character becomes '_'; any other illegal character
 * is replaced with '_'. Legal characters are kept as-is.
 */
private static String createValidAvroFieldName(final String fieldName) {
    if (fieldName.isEmpty()) {
        return "UNNAMED_FIELD";
    }
    final StringBuilder sanitized = new StringBuilder(fieldName.length());
    for (int i = 0; i < fieldName.length(); i++) {
        final char c = fieldName.charAt(i);
        // The first character may not be a digit; subsequent ones may.
        final boolean legal = (i == 0)
            ? (c == '_' || Character.isLetter(c))
            : (c == '_' || Character.isLetterOrDigit(c));
        sanitized.append(legal ? c : '_');
    }
    return sanitized.toString();
}
/**
 * Builds an Avro Schema for a single NiFi DataType.
 *
 * fieldName/fieldNamePrefix are used to derive unique names for nested RECORD schemas
 * (Avro requires record names to be unique within a schema). When nullable is true the
 * resulting schema is wrapped in a union with NULL via nullable(Schema).
 * Returns null for an unrecognized field type.
 */
private static Schema buildAvroSchema(final DataType dataType, final String fieldName, String fieldNamePrefix, final boolean nullable) {
final Schema schema;
switch (dataType.getFieldType()) {
case ARRAY:
final ArrayDataType arrayDataType = (ArrayDataType) dataType;
final DataType elementDataType = arrayDataType.getElementType();
// An array of bytes maps to Avro BYTES rather than ARRAY<INT>.
if (RecordFieldType.BYTE.equals(elementDataType.getFieldType())) {
schema = Schema.create(Type.BYTES);
} else {
final Schema elementType = buildAvroSchema(elementDataType, fieldName, fieldNamePrefix, false);
schema = Schema.createArray(elementType);
}
break;
case BIGINT:
// BIGINT is carried as a string to avoid overflow of Avro's 64-bit LONG.
schema = Schema.create(Type.STRING);
break;
case BOOLEAN:
schema = Schema.create(Type.BOOLEAN);
break;
case BYTE:
schema = Schema.create(Type.INT);
break;
case CHAR:
schema = Schema.create(Type.STRING);
break;
case CHOICE:
final ChoiceDataType choiceDataType = (ChoiceDataType) dataType;
final List<DataType> options = choiceDataType.getPossibleSubTypes();
// We need to keep track of which types have been added to the union, because if we have
// two elements in the UNION with the same type, it will fail - even if the logical type is
// different. So if we have an int and a logical type date (which also has a 'concrete type' of int)
// then an Exception will be thrown when we try to create the union. To avoid this, we just keep track
// of the Types and avoid adding it in such a case.
final List<Schema> unionTypes = new ArrayList<>(options.size());
final Set<Type> typesAdded = new HashSet<>();
int optionCounter = 1;
for (final DataType option : options) {
final Schema optionSchema = buildAvroSchema(option, fieldName, fieldNamePrefix, false);
if (!typesAdded.contains(optionSchema.getType())) {
unionTypes.add(optionSchema);
typesAdded.add(optionSchema.getType());
} else if (Type.RECORD.equals(optionSchema.getType()) && !unionTypes.contains(optionSchema)) {
// Multiple distinct RECORD options are allowed in a union, but each must have a
// unique name, so a counter suffix is appended to the field name.
final Schema indexedOptionSchema = buildAvroSchema(option, fieldName + ++optionCounter, fieldNamePrefix, false);
unionTypes.add(indexedOptionSchema);
}
}
schema = Schema.createUnion(unionTypes);
break;
case DATE:
// Avro logical type 'date': INT days since the epoch.
schema = Schema.create(Type.INT);
LogicalTypes.date().addToSchema(schema);
break;
case DOUBLE:
schema = Schema.create(Type.DOUBLE);
break;
case FLOAT:
schema = Schema.create(Type.FLOAT);
break;
case INT:
schema = Schema.create(Type.INT);
break;
case LONG:
schema = Schema.create(Type.LONG);
break;
case DECIMAL:
// Avro logical type 'decimal' backed by BYTES, carrying the NiFi precision/scale.
final DecimalDataType decimalDataType = (DecimalDataType) dataType;
schema = Schema.create(Type.BYTES);
LogicalTypes.decimal(decimalDataType.getPrecision(), decimalDataType.getScale()).addToSchema(schema);
break;
case MAP:
schema = Schema.createMap(buildAvroSchema(((MapDataType) dataType).getValueType(), fieldName, fieldNamePrefix, false));
break;
case RECORD:
final RecordDataType recordDataType = (RecordDataType) dataType;
final RecordSchema childSchema = recordDataType.getChildSchema();
final List<Field> childFields = new ArrayList<>(childSchema.getFieldCount());
for (final RecordField field : childSchema.getFields()) {
// The accumulated prefix keeps nested record names unique across the whole schema.
String childFieldNamePrefix = StringUtils.isBlank(fieldNamePrefix) ? fieldName + "_" : fieldNamePrefix + fieldName + "_";
childFields.add(buildAvroField(field, childFieldNamePrefix));
}
schema = Schema.createRecord(fieldNamePrefix + fieldName + "Type", null, "org.apache.nifi", false, childFields);
break;
case SHORT:
schema = Schema.create(Type.INT);
break;
case STRING:
schema = Schema.create(Type.STRING);
break;
case TIME:
// Avro logical type 'time-millis': INT milliseconds since midnight.
schema = Schema.create(Type.INT);
LogicalTypes.timeMillis().addToSchema(schema);
break;
case TIMESTAMP:
// Avro logical type 'timestamp-millis': LONG epoch milliseconds.
schema = Schema.create(Type.LONG);
LogicalTypes.timestampMillis().addToSchema(schema);
break;
default:
return null;
}
if (nullable) {
return nullable(schema);
} else {
return schema;
}
}
/**
 * Returns a schema equivalent to the given one but permitting null: a non-union schema
 * becomes [null, schema]; a union gains a NULL branch unless it already has one.
 */
private static Schema nullable(final Schema schema) {
    if (schema.getType() != Type.UNION) {
        return Schema.createUnion(Schema.create(Type.NULL), schema);
    }
    final Schema nullSchema = Schema.create(Type.NULL);
    if (schema.getTypes().contains(nullSchema)) {
        // Already nullable; return as-is.
        return schema;
    }
    final List<Schema> augmented = new ArrayList<>(schema.getTypes());
    augmented.add(nullSchema);
    return Schema.createUnion(augmented);
}
/**
 * Returns a DataType for the given Avro Schema
 *
 * @param avroSchema the Avro Schema to convert
 * @return a Data Type that corresponds to the given Avro Schema
 */
public static DataType determineDataType(final Schema avroSchema) {
    // Start with an empty registry of already-seen record types.
    final Map<String, DataType> knownRecordTypes = new HashMap<>();
    return determineDataType(avroSchema, knownRecordTypes);
}
/**
 * Returns a DataType for the given Avro Schema, using knownRecordTypes as a registry of
 * record types already converted (keyed by "namespace.name") so that recursive or shared
 * record definitions resolve to the same DataType instead of recursing indefinitely.
 *
 * Logical types (date, time-millis/micros, timestamp-millis/micros, decimal) take
 * precedence over the underlying primitive type. Returns null for an unhandled Avro type.
 *
 * @throws IllegalArgumentException if knownRecordTypes is null
 */
public static DataType determineDataType(final Schema avroSchema, Map<String, DataType> knownRecordTypes) {
if (knownRecordTypes == null) {
throw new IllegalArgumentException("'knownRecordTypes' cannot be null.");
}
final Type avroType = avroSchema.getType();
final LogicalType logicalType = avroSchema.getLogicalType();
if (logicalType != null) {
final String logicalTypeName = logicalType.getName();
switch (logicalTypeName) {
case LOGICAL_TYPE_DATE:
return RecordFieldType.DATE.getDataType();
case LOGICAL_TYPE_TIME_MILLIS:
case LOGICAL_TYPE_TIME_MICROS:
return RecordFieldType.TIME.getDataType();
case LOGICAL_TYPE_TIMESTAMP_MILLIS:
case LOGICAL_TYPE_TIMESTAMP_MICROS:
return RecordFieldType.TIMESTAMP.getDataType();
case LOGICAL_TYPE_DECIMAL:
final LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType;
return RecordFieldType.DECIMAL.getDecimalDataType(decimal.getPrecision(), decimal.getScale());
}
}
switch (avroType) {
case ARRAY:
final DataType elementType = determineDataType(avroSchema.getElementType(), knownRecordTypes);
final boolean elementsNullable = isNullable(avroSchema.getElementType());
return RecordFieldType.ARRAY.getArrayDataType(elementType, elementsNullable);
case BYTES:
case FIXED:
// Avro BYTES/FIXED are represented as an array of BYTE on the NiFi side.
return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType());
case BOOLEAN:
return RecordFieldType.BOOLEAN.getDataType();
case DOUBLE:
return RecordFieldType.DOUBLE.getDataType();
case ENUM:
return RecordFieldType.ENUM.getEnumDataType(avroSchema.getEnumSymbols());
case STRING:
return RecordFieldType.STRING.getDataType();
case FLOAT:
return RecordFieldType.FLOAT.getDataType();
case INT:
return RecordFieldType.INT.getDataType();
case LONG:
return RecordFieldType.LONG.getDataType();
case RECORD: {
String schemaFullName = avroSchema.getNamespace() + "." + avroSchema.getName();
if (knownRecordTypes.containsKey(schemaFullName)) {
// Already (or currently being) converted: reuse the registered DataType.
return knownRecordTypes.get(schemaFullName);
} else {
SimpleRecordSchema recordSchema = new SimpleRecordSchema(SchemaIdentifier.EMPTY);
recordSchema.setSchemaName(avroSchema.getName());
recordSchema.setSchemaNamespace(avroSchema.getNamespace());
DataType recordSchemaType = RecordFieldType.RECORD.getRecordDataType(recordSchema);
// Register before converting the fields so self-references resolve to this type.
knownRecordTypes.put(schemaFullName, recordSchemaType);
final List<Field> avroFields = avroSchema.getFields();
final List<RecordField> recordFields = new ArrayList<>(avroFields.size());
for (final Field field : avroFields) {
final String fieldName = field.name();
final Schema fieldSchema = field.schema();
final DataType fieldType = determineDataType(fieldSchema, knownRecordTypes);
final boolean nullable = isNullable(fieldSchema);
addFieldToList(recordFields, field, fieldName, fieldSchema, fieldType, nullable);
}
recordSchema.setFields(recordFields);
return recordSchemaType;
}
}
case NULL:
// NOTE(review): a pure NULL schema maps to STRING here rather than a dedicated
// null type — presumably intentional so the value has a usable carrier type.
return RecordFieldType.STRING.getDataType();
case MAP:
final Schema valueSchema = avroSchema.getValueType();
final DataType valueType = determineDataType(valueSchema, knownRecordTypes);
final boolean valuesNullable = isNullable(valueSchema);
return RecordFieldType.MAP.getMapDataType(valueType, valuesNullable);
case UNION: {
final List<Schema> nonNullSubSchemas = getNonNullSubSchemas(avroSchema);
// A union of [null, X] is just a nullable X, not a CHOICE.
if (nonNullSubSchemas.size() == 1) {
return determineDataType(nonNullSubSchemas.get(0), knownRecordTypes);
}
final List<DataType> possibleChildTypes = new ArrayList<>(nonNullSubSchemas.size());
for (final Schema subSchema : nonNullSubSchemas) {
final DataType childDataType = determineDataType(subSchema, knownRecordTypes);
possibleChildTypes.add(childDataType);
}
return RecordFieldType.CHOICE.getChoiceDataType(possibleChildTypes);
}
}
return null;
}
/**
 * Returns the non-NULL member schemas of the given union schema, or an empty list if the
 * schema reports no member types.
 */
private static List<Schema> getNonNullSubSchemas(final Schema avroSchema) {
    final List<Schema> members = avroSchema.getTypes();
    if (members == null) {
        return Collections.emptyList();
    }
    final List<Schema> nonNull = new ArrayList<>(members.size());
    for (int i = 0; i < members.size(); i++) {
        final Schema member = members.get(i);
        if (member.getType() != Type.NULL) {
            nonNull.add(member);
        }
    }
    return nonNull;
}
/**
 * Converts the given Avro Schema to a RecordSchema, retaining the schema's textual form.
 */
public static RecordSchema createSchema(final Schema avroSchema) {
    final boolean includeText = true;
    return createSchema(avroSchema, includeText);
}
/**
 * Converts the given Avro Schema to a RecordSchema.
 *
 * @param avroSchema the schema to convert; must not be null
 * @param includeText whether to retain the Avro schema's textual representation
 * @return the corresponding RecordSchema
 * @throws IllegalArgumentException if avroSchema is null
 */
public static RecordSchema createSchema(final Schema avroSchema, final boolean includeText) {
    if (avroSchema == null) {
        throw new IllegalArgumentException("Avro Schema cannot be null");
    }
    final String schemaText = includeText ? avroSchema.toString() : null;
    final SchemaIdentifier identifier = new StandardSchemaIdentifier.Builder().name(avroSchema.getName()).build();
    return createSchema(avroSchema, schemaText, identifier);
}
/**
 * Converts an Avro Schema to a RecordSchema
 *
 * @param avroSchema the Avro Schema to convert
 * @param schemaText the textual representation of the schema
 * @param schemaId the identifier of the schema
 * @return the Corresponding Record Schema
 * @throws IllegalArgumentException if avroSchema is null
 */
public static RecordSchema createSchema(final Schema avroSchema, final String schemaText, final SchemaIdentifier schemaId) {
    if (avroSchema == null) {
        throw new IllegalArgumentException("Avro Schema cannot be null");
    }
    final SimpleRecordSchema recordSchema = schemaText == null
        ? new SimpleRecordSchema(schemaId)
        : new SimpleRecordSchema(schemaText, AVRO_SCHEMA_FORMAT, schemaId);
    recordSchema.setSchemaName(avroSchema.getName());
    recordSchema.setSchemaNamespace(avroSchema.getNamespace());
    // Register this record type up-front so that self-referencing Avro records resolve
    // to this same RecordSchema instance instead of recursing indefinitely.
    final String schemaFullName = avroSchema.getNamespace() + "." + avroSchema.getName();
    final Map<String, DataType> knownRecords = new HashMap<>();
    knownRecords.put(schemaFullName, RecordFieldType.RECORD.getRecordDataType(recordSchema));
    final List<RecordField> recordFields = new ArrayList<>(avroSchema.getFields().size());
    for (final Field field : avroSchema.getFields()) {
        final Schema fieldSchema = field.schema();
        final DataType dataType = AvroTypeUtil.determineDataType(fieldSchema, knownRecords);
        addFieldToList(recordFields, field, field.name(), fieldSchema, dataType, isNullable(fieldSchema));
    }
    recordSchema.setFields(recordFields);
    return recordSchema;
}
/**
 * Returns true if the given schema permits null values: either it is the NULL type itself,
 * or it is a union with at least one nullable member (checked recursively).
 */
public static boolean isNullable(final Schema schema) {
    final Type type = schema.getType();
    if (type == Type.NULL) {
        return true;
    }
    if (type != Type.UNION) {
        return false;
    }
    for (final Schema member : schema.getTypes()) {
        if (isNullable(member)) {
            return true;
        }
    }
    return false;
}
/**
 * Boxes the given byte array into an Object array of Byte values.
 */
public static Object[] convertByteArray(final byte[] bytes) {
    final Object[] boxed = new Object[bytes.length];
    int index = 0;
    for (final byte b : bytes) {
        boxed[index++] = Byte.valueOf(b);
    }
    return boxed;
}
/**
 * Packs an Object array of Byte values into a ByteBuffer.
 *
 * @param bytes array whose elements must all be non-null Byte instances
 * @return a ByteBuffer, positioned at zero, containing the byte values in order
 * @throws IllegalTypeConversionException if any element is null or not a Byte
 */
public static ByteBuffer convertByteArray(final Object[] bytes) {
    final ByteBuffer bb = ByteBuffer.allocate(bytes.length);
    for (final Object o : bytes) {
        if (o instanceof Byte) {
            bb.put(((Byte) o).byteValue());
        } else {
            // Bug fix: the previous message reported the array reference ("bytes") instead of
            // the offending element, and calling o.getClass() would NPE for a null element.
            final String typeDescription = o == null ? "null" : o.getClass().toString();
            throw new IllegalTypeConversionException("Cannot convert value " + o + " of type " + typeDescription + " to ByteBuffer");
        }
    }
    bb.flip();
    return bb;
}
/**
 * Method that attempts to map a record field into a provided schema
 * @param avroSchema - Schema to map into
 * @param recordField - The field of the record to be mapped
 * @return Pair with the LHS being the field name and RHS being the mapped field from the
 *         schema; the Field is null when no mapping could be found
 */
protected static Pair<String, Field> lookupField(final Schema avroSchema, final RecordField recordField) {
    String fieldName = recordField.getFieldName();
    // Attempt to locate the field as is in a true 1:1 mapping with the same name
    Field field = avroSchema.getField(fieldName);
    if (field == null) {
        // No straight mapping was found, so check the aliases to see if it can be mapped
        for (final String alias : recordField.getAliases()) {
            field = avroSchema.getField(alias);
            if (field != null) {
                fieldName = alias;
                break;
            }
        }
    }
    if (field == null) {
        // Last resort: match the record field's name (or one of its aliases) against the
        // aliases declared on the schema's own fields.
        outer:
        for (final Field childField : avroSchema.getFields()) {
            final Set<String> aliases = childField.aliases();
            if (aliases.isEmpty()) {
                continue;
            }
            if (aliases.contains(fieldName)) {
                field = childField;
                break;
            }
            for (final String alias : recordField.getAliases()) {
                if (aliases.contains(alias)) {
                    field = childField;
                    fieldName = alias;
                    // Bug fix: terminate the scan once a match is found. Previously only the
                    // inner loop was exited, so a later schema field whose aliases contained
                    // the (updated) fieldName could silently overwrite this match.
                    break outer;
                }
            }
        }
    }
    return new ImmutablePair<>(fieldName, field);
}
/**
 * Converts the given Record into an Avro GenericRecord, using UTF-8 for any
 * character-set-sensitive conversion.
 */
public static GenericRecord createAvroRecord(final Record record, final Schema avroSchema) throws IOException {
    final Charset defaultCharset = StandardCharsets.UTF_8;
    return createAvroRecord(record, avroSchema, defaultCharset);
}
/**
 * Converts the given Record into an Avro GenericRecord against the supplied Avro schema.
 * Entries that are null, not present in the record's own schema, or unmappable to any Avro
 * field (directly or via aliases) are skipped. The charset is used wherever strings must
 * be rendered as bytes.
 */
public static GenericRecord createAvroRecord(final Record record, final Schema avroSchema, final Charset charset) throws IOException {
    final GenericRecord avroRecord = new GenericData.Record(avroSchema);
    final RecordSchema recordSchema = record.getSchema();
    for (final Map.Entry<String, Object> entry : record.toMap().entrySet()) {
        final Object rawValue = entry.getValue();
        if (rawValue == null) {
            continue;
        }
        final String rawFieldName = entry.getKey();
        final Optional<RecordField> recordFieldOption = recordSchema.getField(rawFieldName);
        if (!recordFieldOption.isPresent()) {
            continue;
        }
        Field avroField = avroSchema.getField(rawFieldName);
        if (avroField == null) {
            // Direct name lookup failed; fall back to alias-based resolution.
            avroField = lookupField(avroSchema, recordFieldOption.get()).getRight();
            if (avroField == null) {
                continue;
            }
        }
        final String avroFieldName = avroField.name();
        avroRecord.put(avroFieldName, convertToAvroObject(rawValue, avroField.schema(), avroFieldName, charset));
    }
    return avroRecord;
}
/**
 * Convert a raw value to an Avro object to serialize in Avro type system, assuming UTF-8
 * for any character-set-sensitive conversion.
 * The counter-part method which reads an Avro object back to a raw value is {@link #normalizeValue(Object, Schema, String)}.
 */
public static Object convertToAvroObject(final Object rawValue, final Schema fieldSchema) {
    final Charset defaultCharset = StandardCharsets.UTF_8;
    return convertToAvroObject(rawValue, fieldSchema, defaultCharset);
}
/**
 * Convert a raw value to an Avro object to serialize in Avro type system, using the provided character set when necessary.
 * The counter-part method which reads an Avro object back to a raw value is {@link #normalizeValue(Object, Schema, String)}.
 */
public static Object convertToAvroObject(final Object rawValue, final Schema fieldSchema, final Charset charset) {
    // Use the schema's own name as the field name for error reporting.
    final String fieldName = fieldSchema.getName();
    return convertToAvroObject(rawValue, fieldSchema, fieldName, charset);
}
/**
 * Adds a RecordField built from the given Avro field to <tt>recordFields</tt>.
 * A JSON null default (JsonProperties.NULL_VALUE) is treated as "no default value"; an
 * array default that is incompatible with the element type is coerced to an Object[].
 *
 * @param recordFields - record fields are added to this list.
 * @param field - the field
 * @param fieldName - field name
 * @param fieldSchema - field schema
 * @param dataType - data type
 * @param nullable - is nullable?
 */
private static void addFieldToList(final List<RecordField> recordFields, final Field field, final String fieldName,
final Schema fieldSchema, final DataType dataType, final boolean nullable) {
    final Object rawDefault = field.defaultVal();
    if (rawDefault == JsonProperties.NULL_VALUE) {
        // Explicit JSON null default: create the field without a default value.
        recordFields.add(new RecordField(fieldName, dataType, field.aliases(), nullable));
        return;
    }
    Object defaultValue = rawDefault;
    final boolean incompatibleArrayDefault = defaultValue != null
        && fieldSchema.getType() == Schema.Type.ARRAY
        && !DataTypeUtils.isArrayTypeCompatible(defaultValue, ((ArrayDataType) dataType).getElementType());
    if (incompatibleArrayDefault) {
        defaultValue = defaultValue instanceof List ? ((List<?>) defaultValue).toArray() : new Object[0];
    }
    recordFields.add(new RecordField(fieldName, dataType, defaultValue, field.aliases(), nullable));
}
/**
 * Parses the raw value as a Timestamp, using the date format implied by the field schema's
 * logical type, and returns its epoch-millisecond value.
 */
private static Long getLongFromTimestamp(final Object rawValue, final Schema fieldSchema, final String fieldName) {
    final String format = determineDataType(fieldSchema).getFormat();
    final Timestamp timestamp = DataTypeUtils.toTimestamp(rawValue, () -> DataTypeUtils.getDateFormat(format), fieldName);
    return timestamp.getTime();
}
/**
 * Converts a raw NiFi value into the Avro runtime representation dictated by fieldSchema,
 * honoring Avro logical types (date, time-millis/micros, timestamp-millis/micros, decimal).
 * fieldName is used only for error messages (with path-like suffixes for nested values);
 * charset governs String-to-bytes conversions. A null rawValue converts to null.
 *
 * @throws IllegalTypeConversionException when the raw value cannot be represented in the
 *         target Avro type
 */
@SuppressWarnings("unchecked")
private static Object convertToAvroObject(final Object rawValue, final Schema fieldSchema, final String fieldName, final Charset charset) {
if (rawValue == null) {
return null;
}
switch (fieldSchema.getType()) {
case INT: {
final LogicalType logicalType = fieldSchema.getLogicalType();
if (logicalType == null) {
return DataTypeUtils.toInteger(rawValue, fieldName);
}
if (LOGICAL_TYPE_DATE.equals(logicalType.getName())) {
// 'date' logical type: whole days since the epoch.
final String format = AvroTypeUtil.determineDataType(fieldSchema).getFormat();
final java.sql.Date date = DataTypeUtils.toDate(rawValue, () -> DataTypeUtils.getDateFormat(format), fieldName);
return (int) ChronoUnit.DAYS.between(Instant.EPOCH, Instant.ofEpochMilli(date.getTime()));
} else if (LOGICAL_TYPE_TIME_MILLIS.equals(logicalType.getName())) {
// 'time-millis' logical type: milliseconds since midnight.
final String format = AvroTypeUtil.determineDataType(fieldSchema).getFormat();
final Time time = DataTypeUtils.toTime(rawValue, () -> DataTypeUtils.getDateFormat(format), fieldName);
final Date date = new Date(time.getTime());
final Duration duration = Duration.between(date.toInstant().truncatedTo(ChronoUnit.DAYS), date.toInstant());
final long millisSinceMidnight = duration.toMillis();
return (int) millisSinceMidnight;
}
return DataTypeUtils.toInteger(rawValue, fieldName);
}
case LONG: {
final LogicalType logicalType = fieldSchema.getLogicalType();
if (logicalType == null) {
return DataTypeUtils.toLong(rawValue, fieldName);
}
if (LOGICAL_TYPE_TIME_MICROS.equals(logicalType.getName())) {
// 'time-micros' logical type: microseconds since midnight.
final long longValue = getLongFromTimestamp(rawValue, fieldSchema, fieldName);
final Date date = new Date(longValue);
final Duration duration = Duration.between(date.toInstant().truncatedTo(ChronoUnit.DAYS), date.toInstant());
return duration.toMillis() * 1000L;
} else if (LOGICAL_TYPE_TIMESTAMP_MILLIS.equals(logicalType.getName())) {
return getLongFromTimestamp(rawValue, fieldSchema, fieldName);
} else if (LOGICAL_TYPE_TIMESTAMP_MICROS.equals(logicalType.getName())) {
// 'timestamp-micros': epoch millis scaled to microseconds.
return getLongFromTimestamp(rawValue, fieldSchema, fieldName) * 1000L;
}
return DataTypeUtils.toLong(rawValue, fieldName);
}
case BYTES:
case FIXED:
final LogicalType logicalType = fieldSchema.getLogicalType();
if (logicalType != null && LOGICAL_TYPE_DECIMAL.equals(logicalType.getName())) {
// 'decimal' logical type: coerce to BigDecimal, normalize scale, then serialize
// via Avro's DecimalConversion as bytes or fixed depending on the carrier type.
final LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) logicalType;
final BigDecimal rawDecimal;
if (rawValue instanceof BigDecimal) {
rawDecimal = (BigDecimal) rawValue;
} else if (rawValue instanceof Double) {
rawDecimal = BigDecimal.valueOf((Double) rawValue);
} else if (rawValue instanceof String) {
rawDecimal = new BigDecimal((String) rawValue);
} else if (rawValue instanceof Integer) {
rawDecimal = new BigDecimal((Integer) rawValue);
} else if (rawValue instanceof Long) {
rawDecimal = new BigDecimal((Long) rawValue);
} else {
throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to a logical decimal");
}
// If the desired scale is different than this value's coerce scale.
// NOTE(review): BigDecimal.ROUND_HALF_UP is deprecated in newer JDKs in favor of
// RoundingMode.HALF_UP — same behavior.
final int desiredScale = decimalType.getScale();
final BigDecimal decimal = rawDecimal.scale() == desiredScale
? rawDecimal : rawDecimal.setScale(desiredScale, BigDecimal.ROUND_HALF_UP);
return fieldSchema.getType() == Type.BYTES
? new Conversions.DecimalConversion().toBytes(decimal, fieldSchema, logicalType) //return GenericByte
: new Conversions.DecimalConversion().toFixed(decimal, fieldSchema, logicalType); //return GenericFixed
}
if (rawValue instanceof byte[]) {
return ByteBuffer.wrap((byte[]) rawValue);
}
if (rawValue instanceof String) {
return ByteBuffer.wrap(((String) rawValue).getBytes(charset));
}
if (rawValue instanceof Object[]) {
// Special case: a 12-byte FIXED named INT96 (Parquet-style timestamps) becomes a
// GenericData.Fixed; any other Object[] is treated as boxed bytes.
if (fieldSchema.getType() == Type.FIXED && "INT96".equals(fieldSchema.getName())) {
Object[] rawObjects = (Object[]) rawValue;
byte[] rawBytes = new byte[rawObjects.length];
for (int elementIndex = 0; elementIndex < rawObjects.length; elementIndex++) {
rawBytes[elementIndex] = (Byte) rawObjects[elementIndex];
}
return new GenericData.Fixed(fieldSchema, rawBytes);
} else {
return AvroTypeUtil.convertByteArray((Object[]) rawValue);
}
}
try {
if (rawValue instanceof Blob) {
Blob blob = (Blob) rawValue;
return ByteBuffer.wrap(IOUtils.toByteArray(blob.getBinaryStream()));
} else {
throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to a ByteBuffer");
}
} catch (IllegalTypeConversionException itce) {
throw itce;
} catch (Exception e) {
throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to a ByteBuffer", e);
}
case MAP:
if (rawValue instanceof Record) {
// A Record can stand in for a map: each record field becomes a map entry.
final Record recordValue = (Record) rawValue;
final Map<String, Object> map = new HashMap<>();
for (final RecordField recordField : recordValue.getSchema().getFields()) {
final Object v = recordValue.getValue(recordField);
if (v != null) {
map.put(recordField.getFieldName(), convertToAvroObject(v, fieldSchema.getValueType(), fieldName + "[" + recordField.getFieldName() + "]", charset));
}
}
return map;
} else if (rawValue instanceof Map) {
final Map<String, Object> objectMap = (Map<String, Object>) rawValue;
final Map<String, Object> map = new HashMap<>(objectMap.size());
for (final String s : objectMap.keySet()) {
final Object converted = convertToAvroObject(objectMap.get(s), fieldSchema.getValueType(), fieldName + "[" + s + "]", charset);
map.put(s, converted);
}
return map;
} else {
throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to a Map");
}
case RECORD:
final GenericData.Record avroRecord = new GenericData.Record(fieldSchema);
final Set<Map.Entry<String, Object>> entries;
if (rawValue instanceof Map) {
final Map<String, Object> map = (Map<String, Object>) rawValue;
entries = map.entrySet();
} else if (rawValue instanceof Record) {
entries = new HashSet<>();
final Record record = (Record) rawValue;
record.getSchema().getFields().forEach(field -> entries.add(new AbstractMap.SimpleEntry<>(field.getFieldName(), record.getValue(field))));
} else {
throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to a Record");
}
for (final Map.Entry<String, Object> e : entries) {
final Object recordFieldValue = e.getValue();
final String recordFieldName = e.getKey();
final Field field = fieldSchema.getField(recordFieldName);
// Entries with no matching Avro field are silently dropped.
if (field == null) {
continue;
}
final Object converted = convertToAvroObject(recordFieldValue, field.schema(), fieldName + "/" + recordFieldName, charset);
avroRecord.put(recordFieldName, converted);
}
return avroRecord;
case UNION:
return convertUnionFieldValue(rawValue, fieldSchema, schema -> convertToAvroObject(rawValue, schema, fieldName, charset), fieldName);
case ARRAY:
final Object[] objectArray;
if (rawValue instanceof List) {
objectArray = ((List) rawValue).toArray();
} else if (rawValue instanceof Object[]) {
objectArray = (Object[]) rawValue;
} else {
throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to an Array");
}
final List<Object> list = new ArrayList<>(objectArray.length);
int i = 0;
for (final Object o : objectArray) {
// Index is appended to fieldName purely for error reporting.
final Object converted = convertToAvroObject(o, fieldSchema.getElementType(), fieldName + "[" + i + "]", charset);
list.add(converted);
i++;
}
return list;
case BOOLEAN:
return DataTypeUtils.toBoolean(rawValue, fieldName);
case DOUBLE:
return DataTypeUtils.toDouble(rawValue, fieldName);
case FLOAT:
return DataTypeUtils.toFloat(rawValue, fieldName);
case NULL:
return null;
case ENUM:
List<String> enums = fieldSchema.getEnumSymbols();
if(enums != null && enums.contains(rawValue)) {
return new GenericData.EnumSymbol(fieldSchema, rawValue);
} else {
throw new IllegalTypeConversionException(rawValue + " is not a possible value of the ENUM" + enums + ".");
}
case STRING:
if (rawValue instanceof String) {
return rawValue;
}
return DataTypeUtils.toString(rawValue, (String) null, charset);
}
// Unhandled Avro type: pass the raw value through unchanged.
return rawValue;
}
/**
 * Converts the given Avro record into a field-name-to-value map, coercing values to the
 * RecordSchema's types and using UTF-8 wherever a character set is needed.
 */
public static Map<String, Object> convertAvroRecordToMap(final GenericRecord avroRecord, final RecordSchema recordSchema) {
    final Charset defaultCharset = StandardCharsets.UTF_8;
    return convertAvroRecordToMap(avroRecord, recordSchema, defaultCharset);
}
/**
 * Converts the given Avro record into a map keyed by the RecordSchema's field names.
 * Values are looked up by field name (falling back to the field's aliases), normalized
 * from their Avro representation, and coerced to the schema's desired type using the
 * given charset. Schema fields with no corresponding Avro field map to null.
 *
 * Conversion failures are logged at debug level and rethrown to the caller.
 */
public static Map<String, Object> convertAvroRecordToMap(final GenericRecord avroRecord, final RecordSchema recordSchema, final Charset charset) {
    final Map<String, Object> values = new HashMap<>(recordSchema.getFieldCount());
    for (final RecordField recordField : recordSchema.getFields()) {
        Object value = avroRecord.get(recordField.getFieldName());
        if (value == null) {
            // Primary name not populated; try each alias until a value is found.
            for (final String alias : recordField.getAliases()) {
                value = avroRecord.get(alias);
                if (value != null) {
                    break;
                }
            }
        }
        final String fieldName = recordField.getFieldName();
        try {
            final Field avroField = avroRecord.getSchema().getField(fieldName);
            if (avroField == null) {
                values.put(fieldName, null);
                continue;
            }
            final Schema fieldSchema = avroField.schema();
            final Object rawValue = normalizeValue(value, fieldSchema, fieldName);
            final DataType desiredType = recordField.getDataType();
            final Object coercedValue = DataTypeUtils.convertType(rawValue, desiredType, fieldName, charset);
            values.put(fieldName, coercedValue);
        } catch (Exception ex) {
            // Fix: use SLF4J parameterized logging instead of string concatenation so the
            // message is only built when debug is enabled; also corrects the wording.
            logger.debug("Failed to convert field {}", fieldName, ex);
            throw ex;
        }
    }
    return values;
}
/**
 * Convert value of a nullable union field.
 * @param originalValue original value
 * @param fieldSchema the union field schema
 * @param conversion the conversion function which takes a non-null field schema within the union field and returns a converted value
 * @param fieldName the name of the field being converted; used only in the error message
 * @return a converted value, or {@code null} if the union contains no non-null sub-schema
 * @throws IllegalTypeConversionException if the union has at least one non-null sub-schema but
 *         the value could not be converted to any of them
 */
private static Object convertUnionFieldValue(final Object originalValue, final Schema fieldSchema, final Function<Schema, Object> conversion, final String fieldName) {
    boolean foundNonNull = false;

    // It is an extremely common case to have a UNION type because a field can be NULL or some other type. In this situation,
    // we will have two possible types, and one of them will be null. When this happens, we can be much more efficient by simply
    // determining the non-null type and converting to that.
    final List<Schema> schemaTypes = fieldSchema.getTypes();
    if (schemaTypes.size() == 2) {
        final Schema firstSchema = schemaTypes.get(0);
        final Schema secondSchema = schemaTypes.get(1);

        if (firstSchema.getType() == Type.NULL) {
            return conversion.apply(secondSchema);
        }
        if (secondSchema.getType() == Type.NULL) {
            return conversion.apply(firstSchema);
        }
    }

    // General case: ask DataTypeUtils for the non-null sub-schema whose data type best matches
    // the value, and convert with that schema when one is found.
    final Optional<Schema> mostSuitableType = DataTypeUtils.findMostSuitableType(
            originalValue,
            getNonNullSubSchemas(fieldSchema),
            AvroTypeUtil::determineDataType
    );
    if (mostSuitableType.isPresent()) {
        return conversion.apply(mostSuitableType.get());
    }

    // No single best match: try each non-null sub-schema in declaration order and return the
    // first conversion whose result is compatible with that sub-schema's data type.
    for (final Schema subSchema : fieldSchema.getTypes()) {
        if (subSchema.getType() == Type.NULL) {
            continue;
        }

        foundNonNull = true;
        final DataType desiredDataType = AvroTypeUtil.determineDataType(subSchema);
        try {
            final Object convertedValue = conversion.apply(subSchema);

            if (isCompatibleDataType(convertedValue, desiredDataType)) {
                return convertedValue;
            }

            // For logical types those store with different type (e.g. BigDecimal as ByteBuffer), check compatibility using the original rawValue
            if (subSchema.getLogicalType() != null && DataTypeUtils.isCompatibleDataType(originalValue, desiredDataType)) {
                return convertedValue;
            }
        } catch (Exception e) {
            // If failed with one of possible types, continue with the next available option.
            if (logger.isDebugEnabled()) {
                logger.debug("Cannot convert value {} to type {}", originalValue, desiredDataType, e);
            }
        }
    }

    if (foundNonNull) {
        // NOTE(review): originalValue.getClass() would NPE if originalValue were null here;
        // visible callers (e.g. normalizeValue) null-check before invoking — confirm for others.
        throw new IllegalTypeConversionException("Cannot convert value " + originalValue + " of type " + originalValue.getClass()
            + " because no compatible types exist in the UNION for field " + fieldName);
    }

    return null;
}
/**
 * Determines whether the given value can be treated as the given NiFi data type, accepting
 * Avro-specific representations (Utf8 character data, GenericRecord/SpecificRecord records,
 * GenericData.Array arrays, etc.) before deferring to the generic DataTypeUtils check.
 *
 * @param value the value to test; {@code null} is never compatible
 * @param dataType the desired NiFi data type
 * @return {@code true} if the value is compatible with the data type
 */
private static boolean isCompatibleDataType(final Object value, final DataType dataType) {
    if (value == null) {
        return false;
    }

    final boolean avroCompatible;
    switch (dataType.getFieldType()) {
        case RECORD:
            avroCompatible = value instanceof GenericRecord || value instanceof SpecificRecord;
            break;
        case STRING:
            avroCompatible = value instanceof Utf8;
            break;
        case ARRAY:
            avroCompatible = value instanceof Array || value instanceof List || value instanceof ByteBuffer;
            break;
        case MAP:
            avroCompatible = value instanceof Map;
            break;
        default:
            avroCompatible = false;
            break;
    }

    // Fall back to the generic compatibility rules when no Avro-specific shortcut applied
    return avroCompatible || DataTypeUtils.isCompatibleDataType(value, dataType);
}
/**
 * Convert an Avro object to a normal Java objects for further processing.
 * The counter-part method which convert a raw value to an Avro object is {@link #convertToAvroObject(Object, Schema, String, Charset)}
 *
 * @param value the Avro value to normalize; may be {@code null}
 * @param avroSchema the Avro schema describing the value
 * @param fieldName the (possibly nested) field name; extended with "/name", "[i]" or "[key]"
 *        suffixes when recursing into records, arrays and maps
 * @return the normalized Java object, or {@code null} for a null value or a NULL-typed schema
 */
private static Object normalizeValue(final Object value, final Schema avroSchema, final String fieldName) {
    if (value == null) {
        return null;
    }

    switch (avroSchema.getType()) {
        case INT: {
            final LogicalType logicalType = avroSchema.getLogicalType();
            if (logicalType == null) {
                return value;
            }

            final String logicalName = logicalType.getName();
            if (LOGICAL_TYPE_DATE.equals(logicalName)) {
                // date logical name means that the value is number of days since Jan 1, 1970
                return new java.sql.Date(TimeUnit.DAYS.toMillis((int) value));
            } else if (LOGICAL_TYPE_TIME_MILLIS.equals(logicalName)) {
                // time-millis logical name means that the value is number of milliseconds since midnight.
                return new java.sql.Time((int) value);
            }

            break;
        }
        case LONG: {
            final LogicalType logicalType = avroSchema.getLogicalType();
            if (logicalType == null) {
                return value;
            }

            // time-micros / timestamp-micros carry microseconds; convert to the milliseconds
            // that java.sql.Time/Timestamp constructors expect
            final String logicalName = logicalType.getName();
            if (LOGICAL_TYPE_TIME_MICROS.equals(logicalName)) {
                return new java.sql.Time(TimeUnit.MICROSECONDS.toMillis((long) value));
            } else if (LOGICAL_TYPE_TIMESTAMP_MILLIS.equals(logicalName)) {
                return new java.sql.Timestamp((long) value);
            } else if (LOGICAL_TYPE_TIMESTAMP_MICROS.equals(logicalName)) {
                return new java.sql.Timestamp(TimeUnit.MICROSECONDS.toMillis((long) value));
            }
            break;
        }
        case UNION:
            // A concrete record already carries its resolved schema, so normalize against that
            // directly rather than searching the union's sub-schemas
            if (value instanceof GenericData.Record) {
                final GenericData.Record avroRecord = (GenericData.Record) value;
                return normalizeValue(value, avroRecord.getSchema(), fieldName);
            }
            return convertUnionFieldValue(value, avroSchema, schema -> normalizeValue(value, schema, fieldName), fieldName);
        case RECORD:
            // Recursively normalize each field of the nested record into a MapRecord
            final GenericData.Record record = (GenericData.Record) value;
            final Schema recordSchema = record.getSchema();
            final List<Field> recordFields = recordSchema.getFields();
            final Map<String, Object> values = new HashMap<>(recordFields.size());
            for (final Field field : recordFields) {
                final Object avroFieldValue = record.get(field.name());
                final Object fieldValue = normalizeValue(avroFieldValue, field.schema(), fieldName + "/" + field.name());
                values.put(field.name(), fieldValue);
            }
            final RecordSchema childSchema = AvroTypeUtil.createSchema(recordSchema, false);
            return new MapRecord(childSchema, values);
        case BYTES:
            final ByteBuffer bb = (ByteBuffer) value;
            final LogicalType logicalType = avroSchema.getLogicalType();
            // decimal logical type: the bytes encode an unscaled BigDecimal
            if (logicalType != null && LOGICAL_TYPE_DECIMAL.equals(logicalType.getName())) {
                return new Conversions.DecimalConversion().fromBytes(bb, avroSchema, logicalType);
            }
            return AvroTypeUtil.convertByteArray(bb.array());
        case FIXED:
            final GenericFixed fixed = (GenericFixed) value;
            final LogicalType fixedLogicalType = avroSchema.getLogicalType();
            // decimal logical type may also be backed by a FIXED field
            if (fixedLogicalType != null && LOGICAL_TYPE_DECIMAL.equals(fixedLogicalType.getName())) {
                final ByteBuffer fixedByteBuffer = ByteBuffer.wrap(fixed.bytes());
                return new Conversions.DecimalConversion().fromBytes(fixedByteBuffer, avroSchema, fixedLogicalType);
            }
            return AvroTypeUtil.convertByteArray(fixed.bytes());
        case ENUM:
            return value.toString();
        case NULL:
            return null;
        case STRING:
            // Covers both String and Avro's Utf8 representation
            return value.toString();
        case ARRAY:
            // Normalize each element, tagging nested field names with the element index
            if (value instanceof List) {
                final List<?> list = (List<?>) value;
                final Object[] valueArray = new Object[list.size()];
                for (int i = 0; i < list.size(); i++) {
                    final Schema elementSchema = avroSchema.getElementType();
                    valueArray[i] = normalizeValue(list.get(i), elementSchema, fieldName + "[" + i + "]");
                }
                return valueArray;
            } else {
                // NOTE(review): GenericData.Array implements List, so this branch looks
                // unreachable for Array values — confirm before removing.
                final GenericData.Array<?> array = (GenericData.Array<?>) value;
                final Object[] valueArray = new Object[array.size()];
                for (int i = 0; i < array.size(); i++) {
                    final Schema elementSchema = avroSchema.getElementType();
                    valueArray[i] = normalizeValue(array.get(i), elementSchema, fieldName + "[" + i + "]");
                }
                return valueArray;
            }
        case MAP:
            // Keys are stringified; values are normalized against the map's value schema
            final Map<?, ?> avroMap = (Map<?, ?>) value;
            final Map<String, Object> map = new HashMap<>(avroMap.size());
            for (final Map.Entry<?, ?> entry : avroMap.entrySet()) {
                Object obj = entry.getValue();
                if (obj instanceof Utf8 || obj instanceof CharSequence) {
                    obj = obj.toString();
                }

                final String key = entry.getKey().toString();
                obj = normalizeValue(obj, avroSchema.getValueType(), fieldName + "[" + key + "]");

                map.put(key, obj);
            }

            return map;
    }

    return value;
}
}