| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.nifi.avro; |
| |
import org.apache.avro.Conversions;
import org.apache.avro.JsonProperties;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.Schema.Type;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Array;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.specific.SpecificRecord;
import org.apache.avro.util.Utf8;
import org.apache.commons.compress.utils.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.record.DataType;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import org.apache.nifi.serialization.record.StandardSchemaIdentifier;
import org.apache.nifi.serialization.record.type.ArrayDataType;
import org.apache.nifi.serialization.record.type.ChoiceDataType;
import org.apache.nifi.serialization.record.type.DecimalDataType;
import org.apache.nifi.serialization.record.type.EnumDataType;
import org.apache.nifi.serialization.record.type.MapDataType;
import org.apache.nifi.serialization.record.type.RecordDataType;
import org.apache.nifi.serialization.record.util.DataTypeUtils;
import org.apache.nifi.serialization.record.util.IllegalTypeConversionException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.sql.Blob;
import java.sql.Time;
import java.sql.Timestamp;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
| |
| public class AvroTypeUtil { |
| private static final Logger logger = LoggerFactory.getLogger(AvroTypeUtil.class); |
| public static final String AVRO_SCHEMA_FORMAT = "avro"; |
| |
| private static final String LOGICAL_TYPE_DATE = "date"; |
| private static final String LOGICAL_TYPE_TIME_MILLIS = "time-millis"; |
| private static final String LOGICAL_TYPE_TIME_MICROS = "time-micros"; |
| private static final String LOGICAL_TYPE_TIMESTAMP_MILLIS = "timestamp-millis"; |
| private static final String LOGICAL_TYPE_TIMESTAMP_MICROS = "timestamp-micros"; |
| private static final String LOGICAL_TYPE_DECIMAL = "decimal"; |
| |
| |
| public static Schema extractAvroSchema(final RecordSchema recordSchema) { |
| if (recordSchema == null) { |
| throw new IllegalArgumentException("RecordSchema cannot be null"); |
| } |
| |
| final Optional<String> schemaFormatOption = recordSchema.getSchemaFormat(); |
| if (!schemaFormatOption.isPresent()) { |
| return buildAvroSchema(recordSchema); |
| } |
| |
| final String schemaFormat = schemaFormatOption.get(); |
| if (!schemaFormat.equals(AVRO_SCHEMA_FORMAT)) { |
| return buildAvroSchema(recordSchema); |
| } |
| |
| final Optional<String> textOption = recordSchema.getSchemaText(); |
| if (!textOption.isPresent()) { |
| return buildAvroSchema(recordSchema); |
| } |
| |
| final String text = textOption.get(); |
| return new Schema.Parser().parse(text); |
| } |
| |
| private static Schema buildAvroSchema(final RecordSchema recordSchema) { |
| final List<Field> avroFields = new ArrayList<>(recordSchema.getFieldCount()); |
| for (final RecordField recordField : recordSchema.getFields()) { |
| avroFields.add(buildAvroField(recordField, "")); |
| } |
| |
| final Schema avroSchema = Schema.createRecord("nifiRecord", null, "org.apache.nifi", false, avroFields); |
| return avroSchema; |
| } |
| |
| private static Field buildAvroField(final RecordField recordField, String fieldNamePrefix) { |
| final Schema schema = buildAvroSchema(recordField.getDataType(), recordField.getFieldName(), fieldNamePrefix, recordField.isNullable()); |
| |
| final Field field; |
| final String recordFieldName = recordField.getFieldName(); |
| if (isValidAvroFieldName(recordFieldName)) { |
| field = new Field(recordField.getFieldName(), schema, null, recordField.getDefaultValue()); |
| } else { |
| final String validName = createValidAvroFieldName(recordField.getFieldName()); |
| field = new Field(validName, schema, null, recordField.getDefaultValue()); |
| field.addAlias(recordField.getFieldName()); |
| } |
| |
| for (final String alias : recordField.getAliases()) { |
| field.addAlias(alias); |
| } |
| |
| return field; |
| } |
| |
| private static boolean isValidAvroFieldName(final String fieldName) { |
| // Avro field names must match the following criteria: |
| // 1. Must be non-empty |
| // 2. Must begin with a letter or an underscore |
| // 3. Must consist only of letters, underscores, and numbers. |
| if (fieldName.isEmpty()) { |
| return false; |
| } |
| |
| final char firstChar = fieldName.charAt(0); |
| if (firstChar != '_' && !Character.isLetter(firstChar)) { |
| return false; |
| } |
| |
| for (int i=1; i < fieldName.length(); i++) { |
| final char c = fieldName.charAt(i); |
| if (c != '_' && !Character.isLetterOrDigit(c)) { |
| return false; |
| } |
| } |
| |
| return true; |
| } |
| |
| private static String createValidAvroFieldName(final String fieldName) { |
| if (fieldName.isEmpty()) { |
| return "UNNAMED_FIELD"; |
| } |
| |
| final StringBuilder sb = new StringBuilder(); |
| |
| final char firstChar = fieldName.charAt(0); |
| if (firstChar == '_' || Character.isLetter(firstChar)) { |
| sb.append(firstChar); |
| } else { |
| sb.append("_"); |
| } |
| |
| for (int i=1; i < fieldName.length(); i++) { |
| final char c = fieldName.charAt(i); |
| if (c == '_' || Character.isLetterOrDigit(c)) { |
| sb.append(c); |
| } else { |
| sb.append("_"); |
| } |
| } |
| |
| return sb.toString(); |
| } |
| |
| private static Schema buildAvroSchema(final DataType dataType, final String fieldName, String fieldNamePrefix, final boolean nullable) { |
| final Schema schema; |
| |
| switch (dataType.getFieldType()) { |
| case ARRAY: |
| final ArrayDataType arrayDataType = (ArrayDataType) dataType; |
| final DataType elementDataType = arrayDataType.getElementType(); |
| if (RecordFieldType.BYTE.equals(elementDataType.getFieldType())) { |
| schema = Schema.create(Type.BYTES); |
| } else { |
| final Schema elementType = buildAvroSchema(elementDataType, fieldName, fieldNamePrefix, false); |
| schema = Schema.createArray(elementType); |
| } |
| break; |
| case BIGINT: |
| schema = Schema.create(Type.STRING); |
| break; |
| case BOOLEAN: |
| schema = Schema.create(Type.BOOLEAN); |
| break; |
| case BYTE: |
| schema = Schema.create(Type.INT); |
| break; |
| case CHAR: |
| schema = Schema.create(Type.STRING); |
| break; |
| case CHOICE: |
| final ChoiceDataType choiceDataType = (ChoiceDataType) dataType; |
| final List<DataType> options = choiceDataType.getPossibleSubTypes(); |
| |
| // We need to keep track of which types have been added to the union, because if we have |
| // two elements in the UNION with the same type, it will fail - even if the logical type is |
| // different. So if we have an int and a logical type date (which also has a 'concrete type' of int) |
| // then an Exception will be thrown when we try to create the union. To avoid this, we just keep track |
| // of the Types and avoid adding it in such a case. |
| final List<Schema> unionTypes = new ArrayList<>(options.size()); |
| final Set<Type> typesAdded = new HashSet<>(); |
| |
| int optionCounter = 1; |
| for (final DataType option : options) { |
| final Schema optionSchema = buildAvroSchema(option, fieldName, fieldNamePrefix, false); |
| if (!typesAdded.contains(optionSchema.getType())) { |
| unionTypes.add(optionSchema); |
| typesAdded.add(optionSchema.getType()); |
| } else if (Type.RECORD.equals(optionSchema.getType()) && !unionTypes.contains(optionSchema)) { |
| final Schema indexedOptionSchema = buildAvroSchema(option, fieldName + ++optionCounter, fieldNamePrefix, false); |
| unionTypes.add(indexedOptionSchema); |
| } |
| } |
| |
| schema = Schema.createUnion(unionTypes); |
| break; |
| case DATE: |
| schema = Schema.create(Type.INT); |
| LogicalTypes.date().addToSchema(schema); |
| break; |
| case DOUBLE: |
| schema = Schema.create(Type.DOUBLE); |
| break; |
| case FLOAT: |
| schema = Schema.create(Type.FLOAT); |
| break; |
| case INT: |
| schema = Schema.create(Type.INT); |
| break; |
| case LONG: |
| schema = Schema.create(Type.LONG); |
| break; |
| case DECIMAL: |
| final DecimalDataType decimalDataType = (DecimalDataType) dataType; |
| schema = Schema.create(Type.BYTES); |
| LogicalTypes.decimal(decimalDataType.getPrecision(), decimalDataType.getScale()).addToSchema(schema); |
| break; |
| case MAP: |
| schema = Schema.createMap(buildAvroSchema(((MapDataType) dataType).getValueType(), fieldName, fieldNamePrefix, false)); |
| break; |
| case RECORD: |
| final RecordDataType recordDataType = (RecordDataType) dataType; |
| final RecordSchema childSchema = recordDataType.getChildSchema(); |
| |
| final List<Field> childFields = new ArrayList<>(childSchema.getFieldCount()); |
| for (final RecordField field : childSchema.getFields()) { |
| String childFieldNamePrefix = StringUtils.isBlank(fieldNamePrefix) ? fieldName + "_" : fieldNamePrefix + fieldName + "_"; |
| childFields.add(buildAvroField(field, childFieldNamePrefix)); |
| } |
| |
| schema = Schema.createRecord(fieldNamePrefix + fieldName + "Type", null, "org.apache.nifi", false, childFields); |
| break; |
| case SHORT: |
| schema = Schema.create(Type.INT); |
| break; |
| case STRING: |
| schema = Schema.create(Type.STRING); |
| break; |
| case TIME: |
| schema = Schema.create(Type.INT); |
| LogicalTypes.timeMillis().addToSchema(schema); |
| break; |
| case TIMESTAMP: |
| schema = Schema.create(Type.LONG); |
| LogicalTypes.timestampMillis().addToSchema(schema); |
| break; |
| default: |
| return null; |
| } |
| |
| if (nullable) { |
| return nullable(schema); |
| } else { |
| return schema; |
| } |
| } |
| |
| private static Schema nullable(final Schema schema) { |
| if (schema.getType() == Type.UNION) { |
| final List<Schema> unionTypes = new ArrayList<>(schema.getTypes()); |
| final Schema nullSchema = Schema.create(Type.NULL); |
| if (unionTypes.contains(nullSchema)) { |
| return schema; |
| } |
| |
| unionTypes.add(nullSchema); |
| return Schema.createUnion(unionTypes); |
| } |
| |
| return Schema.createUnion(Schema.create(Type.NULL), schema); |
| } |
| |
| /** |
| * Returns a DataType for the given Avro Schema |
| * |
| * @param avroSchema the Avro Schema to convert |
| * @return a Data Type that corresponds to the given Avro Schema |
| */ |
| public static DataType determineDataType(final Schema avroSchema) { |
| return determineDataType(avroSchema, new HashMap<>()); |
| } |
| |
| public static DataType determineDataType(final Schema avroSchema, Map<String, DataType> knownRecordTypes) { |
| |
| if (knownRecordTypes == null) { |
| throw new IllegalArgumentException("'knownRecordTypes' cannot be null."); |
| } |
| |
| final Type avroType = avroSchema.getType(); |
| |
| final LogicalType logicalType = avroSchema.getLogicalType(); |
| if (logicalType != null) { |
| final String logicalTypeName = logicalType.getName(); |
| switch (logicalTypeName) { |
| case LOGICAL_TYPE_DATE: |
| return RecordFieldType.DATE.getDataType(); |
| case LOGICAL_TYPE_TIME_MILLIS: |
| case LOGICAL_TYPE_TIME_MICROS: |
| return RecordFieldType.TIME.getDataType(); |
| case LOGICAL_TYPE_TIMESTAMP_MILLIS: |
| case LOGICAL_TYPE_TIMESTAMP_MICROS: |
| return RecordFieldType.TIMESTAMP.getDataType(); |
| case LOGICAL_TYPE_DECIMAL: |
| final LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) logicalType; |
| return RecordFieldType.DECIMAL.getDecimalDataType(decimal.getPrecision(), decimal.getScale()); |
| } |
| } |
| |
| switch (avroType) { |
| case ARRAY: |
| final DataType elementType = determineDataType(avroSchema.getElementType(), knownRecordTypes); |
| final boolean elementsNullable = isNullable(avroSchema.getElementType()); |
| return RecordFieldType.ARRAY.getArrayDataType(elementType, elementsNullable); |
| case BYTES: |
| case FIXED: |
| return RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.BYTE.getDataType()); |
| case BOOLEAN: |
| return RecordFieldType.BOOLEAN.getDataType(); |
| case DOUBLE: |
| return RecordFieldType.DOUBLE.getDataType(); |
| case ENUM: |
| return RecordFieldType.ENUM.getEnumDataType(avroSchema.getEnumSymbols()); |
| case STRING: |
| return RecordFieldType.STRING.getDataType(); |
| case FLOAT: |
| return RecordFieldType.FLOAT.getDataType(); |
| case INT: |
| return RecordFieldType.INT.getDataType(); |
| case LONG: |
| return RecordFieldType.LONG.getDataType(); |
| case RECORD: { |
| String schemaFullName = avroSchema.getNamespace() + "." + avroSchema.getName(); |
| |
| if (knownRecordTypes.containsKey(schemaFullName)) { |
| return knownRecordTypes.get(schemaFullName); |
| } else { |
| SimpleRecordSchema recordSchema = new SimpleRecordSchema(SchemaIdentifier.EMPTY); |
| recordSchema.setSchemaName(avroSchema.getName()); |
| recordSchema.setSchemaNamespace(avroSchema.getNamespace()); |
| DataType recordSchemaType = RecordFieldType.RECORD.getRecordDataType(recordSchema); |
| knownRecordTypes.put(schemaFullName, recordSchemaType); |
| |
| final List<Field> avroFields = avroSchema.getFields(); |
| final List<RecordField> recordFields = new ArrayList<>(avroFields.size()); |
| |
| for (final Field field : avroFields) { |
| final String fieldName = field.name(); |
| final Schema fieldSchema = field.schema(); |
| final DataType fieldType = determineDataType(fieldSchema, knownRecordTypes); |
| final boolean nullable = isNullable(fieldSchema); |
| addFieldToList(recordFields, field, fieldName, fieldSchema, fieldType, nullable); |
| } |
| |
| recordSchema.setFields(recordFields); |
| return recordSchemaType; |
| } |
| } |
| case NULL: |
| return RecordFieldType.STRING.getDataType(); |
| case MAP: |
| final Schema valueSchema = avroSchema.getValueType(); |
| final DataType valueType = determineDataType(valueSchema, knownRecordTypes); |
| final boolean valuesNullable = isNullable(valueSchema); |
| return RecordFieldType.MAP.getMapDataType(valueType, valuesNullable); |
| case UNION: { |
| final List<Schema> nonNullSubSchemas = getNonNullSubSchemas(avroSchema); |
| |
| if (nonNullSubSchemas.size() == 1) { |
| return determineDataType(nonNullSubSchemas.get(0), knownRecordTypes); |
| } |
| |
| final List<DataType> possibleChildTypes = new ArrayList<>(nonNullSubSchemas.size()); |
| for (final Schema subSchema : nonNullSubSchemas) { |
| final DataType childDataType = determineDataType(subSchema, knownRecordTypes); |
| possibleChildTypes.add(childDataType); |
| } |
| |
| return RecordFieldType.CHOICE.getChoiceDataType(possibleChildTypes); |
| } |
| } |
| |
| return null; |
| } |
| |
| private static List<Schema> getNonNullSubSchemas(final Schema avroSchema) { |
| final List<Schema> unionFieldSchemas = avroSchema.getTypes(); |
| if (unionFieldSchemas == null) { |
| return Collections.emptyList(); |
| } |
| |
| final List<Schema> nonNullTypes = new ArrayList<>(unionFieldSchemas.size()); |
| for (final Schema fieldSchema : unionFieldSchemas) { |
| if (fieldSchema.getType() != Type.NULL) { |
| nonNullTypes.add(fieldSchema); |
| } |
| } |
| |
| return nonNullTypes; |
| } |
| |
| public static RecordSchema createSchema(final Schema avroSchema) { |
| return createSchema(avroSchema, true); |
| } |
| |
| public static RecordSchema createSchema(final Schema avroSchema, final boolean includeText) { |
| if (avroSchema == null) { |
| throw new IllegalArgumentException("Avro Schema cannot be null"); |
| } |
| |
| SchemaIdentifier identifier = new StandardSchemaIdentifier.Builder().name(avroSchema.getName()).build(); |
| return createSchema(avroSchema, includeText ? avroSchema.toString() : null, identifier); |
| } |
| |
| /** |
| * Converts an Avro Schema to a RecordSchema |
| * |
| * @param avroSchema the Avro Schema to convert |
| * @param schemaText the textual representation of the schema |
| * @param schemaId the identifier of the schema |
| * @return the Corresponding Record Schema |
| */ |
| public static RecordSchema createSchema(final Schema avroSchema, final String schemaText, final SchemaIdentifier schemaId) { |
| if (avroSchema == null) { |
| throw new IllegalArgumentException("Avro Schema cannot be null"); |
| } |
| |
| final String schemaFullName = avroSchema.getNamespace() + "." + avroSchema.getName(); |
| final SimpleRecordSchema recordSchema = schemaText == null ? new SimpleRecordSchema(schemaId) : new SimpleRecordSchema(schemaText, AVRO_SCHEMA_FORMAT, schemaId); |
| recordSchema.setSchemaName(avroSchema.getName()); |
| recordSchema.setSchemaNamespace(avroSchema.getNamespace()); |
| final DataType recordSchemaType = RecordFieldType.RECORD.getRecordDataType(recordSchema); |
| final Map<String, DataType> knownRecords = new HashMap<>(); |
| knownRecords.put(schemaFullName, recordSchemaType); |
| |
| final List<RecordField> recordFields = new ArrayList<>(avroSchema.getFields().size()); |
| for (final Field field : avroSchema.getFields()) { |
| final String fieldName = field.name(); |
| final Schema fieldSchema = field.schema(); |
| final DataType dataType = AvroTypeUtil.determineDataType(fieldSchema, knownRecords); |
| final boolean nullable = isNullable(fieldSchema); |
| addFieldToList(recordFields, field, fieldName, fieldSchema, dataType, nullable); |
| } |
| |
| recordSchema.setFields(recordFields); |
| return recordSchema; |
| } |
| |
| public static boolean isNullable(final Schema schema) { |
| final Type schemaType = schema.getType(); |
| if (schemaType == Type.UNION) { |
| for (final Schema unionSchema : schema.getTypes()) { |
| if (isNullable(unionSchema)) { |
| return true; |
| } |
| } |
| } |
| |
| return schemaType == Type.NULL; |
| } |
| |
| public static Object[] convertByteArray(final byte[] bytes) { |
| final Object[] array = new Object[bytes.length]; |
| for (int i = 0; i < bytes.length; i++) { |
| array[i] = Byte.valueOf(bytes[i]); |
| } |
| return array; |
| } |
| |
| public static ByteBuffer convertByteArray(final Object[] bytes) { |
| final ByteBuffer bb = ByteBuffer.allocate(bytes.length); |
| for (final Object o : bytes) { |
| if (o instanceof Byte) { |
| bb.put(((Byte) o).byteValue()); |
| } else { |
| throw new IllegalTypeConversionException("Cannot convert value " + bytes + " of type " + bytes.getClass() + " to ByteBuffer"); |
| } |
| } |
| bb.flip(); |
| return bb; |
| } |
| |
| /** |
| * Method that attempts to map a record field into a provided schema |
| * @param avroSchema - Schema to map into |
| * @param recordField - The field of the record to be mapped |
| * @return Pair with the LHS being the field name and RHS being the mapped field from the schema |
| */ |
| protected static Pair<String, Field> lookupField(final Schema avroSchema, final RecordField recordField) { |
| String fieldName = recordField.getFieldName(); |
| |
| // Attempt to locate the field as is in a true 1:1 mapping with the same name |
| Field field = avroSchema.getField(fieldName); |
| if (field == null) { |
| // No straight mapping was found, so check the aliases to see if it can be mapped |
| for (final String alias : recordField.getAliases()) { |
| field = avroSchema.getField(alias); |
| if (field != null) { |
| fieldName = alias; |
| break; |
| } |
| } |
| } |
| |
| if (field == null) { |
| for (final Field childField : avroSchema.getFields()) { |
| final Set<String> aliases = childField.aliases(); |
| if (aliases.isEmpty()) { |
| continue; |
| } |
| |
| if (aliases.contains(fieldName)) { |
| field = childField; |
| break; |
| } |
| |
| for (final String alias : recordField.getAliases()) { |
| if (aliases.contains(alias)) { |
| field = childField; |
| fieldName = alias; |
| break; |
| } |
| } |
| } |
| } |
| |
| return new ImmutablePair<>(fieldName, field); |
| } |
| |
| public static GenericRecord createAvroRecord(final Record record, final Schema avroSchema) throws IOException { |
| return createAvroRecord(record, avroSchema, StandardCharsets.UTF_8); |
| } |
| |
| public static GenericRecord createAvroRecord(final Record record, final Schema avroSchema, final Charset charset) throws IOException { |
| final GenericRecord rec = new GenericData.Record(avroSchema); |
| final RecordSchema recordSchema = record.getSchema(); |
| |
| final Map<String, Object> recordValues = record.toMap(); |
| for (final Map.Entry<String, Object> entry : recordValues.entrySet()) { |
| final Object rawValue = entry.getValue(); |
| if (rawValue == null) { |
| continue; |
| } |
| |
| final String rawFieldName = entry.getKey(); |
| final Optional<RecordField> optionalRecordField = recordSchema.getField(rawFieldName); |
| if (!optionalRecordField.isPresent()) { |
| continue; |
| } |
| |
| final RecordField recordField = optionalRecordField.get(); |
| |
| final Field field; |
| final Field avroField = avroSchema.getField(rawFieldName); |
| if (avroField == null) { |
| final Pair<String, Field> fieldPair = lookupField(avroSchema, recordField); |
| field = fieldPair.getRight(); |
| |
| if (field == null) { |
| continue; |
| } |
| } else { |
| field = avroField; |
| } |
| |
| final String fieldName = field.name(); |
| final Object converted = convertToAvroObject(rawValue, field.schema(), fieldName, charset); |
| rec.put(fieldName, converted); |
| } |
| |
| return rec; |
| } |
| |
| /** |
| * Convert a raw value to an Avro object to serialize in Avro type system, using the provided character set when necessary. |
| * The counter-part method which reads an Avro object back to a raw value is {@link #normalizeValue(Object, Schema, String)}. |
| */ |
| public static Object convertToAvroObject(final Object rawValue, final Schema fieldSchema) { |
| return convertToAvroObject(rawValue, fieldSchema, StandardCharsets.UTF_8); |
| } |
| |
| /** |
| * Convert a raw value to an Avro object to serialize in Avro type system, using the provided character set when necessary. |
| * The counter-part method which reads an Avro object back to a raw value is {@link #normalizeValue(Object, Schema, String)}. |
| */ |
| public static Object convertToAvroObject(final Object rawValue, final Schema fieldSchema, final Charset charset) { |
| return convertToAvroObject(rawValue, fieldSchema, fieldSchema.getName(), charset); |
| } |
| |
| /** |
| * Adds fields to <tt>recordFields</tt> list. |
| * @param recordFields - record fields are added to this list. |
| * @param field - the field |
| * @param fieldName - field name |
| * @param fieldSchema - field schema |
| * @param dataType - data type |
| * @param nullable - is nullable? |
| */ |
| private static void addFieldToList(final List<RecordField> recordFields, final Field field, final String fieldName, |
| final Schema fieldSchema, final DataType dataType, final boolean nullable) { |
| if (field.defaultVal() == JsonProperties.NULL_VALUE) { |
| recordFields.add(new RecordField(fieldName, dataType, field.aliases(), nullable)); |
| } else { |
| Object defaultValue = field.defaultVal(); |
| if (defaultValue != null && fieldSchema.getType() == Schema.Type.ARRAY && !DataTypeUtils.isArrayTypeCompatible(defaultValue, ((ArrayDataType) dataType).getElementType())) { |
| defaultValue = defaultValue instanceof List ? ((List<?>) defaultValue).toArray() : new Object[0]; |
| } |
| recordFields.add(new RecordField(fieldName, dataType, defaultValue, field.aliases(), nullable)); |
| } |
| } |
| |
| private static Long getLongFromTimestamp(final Object rawValue, final Schema fieldSchema, final String fieldName) { |
| final String format = AvroTypeUtil.determineDataType(fieldSchema).getFormat(); |
| Timestamp t = DataTypeUtils.toTimestamp(rawValue, () -> DataTypeUtils.getDateFormat(format), fieldName); |
| return t.getTime(); |
| } |
| |
| @SuppressWarnings("unchecked") |
| private static Object convertToAvroObject(final Object rawValue, final Schema fieldSchema, final String fieldName, final Charset charset) { |
| if (rawValue == null) { |
| return null; |
| } |
| |
| switch (fieldSchema.getType()) { |
| case INT: { |
| final LogicalType logicalType = fieldSchema.getLogicalType(); |
| if (logicalType == null) { |
| return DataTypeUtils.toInteger(rawValue, fieldName); |
| } |
| |
| if (LOGICAL_TYPE_DATE.equals(logicalType.getName())) { |
| final String format = AvroTypeUtil.determineDataType(fieldSchema).getFormat(); |
| final java.sql.Date date = DataTypeUtils.toDate(rawValue, () -> DataTypeUtils.getDateFormat(format), fieldName); |
| return (int) ChronoUnit.DAYS.between(Instant.EPOCH, Instant.ofEpochMilli(date.getTime())); |
| } else if (LOGICAL_TYPE_TIME_MILLIS.equals(logicalType.getName())) { |
| final String format = AvroTypeUtil.determineDataType(fieldSchema).getFormat(); |
| final Time time = DataTypeUtils.toTime(rawValue, () -> DataTypeUtils.getDateFormat(format), fieldName); |
| final Date date = new Date(time.getTime()); |
| final Duration duration = Duration.between(date.toInstant().truncatedTo(ChronoUnit.DAYS), date.toInstant()); |
| final long millisSinceMidnight = duration.toMillis(); |
| return (int) millisSinceMidnight; |
| } |
| |
| return DataTypeUtils.toInteger(rawValue, fieldName); |
| } |
| case LONG: { |
| final LogicalType logicalType = fieldSchema.getLogicalType(); |
| if (logicalType == null) { |
| return DataTypeUtils.toLong(rawValue, fieldName); |
| } |
| |
| if (LOGICAL_TYPE_TIME_MICROS.equals(logicalType.getName())) { |
| final long longValue = getLongFromTimestamp(rawValue, fieldSchema, fieldName); |
| final Date date = new Date(longValue); |
| final Duration duration = Duration.between(date.toInstant().truncatedTo(ChronoUnit.DAYS), date.toInstant()); |
| return duration.toMillis() * 1000L; |
| } else if (LOGICAL_TYPE_TIMESTAMP_MILLIS.equals(logicalType.getName())) { |
| return getLongFromTimestamp(rawValue, fieldSchema, fieldName); |
| } else if (LOGICAL_TYPE_TIMESTAMP_MICROS.equals(logicalType.getName())) { |
| return getLongFromTimestamp(rawValue, fieldSchema, fieldName) * 1000L; |
| } |
| |
| return DataTypeUtils.toLong(rawValue, fieldName); |
| } |
| case BYTES: |
| case FIXED: |
| final LogicalType logicalType = fieldSchema.getLogicalType(); |
| if (logicalType != null && LOGICAL_TYPE_DECIMAL.equals(logicalType.getName())) { |
| final LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) logicalType; |
| final BigDecimal rawDecimal; |
| if (rawValue instanceof BigDecimal) { |
| rawDecimal = (BigDecimal) rawValue; |
| |
| } else if (rawValue instanceof Double) { |
| rawDecimal = BigDecimal.valueOf((Double) rawValue); |
| |
| } else if (rawValue instanceof String) { |
| rawDecimal = new BigDecimal((String) rawValue); |
| |
| } else if (rawValue instanceof Integer) { |
| rawDecimal = new BigDecimal((Integer) rawValue); |
| |
| } else if (rawValue instanceof Long) { |
| rawDecimal = new BigDecimal((Long) rawValue); |
| |
| } else { |
| throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to a logical decimal"); |
| } |
| // If the desired scale is different than this value's coerce scale. |
| final int desiredScale = decimalType.getScale(); |
| final BigDecimal decimal = rawDecimal.scale() == desiredScale |
| ? rawDecimal : rawDecimal.setScale(desiredScale, BigDecimal.ROUND_HALF_UP); |
| return fieldSchema.getType() == Type.BYTES |
| ? new Conversions.DecimalConversion().toBytes(decimal, fieldSchema, logicalType) //return GenericByte |
| : new Conversions.DecimalConversion().toFixed(decimal, fieldSchema, logicalType); //return GenericFixed |
| } |
| if (rawValue instanceof byte[]) { |
| return ByteBuffer.wrap((byte[]) rawValue); |
| } |
| if (rawValue instanceof String) { |
| return ByteBuffer.wrap(((String) rawValue).getBytes(charset)); |
| } |
| if (rawValue instanceof Object[]) { |
| if (fieldSchema.getType() == Type.FIXED && "INT96".equals(fieldSchema.getName())) { |
| Object[] rawObjects = (Object[]) rawValue; |
| byte[] rawBytes = new byte[rawObjects.length]; |
| for (int elementIndex = 0; elementIndex < rawObjects.length; elementIndex++) { |
| rawBytes[elementIndex] = (Byte) rawObjects[elementIndex]; |
| } |
| |
| return new GenericData.Fixed(fieldSchema, rawBytes); |
| } else { |
| return AvroTypeUtil.convertByteArray((Object[]) rawValue); |
| } |
| } |
| try { |
| if (rawValue instanceof Blob) { |
| Blob blob = (Blob) rawValue; |
| return ByteBuffer.wrap(IOUtils.toByteArray(blob.getBinaryStream())); |
| } else { |
| throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to a ByteBuffer"); |
| } |
| } catch (IllegalTypeConversionException itce) { |
| throw itce; |
| } catch (Exception e) { |
| throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to a ByteBuffer", e); |
| } |
| case MAP: |
| if (rawValue instanceof Record) { |
| final Record recordValue = (Record) rawValue; |
| final Map<String, Object> map = new HashMap<>(); |
| for (final RecordField recordField : recordValue.getSchema().getFields()) { |
| final Object v = recordValue.getValue(recordField); |
| if (v != null) { |
| map.put(recordField.getFieldName(), convertToAvroObject(v, fieldSchema.getValueType(), fieldName + "[" + recordField.getFieldName() + "]", charset)); |
| } |
| } |
| |
| return map; |
| } else if (rawValue instanceof Map) { |
| final Map<String, Object> objectMap = (Map<String, Object>) rawValue; |
| final Map<String, Object> map = new HashMap<>(objectMap.size()); |
| for (final String s : objectMap.keySet()) { |
| final Object converted = convertToAvroObject(objectMap.get(s), fieldSchema.getValueType(), fieldName + "[" + s + "]", charset); |
| map.put(s, converted); |
| } |
| return map; |
| } else { |
| throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to a Map"); |
| } |
| case RECORD: |
| final GenericData.Record avroRecord = new GenericData.Record(fieldSchema); |
| |
| final Set<Map.Entry<String, Object>> entries; |
| if (rawValue instanceof Map) { |
| final Map<String, Object> map = (Map<String, Object>) rawValue; |
| entries = map.entrySet(); |
| } else if (rawValue instanceof Record) { |
| entries = new HashSet<>(); |
| final Record record = (Record) rawValue; |
| record.getSchema().getFields().forEach(field -> entries.add(new AbstractMap.SimpleEntry<>(field.getFieldName(), record.getValue(field)))); |
| } else { |
| throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to a Record"); |
| } |
| for (final Map.Entry<String, Object> e : entries) { |
| final Object recordFieldValue = e.getValue(); |
| final String recordFieldName = e.getKey(); |
| |
| final Field field = fieldSchema.getField(recordFieldName); |
| if (field == null) { |
| continue; |
| } |
| |
| final Object converted = convertToAvroObject(recordFieldValue, field.schema(), fieldName + "/" + recordFieldName, charset); |
| avroRecord.put(recordFieldName, converted); |
| } |
| return avroRecord; |
| case UNION: |
| return convertUnionFieldValue(rawValue, fieldSchema, schema -> convertToAvroObject(rawValue, schema, fieldName, charset), fieldName); |
| case ARRAY: |
| final Object[] objectArray; |
| if (rawValue instanceof List) { |
| objectArray = ((List) rawValue).toArray(); |
| } else if (rawValue instanceof Object[]) { |
| objectArray = (Object[]) rawValue; |
| } else { |
| throw new IllegalTypeConversionException("Cannot convert value " + rawValue + " of type " + rawValue.getClass() + " to an Array"); |
| } |
| final List<Object> list = new ArrayList<>(objectArray.length); |
| int i = 0; |
| for (final Object o : objectArray) { |
| final Object converted = convertToAvroObject(o, fieldSchema.getElementType(), fieldName + "[" + i + "]", charset); |
| list.add(converted); |
| i++; |
| } |
| return list; |
| case BOOLEAN: |
| return DataTypeUtils.toBoolean(rawValue, fieldName); |
| case DOUBLE: |
| return DataTypeUtils.toDouble(rawValue, fieldName); |
| case FLOAT: |
| return DataTypeUtils.toFloat(rawValue, fieldName); |
| case NULL: |
| return null; |
| case ENUM: |
| List<String> enums = fieldSchema.getEnumSymbols(); |
| if(enums != null && enums.contains(rawValue)) { |
| return new GenericData.EnumSymbol(fieldSchema, rawValue); |
| } else { |
| throw new IllegalTypeConversionException(rawValue + " is not a possible value of the ENUM" + enums + "."); |
| } |
| case STRING: |
| if (rawValue instanceof String) { |
| return rawValue; |
| } |
| |
| return DataTypeUtils.toString(rawValue, (String) null, charset); |
| } |
| |
| return rawValue; |
| } |
| |
| public static Map<String, Object> convertAvroRecordToMap(final GenericRecord avroRecord, final RecordSchema recordSchema) { |
| return convertAvroRecordToMap(avroRecord, recordSchema, StandardCharsets.UTF_8); |
| } |
| |
| public static Map<String, Object> convertAvroRecordToMap(final GenericRecord avroRecord, final RecordSchema recordSchema, final Charset charset) { |
| final Map<String, Object> values = new HashMap<>(recordSchema.getFieldCount()); |
| |
| for (final RecordField recordField : recordSchema.getFields()) { |
| |
| Object value = avroRecord.get(recordField.getFieldName()); |
| if (value == null) { |
| for (final String alias : recordField.getAliases()) { |
| value = avroRecord.get(alias); |
| if (value != null) { |
| break; |
| } |
| } |
| } |
| |
| final String fieldName = recordField.getFieldName(); |
| try { |
| final Field avroField = avroRecord.getSchema().getField(fieldName); |
| if (avroField == null) { |
| values.put(fieldName, null); |
| continue; |
| } |
| |
| final Schema fieldSchema = avroField.schema(); |
| final Object rawValue = normalizeValue(value, fieldSchema, fieldName); |
| |
| final DataType desiredType = recordField.getDataType(); |
| final Object coercedValue = DataTypeUtils.convertType(rawValue, desiredType, fieldName, charset); |
| |
| values.put(fieldName, coercedValue); |
| } catch (Exception ex) { |
| logger.debug("fail to convert field " + fieldName, ex ); |
| throw ex; |
| } |
| } |
| |
| return values; |
| } |
| |
| /** |
| * Convert value of a nullable union field. |
| * @param originalValue original value |
| * @param fieldSchema the union field schema |
| * @param conversion the conversion function which takes a non-null field schema within the union field and returns a converted value |
| * @return a converted value |
| */ |
| private static Object convertUnionFieldValue(final Object originalValue, final Schema fieldSchema, final Function<Schema, Object> conversion, final String fieldName) { |
| boolean foundNonNull = false; |
| |
| // It is an extremely common case to have a UNION type because a field can be NULL or some other type. In this situation, |
| // we will have two possible types, and one of them will be null. When this happens, we can be much more efficient by simply |
| // determining the non-null type and converting to that. |
| final List<Schema> schemaTypes = fieldSchema.getTypes(); |
| if (schemaTypes.size() == 2) { |
| final Schema firstSchema = schemaTypes.get(0); |
| final Schema secondSchema = schemaTypes.get(1); |
| |
| if (firstSchema.getType() == Type.NULL) { |
| return conversion.apply(secondSchema); |
| } |
| if (secondSchema.getType() == Type.NULL) { |
| return conversion.apply(firstSchema); |
| } |
| } |
| |
| final Optional<Schema> mostSuitableType = DataTypeUtils.findMostSuitableType( |
| originalValue, |
| getNonNullSubSchemas(fieldSchema), |
| AvroTypeUtil::determineDataType |
| ); |
| if (mostSuitableType.isPresent()) { |
| return conversion.apply(mostSuitableType.get()); |
| } |
| |
| for (final Schema subSchema : fieldSchema.getTypes()) { |
| if (subSchema.getType() == Type.NULL) { |
| continue; |
| } |
| |
| foundNonNull = true; |
| final DataType desiredDataType = AvroTypeUtil.determineDataType(subSchema); |
| try { |
| final Object convertedValue = conversion.apply(subSchema); |
| |
| if (isCompatibleDataType(convertedValue, desiredDataType)) { |
| return convertedValue; |
| } |
| |
| // For logical types those store with different type (e.g. BigDecimal as ByteBuffer), check compatibility using the original rawValue |
| if (subSchema.getLogicalType() != null && DataTypeUtils.isCompatibleDataType(originalValue, desiredDataType)) { |
| return convertedValue; |
| } |
| } catch (Exception e) { |
| // If failed with one of possible types, continue with the next available option. |
| if (logger.isDebugEnabled()) { |
| logger.debug("Cannot convert value {} to type {}", originalValue, desiredDataType, e); |
| } |
| } |
| } |
| |
| if (foundNonNull) { |
| throw new IllegalTypeConversionException("Cannot convert value " + originalValue + " of type " + originalValue.getClass() |
| + " because no compatible types exist in the UNION for field " + fieldName); |
| } |
| |
| return null; |
| } |
| |
| private static boolean isCompatibleDataType(final Object value, final DataType dataType) { |
| if (value == null) { |
| return false; |
| } |
| |
| switch (dataType.getFieldType()) { |
| case RECORD: |
| if (value instanceof GenericRecord || value instanceof SpecificRecord) { |
| return true; |
| } |
| break; |
| case STRING: |
| if (value instanceof Utf8) { |
| return true; |
| } |
| break; |
| case ARRAY: |
| if (value instanceof Array || value instanceof List || value instanceof ByteBuffer) { |
| return true; |
| } |
| break; |
| case MAP: |
| if (value instanceof Map) { |
| return true; |
| } |
| } |
| |
| return DataTypeUtils.isCompatibleDataType(value, dataType); |
| } |
| |
| |
| /** |
| * Convert an Avro object to a normal Java objects for further processing. |
| * The counter-part method which convert a raw value to an Avro object is {@link #convertToAvroObject(Object, Schema, String, Charset)} |
| */ |
| private static Object normalizeValue(final Object value, final Schema avroSchema, final String fieldName) { |
| if (value == null) { |
| return null; |
| } |
| |
| switch (avroSchema.getType()) { |
| case INT: { |
| final LogicalType logicalType = avroSchema.getLogicalType(); |
| if (logicalType == null) { |
| return value; |
| } |
| |
| final String logicalName = logicalType.getName(); |
| if (LOGICAL_TYPE_DATE.equals(logicalName)) { |
| // date logical name means that the value is number of days since Jan 1, 1970 |
| return new java.sql.Date(TimeUnit.DAYS.toMillis((int) value)); |
| } else if (LOGICAL_TYPE_TIME_MILLIS.equals(logicalName)) { |
| // time-millis logical name means that the value is number of milliseconds since midnight. |
| return new java.sql.Time((int) value); |
| } |
| |
| break; |
| } |
| case LONG: { |
| final LogicalType logicalType = avroSchema.getLogicalType(); |
| if (logicalType == null) { |
| return value; |
| } |
| |
| final String logicalName = logicalType.getName(); |
| if (LOGICAL_TYPE_TIME_MICROS.equals(logicalName)) { |
| return new java.sql.Time(TimeUnit.MICROSECONDS.toMillis((long) value)); |
| } else if (LOGICAL_TYPE_TIMESTAMP_MILLIS.equals(logicalName)) { |
| return new java.sql.Timestamp((long) value); |
| } else if (LOGICAL_TYPE_TIMESTAMP_MICROS.equals(logicalName)) { |
| return new java.sql.Timestamp(TimeUnit.MICROSECONDS.toMillis((long) value)); |
| } |
| break; |
| } |
| case UNION: |
| if (value instanceof GenericData.Record) { |
| final GenericData.Record avroRecord = (GenericData.Record) value; |
| return normalizeValue(value, avroRecord.getSchema(), fieldName); |
| } |
| return convertUnionFieldValue(value, avroSchema, schema -> normalizeValue(value, schema, fieldName), fieldName); |
| case RECORD: |
| final GenericData.Record record = (GenericData.Record) value; |
| final Schema recordSchema = record.getSchema(); |
| final List<Field> recordFields = recordSchema.getFields(); |
| final Map<String, Object> values = new HashMap<>(recordFields.size()); |
| for (final Field field : recordFields) { |
| final Object avroFieldValue = record.get(field.name()); |
| final Object fieldValue = normalizeValue(avroFieldValue, field.schema(), fieldName + "/" + field.name()); |
| values.put(field.name(), fieldValue); |
| } |
| final RecordSchema childSchema = AvroTypeUtil.createSchema(recordSchema, false); |
| return new MapRecord(childSchema, values); |
| case BYTES: |
| final ByteBuffer bb = (ByteBuffer) value; |
| final LogicalType logicalType = avroSchema.getLogicalType(); |
| if (logicalType != null && LOGICAL_TYPE_DECIMAL.equals(logicalType.getName())) { |
| return new Conversions.DecimalConversion().fromBytes(bb, avroSchema, logicalType); |
| } |
| return AvroTypeUtil.convertByteArray(bb.array()); |
| case FIXED: |
| final GenericFixed fixed = (GenericFixed) value; |
| final LogicalType fixedLogicalType = avroSchema.getLogicalType(); |
| if (fixedLogicalType != null && LOGICAL_TYPE_DECIMAL.equals(fixedLogicalType.getName())) { |
| final ByteBuffer fixedByteBuffer = ByteBuffer.wrap(fixed.bytes()); |
| return new Conversions.DecimalConversion().fromBytes(fixedByteBuffer, avroSchema, fixedLogicalType); |
| } |
| return AvroTypeUtil.convertByteArray(fixed.bytes()); |
| case ENUM: |
| return value.toString(); |
| case NULL: |
| return null; |
| case STRING: |
| return value.toString(); |
| case ARRAY: |
| if (value instanceof List) { |
| final List<?> list = (List<?>) value; |
| final Object[] valueArray = new Object[list.size()]; |
| for (int i = 0; i < list.size(); i++) { |
| final Schema elementSchema = avroSchema.getElementType(); |
| valueArray[i] = normalizeValue(list.get(i), elementSchema, fieldName + "[" + i + "]"); |
| } |
| return valueArray; |
| } else { |
| final GenericData.Array<?> array = (GenericData.Array<?>) value; |
| final Object[] valueArray = new Object[array.size()]; |
| for (int i = 0; i < array.size(); i++) { |
| final Schema elementSchema = avroSchema.getElementType(); |
| valueArray[i] = normalizeValue(array.get(i), elementSchema, fieldName + "[" + i + "]"); |
| } |
| return valueArray; |
| } |
| case MAP: |
| final Map<?, ?> avroMap = (Map<?, ?>) value; |
| final Map<String, Object> map = new HashMap<>(avroMap.size()); |
| for (final Map.Entry<?, ?> entry : avroMap.entrySet()) { |
| Object obj = entry.getValue(); |
| if (obj instanceof Utf8 || obj instanceof CharSequence) { |
| obj = obj.toString(); |
| } |
| |
| final String key = entry.getKey().toString(); |
| obj = normalizeValue(obj, avroSchema.getValueType(), fieldName + "[" + key + "]"); |
| |
| map.put(key, obj); |
| } |
| |
| return map; |
| } |
| |
| return value; |
| } |
| |
| } |