blob: de452c7de9530c6e8ab61dae8eafe8955e1deb4f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.seatunnel.translation.flink.utils;
import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.LocalTimeTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.MapTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.runtime.typeutils.BigDecimalTypeInfo;
import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
@SuppressWarnings("checkstyle:MagicNumber")
public class TypeConverterUtils {
private static final Map<Class<?>, BridgedType> BRIDGED_TYPES = new HashMap<>(32);
static {
// basic types
BRIDGED_TYPES.put(String.class, BridgedType.of(BasicType.STRING_TYPE, BasicTypeInfo.STRING_TYPE_INFO));
BRIDGED_TYPES.put(Boolean.class, BridgedType.of(BasicType.BOOLEAN_TYPE, BasicTypeInfo.BOOLEAN_TYPE_INFO));
BRIDGED_TYPES.put(Byte.class, BridgedType.of(BasicType.BYTE_TYPE, BasicTypeInfo.BYTE_TYPE_INFO));
BRIDGED_TYPES.put(Short.class, BridgedType.of(BasicType.SHORT_TYPE, BasicTypeInfo.SHORT_TYPE_INFO));
BRIDGED_TYPES.put(Integer.class, BridgedType.of(BasicType.INT_TYPE, BasicTypeInfo.INT_TYPE_INFO));
BRIDGED_TYPES.put(Long.class, BridgedType.of(BasicType.LONG_TYPE, BasicTypeInfo.LONG_TYPE_INFO));
BRIDGED_TYPES.put(Float.class, BridgedType.of(BasicType.FLOAT_TYPE, BasicTypeInfo.FLOAT_TYPE_INFO));
BRIDGED_TYPES.put(Double.class, BridgedType.of(BasicType.DOUBLE_TYPE, BasicTypeInfo.DOUBLE_TYPE_INFO));
BRIDGED_TYPES.put(Void.class, BridgedType.of(BasicType.VOID_TYPE, BasicTypeInfo.VOID_TYPE_INFO));
BRIDGED_TYPES.put(BigDecimal.class, BridgedType.of(new DecimalType(38, 18), BasicTypeInfo.BIG_DEC_TYPE_INFO));
// data time types
BRIDGED_TYPES.put(LocalDate.class, BridgedType.of(LocalTimeType.LOCAL_DATE_TYPE, LocalTimeTypeInfo.LOCAL_DATE));
BRIDGED_TYPES.put(LocalTime.class, BridgedType.of(LocalTimeType.LOCAL_TIME_TYPE, LocalTimeTypeInfo.LOCAL_TIME));
BRIDGED_TYPES.put(LocalDateTime.class, BridgedType.of(LocalTimeType.LOCAL_DATE_TIME_TYPE, LocalTimeTypeInfo.LOCAL_DATE_TIME));
// basic array types
BRIDGED_TYPES.put(byte[].class, BridgedType.of(PrimitiveByteArrayType.INSTANCE, PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO));
BRIDGED_TYPES.put(String[].class, BridgedType.of(ArrayType.STRING_ARRAY_TYPE, BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO));
BRIDGED_TYPES.put(Boolean[].class, BridgedType.of(ArrayType.BOOLEAN_ARRAY_TYPE, BasicArrayTypeInfo.BOOLEAN_ARRAY_TYPE_INFO));
BRIDGED_TYPES.put(Byte[].class, BridgedType.of(ArrayType.BYTE_ARRAY_TYPE, BasicArrayTypeInfo.BYTE_ARRAY_TYPE_INFO));
BRIDGED_TYPES.put(Short[].class, BridgedType.of(ArrayType.SHORT_ARRAY_TYPE, BasicArrayTypeInfo.SHORT_ARRAY_TYPE_INFO));
BRIDGED_TYPES.put(Integer[].class, BridgedType.of(ArrayType.INT_ARRAY_TYPE, BasicArrayTypeInfo.INT_ARRAY_TYPE_INFO));
BRIDGED_TYPES.put(Long[].class, BridgedType.of(ArrayType.LONG_ARRAY_TYPE, BasicArrayTypeInfo.LONG_ARRAY_TYPE_INFO));
BRIDGED_TYPES.put(Float[].class, BridgedType.of(ArrayType.FLOAT_ARRAY_TYPE, BasicArrayTypeInfo.FLOAT_ARRAY_TYPE_INFO));
BRIDGED_TYPES.put(Double[].class, BridgedType.of(ArrayType.DOUBLE_ARRAY_TYPE, BasicArrayTypeInfo.DOUBLE_ARRAY_TYPE_INFO));
}
private TypeConverterUtils() {
throw new UnsupportedOperationException("TypeConverterUtils is a utility class and cannot be instantiated");
}
public static SeaTunnelDataType<?> convert(TypeInformation<?> dataType) {
BridgedType bridgedType = BRIDGED_TYPES.get(dataType.getTypeClass());
if (bridgedType != null) {
return bridgedType.getSeaTunnelType();
}
if (dataType instanceof BigDecimalTypeInfo) {
BigDecimalTypeInfo decimalType = (BigDecimalTypeInfo) dataType;
return new DecimalType(decimalType.precision(), decimalType.scale());
}
if (dataType instanceof MapTypeInfo) {
MapTypeInfo<?, ?> mapTypeInfo = (MapTypeInfo<?, ?>) dataType;
return new MapType<>(convert(mapTypeInfo.getKeyTypeInfo()), convert(mapTypeInfo.getValueTypeInfo()));
}
if (dataType instanceof RowTypeInfo) {
RowTypeInfo typeInformation = (RowTypeInfo) dataType;
String[] fieldNames = typeInformation.getFieldNames();
SeaTunnelDataType<?>[] seaTunnelDataTypes = Arrays.stream(typeInformation.getFieldTypes())
.map(TypeConverterUtils::convert).toArray(SeaTunnelDataType[]::new);
return new SeaTunnelRowType(fieldNames, seaTunnelDataTypes);
}
throw new IllegalArgumentException("Unsupported Flink's data type: " + dataType);
}
public static TypeInformation<?> convert(SeaTunnelDataType<?> dataType) {
BridgedType bridgedType = BRIDGED_TYPES.get(dataType.getTypeClass());
if (bridgedType != null) {
return bridgedType.getFlinkType();
}
if (dataType instanceof DecimalType) {
DecimalType decimalType = (DecimalType) dataType;
return new BigDecimalTypeInfo(decimalType.getPrecision(), decimalType.getScale());
}
if (dataType instanceof MapType) {
MapType<?, ?> mapType = (MapType<?, ?>) dataType;
return new MapTypeInfo<>(convert(mapType.getKeyType()), convert(mapType.getValueType()));
}
if (dataType instanceof SeaTunnelRowType) {
SeaTunnelRowType rowType = (SeaTunnelRowType) dataType;
TypeInformation<?>[] types = Arrays.stream(rowType.getFieldTypes())
.map(TypeConverterUtils::convert).toArray(TypeInformation[]::new);
return new RowTypeInfo(types, rowType.getFieldNames());
}
throw new IllegalArgumentException("Unsupported SeaTunnel's data type: " + dataType);
}
public static class BridgedType {
private final SeaTunnelDataType<?> seaTunnelType;
private final TypeInformation<?> flinkType;
private BridgedType(SeaTunnelDataType<?> seaTunnelType, TypeInformation<?> flinkType) {
this.seaTunnelType = seaTunnelType;
this.flinkType = flinkType;
}
public static BridgedType of(SeaTunnelDataType<?> seaTunnelType, TypeInformation<?> flinkType) {
return new BridgedType(seaTunnelType, flinkType);
}
public TypeInformation<?> getFlinkType() {
return flinkType;
}
public SeaTunnelDataType<?> getSeaTunnelType() {
return seaTunnelType;
}
}
}