blob: 8855f87bd230f6ea19824e1955c413ad5111dffd [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hudi.util;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
import org.joda.time.DateTime;
import org.joda.time.DateTimeFieldType;
import java.io.Serializable;
import java.lang.reflect.Array;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalTime;
import java.time.ZoneId;
import java.time.temporal.ChronoField;
import java.time.temporal.ChronoUnit;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
* Tool class used to convert from Avro {@link GenericRecord} to {@link RowData}.
*
* <p>NOTE: adapted from Flink release 1.12.0; this class should be removed once the project
* upgrades to that Flink version.
*/
@Internal
public class AvroToRowDataConverters {
/**
* Runtime converter that converts Avro data structures into objects of Flink Table & SQL
* internal data structures.
*
* <p>The interface extends {@link Serializable}, so lambdas and method references that
* target it are serializable as well.
*/
@FunctionalInterface
public interface AvroToRowDataConverter extends Serializable {
/**
* Converts a single Avro value.
*
* @param object the Avro value to convert; whether {@code null} is accepted depends on the
*     concrete converter
* @return the converted value
*/
Object convert(Object object);
}
// -------------------------------------------------------------------------------------
// Runtime Converters
// -------------------------------------------------------------------------------------
/**
 * Creates a converter that turns an Avro {@link IndexedRecord} into a {@link GenericRowData}
 * with one field per field of {@code rowType}.
 *
 * <p>Fields are matched by position: the value at index {@code i} of the record is converted
 * with the null-safe converter built for the type of the {@code i}-th row field.
 *
 * @param rowType the row type describing the fields of the produced row
 * @return a converter whose input must be an {@link IndexedRecord}
 */
public static AvroToRowDataConverter createRowConverter(RowType rowType) {
  // Build one null-safe converter per field up front so the per-record path only dispatches.
  final List<RowType.RowField> fields = rowType.getFields();
  final AvroToRowDataConverter[] perField = new AvroToRowDataConverter[fields.size()];
  for (int pos = 0; pos < perField.length; pos++) {
    perField[pos] = createNullableConverter(fields.get(pos).getType());
  }
  final int fieldCount = rowType.getFieldCount();

  return avroObject -> {
    final IndexedRecord avroRecord = (IndexedRecord) avroObject;
    final GenericRowData result = new GenericRowData(fieldCount);
    int idx = 0;
    while (idx < fieldCount) {
      result.setField(idx, perField[idx].convert(avroRecord.get(idx)));
      idx++;
    }
    return result;
  };
}
/**
 * Wraps the converter for {@code type} so that a {@code null} input is mapped to
 * {@code null} instead of being handed to the underlying converter.
 *
 * @param type the logical type to build the converter for
 * @return a null-safe converter for {@code type}
 */
private static AvroToRowDataConverter createNullableConverter(LogicalType type) {
  final AvroToRowDataConverter delegate = createConverter(type);
  return avroObject -> avroObject == null ? null : delegate.convert(avroObject);
}
/**
 * Creates the runtime converter for {@code type}. The returned converter assumes that its
 * input is not {@code null}; use {@link #createNullableConverter(LogicalType)} when nulls
 * may occur.
 *
 * @param type the logical type to build the converter for
 * @return a converter for values of {@code type}
 * @throws UnsupportedOperationException if the type root is {@code RAW} or any other root
 *     not handled here
 */
private static AvroToRowDataConverter createConverter(LogicalType type) {
  switch (type.getTypeRoot()) {
    // ---- nested types: delegate to the dedicated factories ----
    case ROW:
      return createRowConverter((RowType) type);
    case ARRAY:
      return createArrayConverter((ArrayType) type);
    case MAP:
    case MULTISET:
      return createMapConverter(type);

    // ---- types whose converter depends on type parameters ----
    case DECIMAL:
      return createDecimalConverter((DecimalType) type);
    case TIMESTAMP_WITHOUT_TIME_ZONE:
      return createTimestampConverter(((TimestampType) type).getPrecision());

    // ---- date / time ----
    case DATE:
      return AvroToRowDataConverters::convertToDate;
    case TIME_WITHOUT_TIME_ZONE:
      return AvroToRowDataConverters::convertToTime;

    // ---- character and binary strings ----
    case CHAR:
    case VARCHAR:
      return avroObject -> StringData.fromString(avroObject.toString());
    case BINARY:
    case VARBINARY:
      return AvroToRowDataConverters::convertToBytes;

    // ---- small integers: the incoming value is cast to Integer and narrowed ----
    case TINYINT:
      return avroObject -> ((Integer) avroObject).byteValue();
    case SMALLINT:
      return avroObject -> ((Integer) avroObject).shortValue();

    // ---- values handed through unchanged ----
    case BOOLEAN:
    case INTEGER:
    case BIGINT:
    case FLOAT:
    case DOUBLE:
    case INTERVAL_YEAR_MONTH:
    case INTERVAL_DAY_TIME:
      return avroObject -> avroObject;

    // ---- NULL type: every input converts to null ----
    case NULL:
      return avroObject -> null;

    case RAW:
    default:
      throw new UnsupportedOperationException("Unsupported type: " + type);
  }
}
/**
 * Creates a converter that builds a {@link DecimalData} from the unscaled bytes of an Avro
 * decimal value, using the precision and scale of {@code decimalType}.
 *
 * <p>Accepted inputs are {@link GenericFixed}, {@link ByteBuffer} and {@code byte[]}; any
 * other input fails with a {@link ClassCastException} from the final cast.
 *
 * @param decimalType the decimal type supplying precision and scale
 * @return a converter producing {@link DecimalData}
 */
private static AvroToRowDataConverter createDecimalConverter(DecimalType decimalType) {
  final int precision = decimalType.getPrecision();
  final int scale = decimalType.getScale();
  return avroObject -> {
    final byte[] bytes;
    if (avroObject instanceof GenericFixed) {
      bytes = ((GenericFixed) avroObject).bytes();
    } else if (avroObject instanceof ByteBuffer) {
      // Read through a duplicate: a relative get() advances the buffer position, and
      // consuming the record's own buffer would make a second conversion of the same
      // record see zero remaining bytes.
      final ByteBuffer view = ((ByteBuffer) avroObject).duplicate();
      bytes = new byte[view.remaining()];
      view.get(bytes);
    } else {
      bytes = (byte[]) avroObject;
    }
    return DecimalData.fromUnscaledBytes(bytes, precision, scale);
  };
}
/**
 * Creates a converter that turns an Avro array, given as a {@link List}, into a
 * {@link GenericArrayData}.
 *
 * <p>Each element goes through the null-safe converter of the element type. The backing
 * array is allocated with the internal conversion class of the element type as its
 * component type.
 *
 * @param arrayType the array type whose element type drives the conversion
 * @return a converter whose input must be a {@link List}
 */
private static AvroToRowDataConverter createArrayConverter(ArrayType arrayType) {
  final AvroToRowDataConverter perElement =
      createNullableConverter(arrayType.getElementType());
  final Class<?> componentClass =
      LogicalTypeUtils.toInternalConversionClass(arrayType.getElementType());

  return avroObject -> {
    final List<?> source = (List<?>) avroObject;
    final int size = source.size();
    final Object[] target = (Object[]) Array.newInstance(componentClass, size);
    int idx = 0;
    while (idx < size) {
      target[idx] = perElement.convert(source.get(idx));
      idx++;
    }
    return new GenericArrayData(target);
  };
}
/**
 * Creates a converter that turns an Avro map, given as a {@link Map}, into a
 * {@link GenericMapData}.
 *
 * <p>Keys are converted with the (non-null-safe) STRING converter; values are converted
 * with the null-safe converter of the value type that
 * {@code AvroSchemaConverter.extractValueTypeToAvroMap} derives from {@code type}.
 *
 * @param type the MAP or MULTISET logical type
 * @return a converter whose input must be a {@link Map}
 */
private static AvroToRowDataConverter createMapConverter(LogicalType type) {
  final LogicalType stringType = DataTypes.STRING().getLogicalType();
  final AvroToRowDataConverter keyConverter = createConverter(stringType);
  final LogicalType valueType = AvroSchemaConverter.extractValueTypeToAvroMap(type);
  final AvroToRowDataConverter valueConverter = createNullableConverter(valueType);

  return avroObject -> {
    final Map<?, ?> avroMap = (Map<?, ?>) avroObject;
    final Map<Object, Object> converted = new HashMap<>();
    avroMap.forEach((avroKey, avroValue) ->
        converted.put(keyConverter.convert(avroKey), valueConverter.convert(avroValue)));
    return new GenericMapData(converted);
  };
}
/**
 * Creates a converter producing {@link TimestampData} for a TIMESTAMP type of the given
 * precision.
 *
 * <p>Input handling at conversion time:
 * <ul>
 *   <li>{@link Long}: interpreted as an offset from the epoch, in milliseconds when
 *       {@code precision <= 3} and in microseconds when {@code precision <= 6};</li>
 *   <li>{@link Instant}: used as is;</li>
 *   <li>anything else: handed to {@link JodaConverter#convertTimestamp(Object)} (millisecond
 *       precision) when a {@link JodaConverter} is available.</li>
 * </ul>
 *
 * @param precision the precision of the TIMESTAMP type
 * @return a converter producing {@link TimestampData}
 * @throws IllegalArgumentException if {@code precision} is greater than 6; the returned
 *     converter throws the same exception type for an unexpected input when no
 *     {@link JodaConverter} is available
 */
private static AvroToRowDataConverter createTimestampConverter(int precision) {
  final ChronoUnit chronoUnit;
  if (precision <= 3) {
    chronoUnit = ChronoUnit.MILLIS;
  } else if (precision <= 6) {
    chronoUnit = ChronoUnit.MICROS;
  } else {
    // Precision 6 itself is accepted above, so the message must say "up to 6"
    // rather than "less than 6".
    throw new IllegalArgumentException(
        "Avro does not support TIMESTAMP type with precision: "
            + precision
            + ", it only supports precision less than or equal to 6.");
  }
  return avroObject -> {
    final Instant instant;
    if (avroObject instanceof Long) {
      instant = Instant.EPOCH.plus((Long) avroObject, chronoUnit);
    } else if (avroObject instanceof Instant) {
      instant = (Instant) avroObject;
    } else {
      final JodaConverter jodaConverter = JodaConverter.getConverter();
      if (jodaConverter == null) {
        throw new IllegalArgumentException(
            "Unexpected object type for TIMESTAMP logical type. Received: " + avroObject);
      }
      // joda time has only millisecond precision
      instant = Instant.ofEpochMilli(jodaConverter.convertTimestamp(avroObject));
    }
    return TimestampData.fromInstant(instant);
  };
}
/**
 * Converts an Avro date value into the number of days since the epoch.
 *
 * <p>Accepted inputs are {@link Integer} (returned as is), {@link LocalDate} (its epoch
 * day) and, when a {@link JodaConverter} is available, whatever
 * {@link JodaConverter#convertDate(Object)} accepts.
 *
 * @param object the non-null Avro value
 * @return days since the epoch
 * @throws IllegalArgumentException if the value is of an unexpected type and no
 *     {@link JodaConverter} is available
 */
private static int convertToDate(Object object) {
  if (object instanceof Integer) {
    return (Integer) object;
  }
  if (object instanceof LocalDate) {
    return (int) ((LocalDate) object).toEpochDay();
  }
  final JodaConverter jodaConverter = JodaConverter.getConverter();
  if (jodaConverter == null) {
    throw new IllegalArgumentException(
        "Unexpected object type for DATE logical type. Received: " + object);
  }
  // JodaConverter#convertDate returns the epoch milliseconds of a java.util.Date built
  // from the Joda local date, not a day count. Casting that to int (as before) produced
  // a truncated millisecond value. Map the instant back to a calendar date in the JVM
  // default zone, which is the zone java.util.Date uses, and take its epoch day.
  final long epochMillis = jodaConverter.convertDate(object);
  return (int) Instant.ofEpochMilli(epochMillis)
      .atZone(ZoneId.systemDefault())
      .toLocalDate()
      .toEpochDay();
}
/**
 * Converts an Avro time value into milliseconds of the day.
 *
 * <p>Accepted inputs are {@link Integer} (returned as is), {@link LocalTime} (its
 * {@link ChronoField#MILLI_OF_DAY}) and, when a {@link JodaConverter} is available,
 * whatever {@link JodaConverter#convertTime(Object)} accepts.
 *
 * @param object the non-null Avro value
 * @return milliseconds of the day
 * @throws IllegalArgumentException if the value is of an unexpected type and no
 *     {@link JodaConverter} is available
 */
private static int convertToTime(Object object) {
  if (object instanceof Integer) {
    return (Integer) object;
  }
  if (object instanceof LocalTime) {
    return ((LocalTime) object).get(ChronoField.MILLI_OF_DAY);
  }
  final JodaConverter joda = JodaConverter.getConverter();
  if (joda == null) {
    throw new IllegalArgumentException(
        "Unexpected object type for TIME logical type. Received: " + object);
  }
  return joda.convertTime(object);
}
/**
 * Extracts the raw bytes of an Avro binary value.
 *
 * <p>Accepted inputs are {@link GenericFixed} (its {@code bytes()} array), {@link ByteBuffer}
 * (a copy of the remaining bytes) and {@code byte[]} (returned as is); any other input
 * fails with a {@link ClassCastException} from the final cast.
 *
 * @param object the non-null Avro value
 * @return the bytes of the value
 */
private static byte[] convertToBytes(Object object) {
  if (object instanceof GenericFixed) {
    return ((GenericFixed) object).bytes();
  }
  if (object instanceof ByteBuffer) {
    // Read through a duplicate: a relative get() advances the buffer position, and
    // consuming the record's own buffer would make a second conversion of the same
    // record return an empty array.
    final ByteBuffer view = ((ByteBuffer) object).duplicate();
    final byte[] bytes = new byte[view.remaining()];
    view.get(bytes);
    return bytes;
  }
  return (byte[]) object;
}
/**
 * Keeps the optional Joda-Time dependency behind a single class. An instance is only
 * created when {@code org.joda.time.DateTime} can be found through the context class
 * loader of the calling thread.
 */
static class JodaConverter {
  /** Lazily created shared instance; stays {@code null} while Joda-Time is not found. */
  private static JodaConverter instance;

  private JodaConverter() {
  }

  /**
   * Returns the shared converter, creating it on first successful lookup of Joda-Time.
   *
   * @return the converter, or {@code null} if {@code org.joda.time.DateTime} cannot be found
   */
  public static JodaConverter getConverter() {
    if (instance == null) {
      final ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
      try {
        // Lookup only, without initializing the class.
        Class.forName("org.joda.time.DateTime", false, contextLoader);
        instance = new JodaConverter();
      } catch (ClassNotFoundException ignored) {
        // Joda-Time is not available: report that to the caller as null.
        instance = null;
      }
    }
    return instance;
  }

  /**
   * Converts a Joda {@code LocalDate} via {@code toDate().getTime()}.
   *
   * @param object a {@code org.joda.time.LocalDate}
   * @return the {@code getTime()} value (epoch milliseconds) of the resulting
   *     {@link java.util.Date}; note that this is not a day count
   */
  public long convertDate(Object object) {
    return ((org.joda.time.LocalDate) object).toDate().getTime();
  }

  /**
   * Converts a Joda {@code LocalTime} to its millis-of-day field.
   *
   * @param object a {@code org.joda.time.LocalTime}
   * @return the milliseconds of the day
   */
  public int convertTime(Object object) {
    return ((org.joda.time.LocalTime) object).get(DateTimeFieldType.millisOfDay());
  }

  /**
   * Converts a Joda {@code DateTime} via {@code toDate().getTime()}.
   *
   * @param object a {@code org.joda.time.DateTime}
   * @return the {@code getTime()} value (epoch milliseconds) of the resulting
   *     {@link java.util.Date}
   */
  public long convertTimestamp(Object object) {
    return ((DateTime) object).toDate().getTime();
  }
}
}