blob: afda237efefc0300a4eb5c3e56e627beaf40ea19 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package org.apache.doris.flink.deserialization.converter;
import org.apache.flink.table.data.ArrayData;
import org.apache.flink.table.data.DecimalData;
import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.MapData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
import org.apache.flink.table.data.binary.BinaryArrayData;
import org.apache.flink.table.data.binary.BinaryMapData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.ArrayType;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LocalZonedTimestampType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeRoot;
import org.apache.flink.table.types.logical.MapType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.TimestampType;
import org.apache.flink.table.types.logical.ZonedTimestampType;
import org.apache.flink.util.Preconditions;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.doris.flink.serialization.RowBatch;
import java.io.Serializable;
import java.math.BigDecimal;
import java.sql.Date;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
 * Converts values between the Doris side ({@link RowBatch} records, plain Java objects) and
 * Flink's internal data structures ({@link RowData} and friends).
 *
 * <p>Two arrays of per-field converters are kept, one for each direction:
 *
 * <ul>
 *   <li><b>internal</b> ({@link DeserializationConverter}): Java object read from Doris to Flink
 *       internal value, used by {@link #convertInternal(List)};
 *   <li><b>external</b> ({@link SerializationConverter}): field of a Flink {@link RowData} to
 *       plain Java value, used by {@link #convertExternal(RowData, int)}.
 * </ul>
 *
 * <p>Every converter produced by the {@code createNullable*} factories maps {@code null} to
 * {@code null}, so the type-specific converters only ever see non-null input when reached that
 * way.
 */
public class DorisRowConverter implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Per-field converters for the Doris to Flink direction; {@code null} until configured. */
    private DeserializationConverter[] deserializationConverters;

    /** Per-field converters for the Flink to Doris direction; {@code null} until configured. */
    private SerializationConverter[] serializationConverters;

    /** Shared mapper used to render MAP and ROW values as JSON strings. */
    private static final ObjectMapper objectMapper = new ObjectMapper();

    /**
     * Creates an unconfigured converter. No converters are installed; call {@link
     * #setExternalConverter(DataType[])} before using {@link #convertExternal(RowData, int)}.
     */
    public DorisRowConverter() {}

    /**
     * Creates a converter with both directions configured from the fields of {@code rowType}.
     *
     * @param rowType the row schema; must not be {@code null}
     */
    public DorisRowConverter(RowType rowType) {
        checkNotNull(rowType);
        final int fieldCount = rowType.getFieldCount();
        this.deserializationConverters = new DeserializationConverter[fieldCount];
        this.serializationConverters = new SerializationConverter[fieldCount];
        for (int i = 0; i < fieldCount; i++) {
            LogicalType fieldType = rowType.getTypeAt(i);
            deserializationConverters[i] = createNullableInternalConverter(fieldType);
            serializationConverters[i] = createNullableExternalConverter(fieldType);
        }
    }

    /**
     * Creates a converter with both directions configured from the given field types.
     *
     * @param dataTypes the field types in field order; must not be {@code null}
     */
    public DorisRowConverter(DataType[] dataTypes) {
        checkNotNull(dataTypes);
        this.deserializationConverters = new DeserializationConverter[dataTypes.length];
        this.serializationConverters = new SerializationConverter[dataTypes.length];
        for (int i = 0; i < dataTypes.length; i++) {
            LogicalType logicalType = dataTypes[i].getLogicalType();
            deserializationConverters[i] = createNullableInternalConverter(logicalType);
            serializationConverters[i] = createNullableExternalConverter(logicalType);
        }
    }

    /**
     * Replaces the external (Flink to Doris) converters with ones built from {@code dataTypes}.
     * The internal converters are left untouched.
     *
     * @param dataTypes the field types in field order; must not be {@code null}
     * @return this converter, for chaining
     */
    public DorisRowConverter setExternalConverter(DataType[] dataTypes) {
        checkNotNull(dataTypes);
        this.serializationConverters = new SerializationConverter[dataTypes.length];
        for (int i = 0; i < dataTypes.length; i++) {
            LogicalType logicalType = dataTypes[i].getLogicalType();
            serializationConverters[i] = createNullableExternalConverter(logicalType);
        }
        return this;
    }

    /**
     * Convert data retrieved from {@link RowBatch} to {@link RowData}.
     *
     * <p>The i-th element of {@code record} is converted with the i-th internal converter, so
     * the record must contain at least as many elements as there are configured fields.
     *
     * @param record from rowBatch
     * @return a new row holding the converted values
     */
    public GenericRowData convertInternal(List record) {
        GenericRowData rowData = new GenericRowData(deserializationConverters.length);
        for (int i = 0; i < deserializationConverters.length; i++) {
            rowData.setField(i, deserializationConverters[i].deserialize(record.get(i)));
        }
        return rowData;
    }

    /**
     * Convert data from {@link RowData} to {@link RowBatch}.
     *
     * @param rowData record from flink rowdata
     * @param index the field index
     * @return java type value.
     */
    public Object convertExternal(RowData rowData, int index) {
        return serializationConverters[index].serialize(index, rowData);
    }

    /**
     * Create a nullable runtime {@link DeserializationConverter} from given {@link LogicalType}.
     *
     * @param type the Flink logical type of the field
     * @return a converter that maps {@code null} to {@code null} and delegates otherwise
     */
    public DeserializationConverter createNullableInternalConverter(LogicalType type) {
        return wrapIntoNullableInternalConverter(createInternalConverter(type));
    }

    /**
     * Wraps a converter so that {@code null} input yields {@code null} without invoking it.
     *
     * @param deserializationConverter the converter to delegate non-null values to
     * @return the null-tolerant converter
     */
    public DeserializationConverter wrapIntoNullableInternalConverter(
            DeserializationConverter deserializationConverter) {
        return val -> {
            if (val == null) {
                return null;
            } else {
                return deserializationConverter.deserialize(val);
            }
        };
    }

    /**
     * Create a nullable runtime {@link SerializationConverter} from given {@link LogicalType}.
     *
     * @param type the Flink logical type of the field
     * @return a converter that yields {@code null} for a null row or null field
     */
    public static SerializationConverter createNullableExternalConverter(LogicalType type) {
        return wrapIntoNullableExternalConverter(createExternalConverter(type));
    }

    /**
     * Wraps a converter so that a {@code null} row, or a field for which {@link
     * RowData#isNullAt(int)} is true, yields {@code null} without invoking it.
     *
     * @param serializationConverter the converter to delegate non-null fields to
     * @return the null-tolerant converter
     */
    public static SerializationConverter wrapIntoNullableExternalConverter(
            SerializationConverter serializationConverter) {
        return (index, val) -> {
            if (val == null || val.isNullAt(index)) {
                return null;
            } else {
                return serializationConverter.serialize(index, val);
            }
        };
    }

    /** Runtime converter to convert doris field to {@link RowData} type object. */
    @FunctionalInterface
    public interface DeserializationConverter extends Serializable {
        /**
         * Convert a doris field object of {@link RowBatch } to the data structure object.
         *
         * @param field the value read from Doris
         * @return the converted value
         */
        Object deserialize(Object field);
    }

    /** Runtime converter to convert {@link RowData} type object to doris field. */
    @FunctionalInterface
    public interface SerializationConverter extends Serializable {
        /**
         * Reads the field at {@code index} from {@code field} and converts it.
         *
         * @param index position of the field inside the row
         * @param field the row that contains the value
         * @return the converted value
         */
        Object serialize(int index, RowData field);
    }

    /**
     * Creates the type-specific (not null-safe) converter for the Doris to Flink direction.
     *
     * @param type the Flink logical type of the field
     * @return a converter for non-null values of {@code type}
     * @throws UnsupportedOperationException if {@code type} is MULTISET, RAW or any other type
     *     root not listed in the switch
     */
    public DeserializationConverter createInternalConverter(LogicalType type) {
        switch (type.getTypeRoot()) {
            case NULL:
                return val -> null;
            case BOOLEAN:
            case FLOAT:
            case DOUBLE:
            case INTERVAL_YEAR_MONTH:
            case INTERVAL_DAY_TIME:
            case TINYINT:
            case SMALLINT:
            case INTEGER:
            case BIGINT:
                // Passed through unchanged.
                return val -> val;
            case DECIMAL:
                final int precision = ((DecimalType) type).getPrecision();
                final int scale = ((DecimalType) type).getScale();
                return val -> DecimalData.fromBigDecimal((BigDecimal) val, precision, scale);
            case TIMESTAMP_WITH_TIME_ZONE:
            case TIMESTAMP_WITHOUT_TIME_ZONE:
            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                return val -> {
                    if (val instanceof LocalDateTime) {
                        return TimestampData.fromLocalDateTime((LocalDateTime) val);
                    } else {
                        throw new UnsupportedOperationException(
                                "timestamp type must be java.time.LocalDateTime, the actual type is: "
                                        + val.getClass().getName());
                    }
                };
            case DATE:
                // The result is the number of days since the epoch, as an int.
                return val -> {
                    if (val instanceof LocalDate) {
                        // doris source
                        return (int) ((LocalDate) val).toEpochDay();
                    } else if (val instanceof Date) {
                        // doris lookup
                        return (int) ((Date) val).toLocalDate().toEpochDay();
                    } else {
                        throw new UnsupportedOperationException(
                                "date type must be java.time.LocalDate or java.sql.Date, "
                                        + "the actual type is: "
                                        + val.getClass().getName());
                    }
                };
            case CHAR:
            case VARCHAR:
                return val -> StringData.fromString(val.toString());
            case TIME_WITHOUT_TIME_ZONE:
            case BINARY:
            case VARBINARY:
                // These types have no Doris to Flink conversion. They must not fall through
                // into the ARRAY branch, which can never succeed for them. The failure is
                // raised when a non-null value is converted rather than here, so that building
                // a converter for a schema containing such columns keeps working.
                return val -> {
                    throw new UnsupportedOperationException("Unsupported type:" + type);
                };
            case ARRAY:
                return val -> convertArrayData(((List<?>) val).toArray(), type);
            case ROW:
                return val -> convertRowData((Map<String, ?>) val, type);
            case MAP:
                return val -> convertMapData((Map) val, type);
            case MULTISET:
            case RAW:
            default:
                throw new UnsupportedOperationException("Unsupported type:" + type);
        }
    }

    /**
     * Creates the type-specific (not null-safe) converter for the Flink to Doris direction.
     *
     * <p>MAP and ROW values are rendered as JSON strings; ARRAY values become a {@link List}.
     *
     * @param type the Flink logical type of the field
     * @return a converter that reads a non-null field of {@code type} from a row
     * @throws UnsupportedOperationException if {@code type} has no supported conversion
     */
    public static SerializationConverter createExternalConverter(LogicalType type) {
        switch (type.getTypeRoot()) {
            case NULL:
                return ((index, val) -> null);
            case CHAR:
            case VARCHAR:
                return (index, val) -> val.getString(index).toString();
            case BOOLEAN:
                return (index, val) -> val.getBoolean(index);
            case BINARY:
            case VARBINARY:
                return (index, val) -> val.getBinary(index);
            case DECIMAL:
                final int decimalPrecision = ((DecimalType) type).getPrecision();
                final int decimalScale = ((DecimalType) type).getScale();
                return (index, val) ->
                        val.getDecimal(index, decimalPrecision, decimalScale).toBigDecimal();
            case TINYINT:
                return (index, val) -> val.getByte(index);
            case SMALLINT:
                return (index, val) -> val.getShort(index);
            case INTEGER:
            case INTERVAL_YEAR_MONTH:
            case INTERVAL_DAY_TIME:
                return (index, val) -> val.getInt(index);
            case BIGINT:
                return (index, val) -> val.getLong(index);
            case FLOAT:
                return (index, val) -> val.getFloat(index);
            case DOUBLE:
                return (index, val) -> val.getDouble(index);
            case DATE:
                // The stored int is interpreted as days since the epoch.
                return (index, val) -> Date.valueOf(LocalDate.ofEpochDay(val.getInt(index)));
            case TIMESTAMP_WITHOUT_TIME_ZONE:
                final int timestampPrecision = ((TimestampType) type).getPrecision();
                return (index, val) -> val.getTimestamp(index, timestampPrecision).toTimestamp();
            case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
                final int localP = ((LocalZonedTimestampType) type).getPrecision();
                return (index, val) -> val.getTimestamp(index, localP).toTimestamp();
            case TIMESTAMP_WITH_TIME_ZONE:
                final int zonedP = ((ZonedTimestampType) type).getPrecision();
                return (index, val) -> val.getTimestamp(index, zonedP).toTimestamp();
            case ARRAY:
                return (index, val) -> convertArrayData(val.getArray(index), type);
            case MAP:
                return (index, val) -> writeValueAsString(convertMapData(val.getMap(index), type));
            case ROW:
                return (index, val) -> writeValueAsString(convertRowData(val, index, type));
            case MULTISET:
            case STRUCTURED_TYPE:
            case DISTINCT_TYPE:
            case RAW:
            case SYMBOL:
            case UNRESOLVED:
            default:
                throw new UnsupportedOperationException("Unsupported type:" + type);
        }
    }

    /**
     * Converts the elements of {@code array} in place with the converter of the array's element
     * type and wraps the array into a {@link GenericArrayData}.
     *
     * @param array the element values; overwritten with the converted values
     * @param type the array type; its first child is used as the element type
     * @return the array data backed by {@code array}
     * @throws IllegalStateException if {@code type} has no child type
     */
    private ArrayData convertArrayData(Object[] array, LogicalType type) {
        List<LogicalType> children = type.getChildren();
        Preconditions.checkState(children.size() > 0, "Failed to obtain the item type of array");
        DeserializationConverter elementConverter =
                createNullableInternalConverter(children.get(0));
        for (int i = 0; i < array.length; i++) {
            array[i] = elementConverter.deserialize(array[i]);
        }
        return new GenericArrayData(array);
    }

    /**
     * Converts every key and value of {@code map} with the converters of the map type's key and
     * value types and wraps the result into a {@link GenericMapData}.
     *
     * @param map the source entries
     * @param type the map type; must be a {@link MapType}
     * @return the converted map data
     */
    private MapData convertMapData(Map<Object, Object> map, LogicalType type) {
        MapType mapType = (MapType) type;
        DeserializationConverter keyConverter =
                createNullableInternalConverter(mapType.getKeyType());
        DeserializationConverter valueConverter =
                createNullableInternalConverter(mapType.getValueType());
        Map<Object, Object> converted = new HashMap<>();
        for (Map.Entry<Object, Object> entry : map.entrySet()) {
            converted.put(
                    keyConverter.deserialize(entry.getKey()),
                    valueConverter.deserialize(entry.getValue()));
        }
        return new GenericMapData(converted);
    }

    /**
     * Converts the values of {@code row} into a {@link GenericRowData}.
     *
     * <p>Entries are matched to the fields of the row type by iteration order: the n-th entry
     * of the map is converted with the type of the n-th field, and the resulting row has as
     * many fields as the map has entries.
     *
     * <p>NOTE(review): this relies on the map iterating in field order and containing exactly
     * one entry per field; confirm that the producer of the map guarantees both.
     *
     * @param row field values keyed by name
     * @param type the row type; must be a {@link RowType}
     * @return the converted row
     */
    private RowData convertRowData(Map<String, ?> row, LogicalType type) {
        RowType rowType = (RowType) type;
        GenericRowData rowData = new GenericRowData(row.size());
        int index = 0;
        for (Map.Entry<String, ?> entry : row.entrySet()) {
            DeserializationConverter converter =
                    createNullableInternalConverter(rowType.getTypeAt(index));
            rowData.setField(index, converter.deserialize(entry.getValue()));
            index++;
        }
        return rowData;
    }

    /**
     * Converts Flink array data into a {@link List} of Java values.
     *
     * <p>A {@link GenericArrayData} is exposed through its object array as is. For a {@link
     * BinaryArrayData}, DATE elements become {@link Date} and nested arrays are converted
     * recursively; {@code null} elements stay {@code null}. Other element types are returned
     * as produced by {@code toObjectArray}.
     *
     * @param array the array to convert
     * @param type the array type; must be an {@link ArrayType} for binary arrays
     * @return the list of element values
     * @throws UnsupportedOperationException if {@code array} is of another implementation
     */
    private static List<Object> convertArrayData(ArrayData array, LogicalType type) {
        if (array instanceof GenericArrayData) {
            return Arrays.asList(((GenericArrayData) array).toObjectArray());
        }
        if (array instanceof BinaryArrayData) {
            LogicalType elementType = ((ArrayType) type).getElementType();
            List<Object> values =
                    Arrays.asList(((BinaryArrayData) array).toObjectArray(elementType));
            if (LogicalTypeRoot.DATE.equals(elementType.getTypeRoot())) {
                // Null elements must be kept as null instead of being unboxed.
                return values.stream()
                        .map(
                                date ->
                                        date == null
                                                ? null
                                                : Date.valueOf(
                                                        LocalDate.ofEpochDay((Integer) date)))
                        .collect(Collectors.toList());
            }
            if (LogicalTypeRoot.ARRAY.equals(elementType.getTypeRoot())) {
                return values.stream()
                        .map(
                                arr ->
                                        arr == null
                                                ? null
                                                : convertArrayData((ArrayData) arr, elementType))
                        .collect(Collectors.toList());
            }
            return values;
        }
        throw new UnsupportedOperationException("Unsupported array data: " + array.getClass());
    }

    /**
     * Converts Flink map data into a {@link Map} of Java values, converting each key and value
     * with {@link #convertMapEntry(Object, LogicalType)}.
     *
     * @param map the map to convert
     * @param type the map type; must be a {@link MapType}
     * @return a {@link HashMap} with the converted entries
     * @throws UnsupportedOperationException if {@code map} is neither a {@link GenericMapData}
     *     nor a {@link BinaryMapData}
     */
    private static Object convertMapData(MapData map, LogicalType type) {
        Map<Object, Object> result = new HashMap<>();
        LogicalType valueType = ((MapType) type).getValueType();
        LogicalType keyType = ((MapType) type).getKeyType();
        if (map instanceof GenericMapData) {
            GenericMapData gMap = (GenericMapData) map;
            for (Object key : ((GenericArrayData) gMap.keyArray()).toObjectArray()) {
                Object convertedKey = convertMapEntry(key, keyType);
                Object convertedValue = convertMapEntry(gMap.get(key), valueType);
                result.put(convertedKey, convertedValue);
            }
            return result;
        } else if (map instanceof BinaryMapData) {
            BinaryMapData bMap = (BinaryMapData) map;
            Map<?, ?> javaMap = bMap.toJavaMap(keyType, valueType);
            for (Map.Entry<?, ?> entry : javaMap.entrySet()) {
                Object convertedKey = convertMapEntry(entry.getKey(), keyType);
                Object convertedValue = convertMapEntry(entry.getValue(), valueType);
                result.put(convertedKey, convertedValue);
            }
            return result;
        }
        throw new UnsupportedOperationException("Unsupported map data: " + map.getClass());
    }

    /**
     * Converts the key-value pair of MAP to the actual type.
     *
     * <p>Nested maps and arrays are converted recursively, DATE values and {@link
     * TimestampData} are rendered as strings, and everything else is rendered with {@code
     * toString()}. A {@code null} input is returned as {@code null}.
     *
     * @param originValue the original value of key-value pair
     * @param logicalType key or value logical type
     * @return the converted key or value, or {@code null} if {@code originValue} is null
     */
    private static Object convertMapEntry(Object originValue, LogicalType logicalType) {
        if (originValue == null) {
            // Absent values stay absent; every branch below would otherwise dereference null.
            return null;
        }
        if (LogicalTypeRoot.MAP.equals(logicalType.getTypeRoot())) {
            return convertMapData((MapData) originValue, logicalType);
        } else if (LogicalTypeRoot.DATE.equals(logicalType.getTypeRoot())) {
            return Date.valueOf(LocalDate.ofEpochDay((Integer) originValue)).toString();
        } else if (LogicalTypeRoot.ARRAY.equals(logicalType.getTypeRoot())) {
            return convertArrayData((ArrayData) originValue, logicalType);
        } else if (originValue instanceof TimestampData) {
            return ((TimestampData) originValue).toTimestamp().toString();
        } else {
            return originValue.toString();
        }
    }

    /**
     * Reads the nested row stored at {@code index} of {@code val} and converts it into a map
     * from field name to the string form of the field value.
     *
     * @param val the row that contains the nested row
     * @param index position of the nested row inside {@code val}
     * @param type the type of the nested row; must be a {@link RowType}
     * @return a {@link HashMap} keyed by field name; null fields map to {@code null}
     */
    private static Object convertRowData(RowData val, int index, LogicalType type) {
        RowType rowType = (RowType) type;
        Map<String, Object> value = new HashMap<>();
        RowData row = val.getRow(index, rowType.getFieldCount());

        List<RowType.RowField> fields = rowType.getFields();
        for (int i = 0; i < fields.size(); i++) {
            RowType.RowField rowField = fields.get(i);
            SerializationConverter converter = createNullableExternalConverter(rowField.getType());
            Object valTmp = converter.serialize(i, row);
            // The nullable converter returns null for null fields; keep it as null instead of
            // failing on toString().
            value.put(rowField.getName(), valTmp == null ? null : valTmp.toString());
        }
        return value;
    }

    /**
     * Serializes {@code object} to a JSON string with the shared {@link ObjectMapper}.
     *
     * @param object the value to serialize
     * @return the JSON text
     * @throws RuntimeException wrapping the {@link JsonProcessingException} if serialization
     *     fails
     */
    private static String writeValueAsString(Object object) {
        try {
            return objectMapper.writeValueAsString(object);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }
}