[fix] Fix map type mapping to doris type error (#267)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java
index b2a49d1..afda237 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java
@@ -360,39 +360,50 @@
+ /**
+ * Converts {@link MapData} into a {@link HashMap}, passing every key and every value through
+ * {@code convertMapEntry} together with the key or value type read from {@code type}.
+ *
+ * @param map map data to convert; only GenericMapData and BinaryMapData are handled
+ * @param type logical type of the map; it is cast to MapType to read key and value types
+ * @return a new map holding the converted entries
+ * @throws UnsupportedOperationException if {@code map} is any other MapData implementation
+ */
private static Object convertMapData(MapData map, LogicalType type) {
Map<Object, Object> result = new HashMap<>();
+ LogicalType valueType = ((MapType) type).getValueType();
+ LogicalType keyType = ((MapType) type).getKeyType();
if (map instanceof GenericMapData) {
GenericMapData gMap = (GenericMapData) map;
+ // Walk the key array and look each value up in the generic map by its original key.
for (Object key : ((GenericArrayData) gMap.keyArray()).toObjectArray()) {
- result.put(key, gMap.get(key));
+
+ Object convertedKey = convertMapEntry(key, keyType);
+ Object convertedValue = convertMapEntry(gMap.get(key), valueType);
+ result.put(convertedKey, convertedValue);
}
return result;
- }
- if (map instanceof BinaryMapData) {
+ } else if (map instanceof BinaryMapData) {
BinaryMapData bMap = (BinaryMapData) map;
- LogicalType valueType = ((MapType) type).getValueType();
+ // Materialize the binary map as a java.util.Map first, then convert entry by entry.
Map<?, ?> javaMap = bMap.toJavaMap(((MapType) type).getKeyType(), valueType);
for (Map.Entry<?, ?> entry : javaMap.entrySet()) {
- String key = entry.getKey().toString();
- if (LogicalTypeRoot.MAP.equals(valueType.getTypeRoot())) {
- result.put(key, convertMapData((MapData) entry.getValue(), valueType));
- } else if (LogicalTypeRoot.DATE.equals(valueType.getTypeRoot())) {
- result.put(
- key,
- Date.valueOf(LocalDate.ofEpochDay((Integer) entry.getValue()))
- .toString());
- } else if (LogicalTypeRoot.ARRAY.equals(valueType.getTypeRoot())) {
- result.put(key, convertArrayData((ArrayData) entry.getValue(), valueType));
- } else if (entry.getValue() instanceof TimestampData) {
- result.put(key, ((TimestampData) entry.getValue()).toTimestamp().toString());
- } else {
- result.put(key, entry.getValue().toString());
- }
+ Object convertedKey = convertMapEntry(entry.getKey(), keyType);
+ Object convertedValue = convertMapEntry(entry.getValue(), valueType);
+ result.put(convertedKey, convertedValue);
}
return result;
}
throw new UnsupportedOperationException("Unsupported map data: " + map.getClass());
}
+    /**
+     * Converts a single MAP key or value into the object stored in the converted map.
+     *
+     * <p>Nested MAP and ARRAY entries are converted recursively, DATE entries (held as an epoch-day
+     * {@code Integer}) are rendered as a date string, {@link TimestampData} entries are rendered
+     * through their timestamp string, and every other entry falls back to {@code toString()}.
+     *
+     * @param originValue the original key or value of the key-value pair; may be {@code null}
+     * @param logicalType key or value logical type
+     * @return the converted entry, or {@code null} when {@code originValue} is {@code null}
+     */
+    private static Object convertMapEntry(Object originValue, LogicalType logicalType) {
+        // A null entry has nothing to convert. Without this guard the DATE branch fails while
+        // unboxing the Integer and the fallback branch fails on toString().
+        if (originValue == null) {
+            return null;
+        }
+        LogicalTypeRoot typeRoot = logicalType.getTypeRoot();
+        if (LogicalTypeRoot.MAP.equals(typeRoot)) {
+            return convertMapData((MapData) originValue, logicalType);
+        } else if (LogicalTypeRoot.DATE.equals(typeRoot)) {
+            return Date.valueOf(LocalDate.ofEpochDay((Integer) originValue)).toString();
+        } else if (LogicalTypeRoot.ARRAY.equals(typeRoot)) {
+            return convertArrayData((ArrayData) originValue, logicalType);
+        } else if (originValue instanceof TimestampData) {
+            // Checked by runtime class rather than type root, so it applies to any timestamp type.
+            return ((TimestampData) originValue).toTimestamp().toString();
+        } else {
+            return originValue.toString();
+        }
+    }
+
+
private static Object convertRowData(RowData val, int index, LogicalType type) {
RowType rowType = (RowType) type;
Map<String, Object> value = new HashMap<>();
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java
index d69a131..b63d033 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java
@@ -21,6 +21,7 @@
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.TimestampData;
@@ -40,10 +41,11 @@
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
public class DorisRowConverterTest implements Serializable {
-
@Test
public void testConvert() throws IOException {
ResolvedSchema schema =
@@ -67,11 +69,11 @@
DorisRowConverter converter =
new DorisRowConverter((RowType) schema.toPhysicalRowDataType().getLogicalType());
-
- LocalDateTime time1 = LocalDateTime.of(2021, 1, 1, 8, 0, 0);
- LocalDateTime time2 = LocalDateTime.of(2021, 1, 1, 8, 0, 0);
+ // Doris DatetimeV2 supports up to 6 decimal places (microseconds).
+ LocalDateTime time1 = LocalDateTime.of(2021, 1, 1, 8, 1, 1, 1000);
+ LocalDateTime time2 = LocalDateTime.of(2021, 1, 1, 8, 1, 1, 1000);
LocalDate date1 = LocalDate.of(2021, 1, 1);
- List record =
+ List<Object> record =
Arrays.asList(
null,
true,
@@ -104,7 +106,7 @@
.build();
String s = new String(serializer.serialize(rowData).getRow());
Assert.assertEquals(
- "\\N|true|1.2|1.2345|24|10|1|32|64|128|10.12|2021-01-01 08:00:00.0|2021-01-01 08:00:00.0|2021-01-01|a|doris",
+ "\\N|true|1.2|1.2345|24|10|1|32|64|128|10.12|2021-01-01 08:01:01.000001|2021-01-01 08:01:01.000001|2021-01-01|a|doris",
s);
}
@@ -130,8 +132,9 @@
Column.physical("f16", DataTypes.VARCHAR(256)));
DorisRowConverter converter =
new DorisRowConverter((RowType) schema.toPhysicalRowDataType().getLogicalType());
- LocalDateTime time1 = LocalDateTime.of(2021, 1, 1, 8, 0, 0);
- LocalDateTime time2 = LocalDateTime.of(2021, 1, 1, 8, 0, 0);
+ // Doris DatetimeV2 supports up to 6 decimal places (microseconds).
+ LocalDateTime time1 = LocalDateTime.of(2021, 1, 1, 8, 1, 1, 1000);
+ LocalDateTime time2 = LocalDateTime.of(2021, 1, 1, 8, 1, 1, 1000);
LocalDate date1 = LocalDate.of(2021, 1, 1);
GenericRowData rowData =
GenericRowData.of(
@@ -151,12 +154,198 @@
(int) date1.toEpochDay(),
StringData.fromString("a"),
StringData.fromString("doris"));
- List row = new ArrayList();
+ List<Object> row = new ArrayList<>();
for (int i = 0; i < rowData.getArity(); i++) {
row.add(converter.convertExternal(rowData, i));
}
Assert.assertEquals(
- "[null, true, 1.2, 1.2345, 24, 10, 1, 32, 64, 128, 10.123, 2021-01-01 08:00:00.0, 2021-01-01 08:00:00.0, 2021-01-01, a, doris]",
+ "[null, true, 1.2, 1.2345, 24, 10, 1, 32, 64, 128, 10.123, 2021-01-01 08:01:01.000001, 2021-01-01 08:01:01.000001, 2021-01-01, a, doris]",
row.toString());
}
+
+    /**
+     * Feeds one single-entry Java map per column of {@link #getRowMapSchema()} through
+     * {@code convertInternal} and checks the pipe-delimited CSV text serialized from the result.
+     */
+    @Test
+    public void testMapInternalConvert() throws IOException {
+        ResolvedSchema mapSchema = getRowMapSchema();
+        RowType mapRowType = (RowType) mapSchema.toPhysicalRowDataType().getLogicalType();
+        DorisRowConverter rowConverter = new DorisRowConverter(mapRowType);
+
+        // Doris DatetimeV2 supports up to 6 decimal places (microseconds).
+        LocalDateTime zonedTime = LocalDateTime.of(2021, 1, 1, 8, 1, 1, 1000);
+        LocalDateTime localZonedTime = LocalDateTime.of(2021, 1, 1, 8, 1, 1, 1000);
+        LocalDateTime plainTime = LocalDateTime.of(2021, 1, 1, 8, 1, 1, 1000);
+        LocalDate day = LocalDate.of(2021, 1, 1);
+
+        // One map per schema column, each holding exactly one entry.
+        Map<Boolean, Boolean> booleans = createMapAndPut(new HashMap<>(), true, false);
+        Map<Float, Float> floats = createMapAndPut(new HashMap<>(), 1.2f, 1.3f);
+        Map<Double, Double> doubles = createMapAndPut(new HashMap<>(), 1.2345d, 1.2345d);
+        Map<Integer, Integer> yearIntervals = createMapAndPut(new HashMap<>(), 24, 24);
+        Map<Integer, Integer> dayIntervals = createMapAndPut(new HashMap<>(), 10, 10);
+        Map<Byte, Byte> tinyInts = createMapAndPut(new HashMap<>(), (byte) 1, (byte) 1);
+        Map<Short, Short> smallInts = createMapAndPut(new HashMap<>(), (short) 32, (short) 32);
+        Map<Integer, Integer> ints = createMapAndPut(new HashMap<>(), 64, 64);
+        Map<Long, Long> longs = createMapAndPut(new HashMap<>(), 128L, 128L);
+        Map<BigDecimal, BigDecimal> decimals =
+                createMapAndPut(
+                        new HashMap<>(), BigDecimal.valueOf(10.123), BigDecimal.valueOf(10.123));
+        Map<LocalDateTime, LocalDateTime> zonedTimes =
+                createMapAndPut(new HashMap<>(), zonedTime, zonedTime);
+        Map<LocalDateTime, LocalDateTime> localZonedTimes =
+                createMapAndPut(new HashMap<>(), localZonedTime, localZonedTime);
+        Map<LocalDateTime, LocalDateTime> plainTimes =
+                createMapAndPut(new HashMap<>(), plainTime, plainTime);
+        Map<LocalDate, LocalDate> dates = createMapAndPut(new HashMap<>(), day, day);
+        Map<Character, Character> chars = createMapAndPut(new HashMap<>(), 'a', 'a');
+        Map<String, String> strings = createMapAndPut(new HashMap<>(), "doris", "doris");
+
+        List<Object> javaRecord =
+                Arrays.asList(
+                        booleans,
+                        floats,
+                        doubles,
+                        yearIntervals,
+                        dayIntervals,
+                        tinyInts,
+                        smallInts,
+                        ints,
+                        longs,
+                        decimals,
+                        zonedTimes,
+                        localZonedTimes,
+                        plainTimes,
+                        dates,
+                        chars,
+                        strings);
+        GenericRowData internalRow = rowConverter.convertInternal(javaRecord);
+
+        DataType[] fieldTypes = mapSchema.getColumnDataTypes().toArray(new DataType[0]);
+        String[] fieldNames = {
+            "f1", "f2", "f3", "f4", "f5", "f6", "f7", "f8", "f9", "f10", "f11", "f12", "f13", "f14",
+            "f15", "f16"
+        };
+        RowDataSerializer csvSerializer =
+                new Builder()
+                        .setFieldType(fieldTypes)
+                        .setType("csv")
+                        .setFieldDelimiter("|")
+                        .setFieldNames(fieldNames)
+                        .build();
+
+        String expected =
+                "{\"true\":\"false\"}|{\"1.2\":\"1.3\"}|{\"1.2345\":\"1.2345\"}|{\"24\":\"24\"}|{\"10\":\"10\"}|{\"1\":\"1\"}|{\"32\":\"32\"}|{\"64\":\"64\"}|{\"128\":\"128\"}|{\"10.12\":\"10.12\"}|{\"2021-01-01 08:01:01.000001\":\"2021-01-01 08:01:01.000001\"}|{\"2021-01-01 08:01:01.000001\":\"2021-01-01 08:01:01.000001\"}|{\"2021-01-01 08:01:01.000001\":\"2021-01-01 08:01:01.000001\"}|{\"2021-01-01\":\"2021-01-01\"}|{\"a\":\"a\"}|{\"doris\":\"doris\"}";
+        String actual = new String(csvSerializer.serialize(internalRow).getRow());
+        Assert.assertEquals(expected, actual);
+    }
+
+ @Test
+ public void testMapExternalConvert() {
+
+ ResolvedSchema schema = getRowMapSchema();
+ DorisRowConverter converter =
+ new DorisRowConverter((RowType) schema.toPhysicalRowDataType().getLogicalType());
+ // Doris DatetimeV2 supports up to 6 decimal places (microseconds).
+ LocalDateTime time1 = LocalDateTime.of(2021, 1, 1, 8, 1, 1, 1000);
+ LocalDateTime time2 = LocalDateTime.of(2021, 1, 1, 8, 1, 1, 1000);
+ LocalDateTime time3 = LocalDateTime.of(2021, 1, 1, 8, 1, 1, 1000);
+ LocalDate date1 = LocalDate.of(2021, 1, 1);
+
+ Map<Boolean, Boolean> booleanMap = createMapAndPut(new HashMap<>(), true, false);
+ Map<Float, Float> floatMap = createMapAndPut(new HashMap<>(), 1.2f, 1.3f);
+ Map<Double, Double> doubleMap = createMapAndPut(new HashMap<>(), 1.2345d, 1.2345d);
+ Map<Integer, Integer> intervalYearMap = createMapAndPut(new HashMap<>(), 24, 24);
+ Map<Integer, Integer> intervalDayMap = createMapAndPut(new HashMap<>(), 10, 10);
+ Map<Byte, Byte> tinyIntMap = createMapAndPut(new HashMap<>(), (byte) 1, (byte) 1);
+ Map<Short, Short> shortIntMap = createMapAndPut(new HashMap<>(), (short) 32, (short) 32);
+ Map<Integer, Integer> intMap = createMapAndPut(new HashMap<>(), 64, 64);
+ Map<Long, Long> longMap = createMapAndPut(new HashMap<>(), 128L, 128L);
+ Map<BigDecimal, BigDecimal> decimalMap =
+ createMapAndPut(
+ new HashMap<>(), BigDecimal.valueOf(10.123), BigDecimal.valueOf(10.123));
+ Map<TimestampData, TimestampData> timestampWithZoneMap =
+ createMapAndPut(
+ new HashMap<>(),
+ TimestampData.fromLocalDateTime(time1),
+ TimestampData.fromLocalDateTime(time1));
+ Map<TimestampData, TimestampData> timestampWithLocalZoneMap =
+ createMapAndPut(
+ new HashMap<>(),
+ TimestampData.fromLocalDateTime(time2),
+ TimestampData.fromLocalDateTime(time2));
+ Map<TimestampData, TimestampData> timestampNoLTZ =
+ createMapAndPut(
+ new HashMap<>(),
+ TimestampData.fromLocalDateTime(time3),
+ TimestampData.fromLocalDateTime(time3));
+ Map<Integer, Integer> dateMap =
+ createMapAndPut(
+ new HashMap<>(), (int) date1.toEpochDay(), (int) date1.toEpochDay());
+ Map<Character, Character> charMap = createMapAndPut(new HashMap<>(), 'a', 'a');
+ Map<String, String> stringMap = createMapAndPut(new HashMap<>(), "doris", "doris");
+ GenericRowData rowData =
+ GenericRowData.of(
+ new GenericMapData(booleanMap),
+ new GenericMapData(floatMap),
+ new GenericMapData(doubleMap),
+ new GenericMapData(intervalYearMap),
+ new GenericMapData(intervalDayMap),
+ new GenericMapData(tinyIntMap),
+ new GenericMapData(shortIntMap),
+ new GenericMapData(intMap),
+ new GenericMapData(longMap),
+ new GenericMapData(decimalMap),
+ new GenericMapData(timestampWithZoneMap),
+ new GenericMapData(timestampWithLocalZoneMap),
+ new GenericMapData(timestampNoLTZ),
+ new GenericMapData(dateMap),
+ new GenericMapData(charMap),
+ new GenericMapData(stringMap));
+
+ List<Object> row = new ArrayList<>();
+ for (int i = 0; i < rowData.getArity(); i++) {
+ row.add(converter.convertExternal(rowData, i));
+ }
+ Assert.assertEquals(
+ "[{\"true\":\"false\"}, {\"1.2\":\"1.3\"}, {\"1.2345\":\"1.2345\"}, {\"24\":\"24\"}, {\"10\":\"10\"}, {\"1\":\"1\"}, {\"32\":\"32\"}, {\"64\":\"64\"}, {\"128\":\"128\"}, {\"10.123\":\"10.123\"}, {\"2021-01-01 08:01:01.000001\":\"2021-01-01 08:01:01.000001\"}, {\"2021-01-01 08:01:01.000001\":\"2021-01-01 08:01:01.000001\"}, {\"2021-01-01 08:01:01.000001\":\"2021-01-01 08:01:01.000001\"}, {\"2021-01-01\":\"2021-01-01\"}, {\"a\":\"a\"}, {\"doris\":\"doris\"}]",
+ row.toString());
+ }
+
+    /**
+     * Stores a single key-value pair in the supplied map and hands the same map back.
+     *
+     * @param map the map to add the entry to
+     * @param key key of the entry
+     * @param value value of the entry
+     * @return {@code map} itself, now containing the entry
+     */
+    public static <K, V> Map<K, V> createMapAndPut(Map<K, V> map, K key, V value) {
+        final Map<K, V> target = map;
+        target.put(key, value);
+        return target;
+    }
+
+ public static ResolvedSchema getRowMapSchema() {
+ return ResolvedSchema.of(
+ Column.physical("f1", DataTypes.MAP(DataTypes.BOOLEAN(), DataTypes.BOOLEAN())),
+ Column.physical("f2", DataTypes.MAP(DataTypes.FLOAT(), DataTypes.FLOAT())),
+ Column.physical("f3", DataTypes.MAP(DataTypes.DOUBLE(), DataTypes.DOUBLE())),
+ Column.physical(
+ "f4",
+ DataTypes.MAP(
+ DataTypes.INTERVAL(DataTypes.YEAR()),
+ DataTypes.INTERVAL(DataTypes.YEAR()))),
+ Column.physical(
+ "f5",
+ DataTypes.MAP(
+ DataTypes.INTERVAL(DataTypes.DAY()),
+ DataTypes.INTERVAL(DataTypes.DAY()))),
+ Column.physical("f6", DataTypes.MAP(DataTypes.TINYINT(), DataTypes.TINYINT())),
+ Column.physical("f7", DataTypes.MAP(DataTypes.SMALLINT(), DataTypes.SMALLINT())),
+ Column.physical("f8", DataTypes.MAP(DataTypes.INT(), DataTypes.INT())),
+ Column.physical("f9", DataTypes.MAP(DataTypes.BIGINT(), DataTypes.BIGINT())),
+ Column.physical(
+ "f10", DataTypes.MAP(DataTypes.DECIMAL(10, 2), DataTypes.DECIMAL(10, 2))),
+ Column.physical(
+ "f11",
+ DataTypes.MAP(
+ DataTypes.TIMESTAMP_WITH_TIME_ZONE(),
+ DataTypes.TIMESTAMP_WITH_TIME_ZONE())),
+ Column.physical(
+ "f12",
+ DataTypes.MAP(
+ DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE(),
+ DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())),
+ Column.physical("f13", DataTypes.MAP(DataTypes.TIMESTAMP(), DataTypes.TIMESTAMP())),
+ Column.physical("f14", DataTypes.MAP(DataTypes.DATE(), DataTypes.DATE())),
+ Column.physical("f15", DataTypes.MAP(DataTypes.CHAR(1), DataTypes.CHAR(1))),
+ Column.physical(
+ "f16", DataTypes.MAP(DataTypes.VARCHAR(256), DataTypes.VARCHAR(256))));
+ }
}