[Improve] support nested type fields in ArrayData (#606)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java
index 130d236..ff2ee81 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java
@@ -339,26 +339,72 @@
}
+ /**
+ * Converts a Flink array into a list of plain Java values, one entry per element.
+ *
+ * <p>DATE elements (days since the epoch) become {@link Date}, nested ARRAY elements are
+ * converted recursively, and MAP / ROW elements are converted and then rendered with
+ * {@code writeValueAsString}. Elements of any other type are returned exactly as
+ * {@code toObjectArray} produced them. A null element is never converted; it is kept as null.
+ *
+ * @param array the array to convert, either {@link GenericArrayData} or {@link BinaryArrayData}
+ * @param type logical type of {@code array}; cast to {@link ArrayType} to find the element type
+ * @return the converted elements in array order
+ * @throws UnsupportedOperationException if {@code array} is another {@link ArrayData} subtype
+ */
private static List<Object> convertArrayData(ArrayData array, LogicalType type) {
+ LogicalType elementType = ((ArrayType) type).getElementType();
+ List<Object> values;
if (array instanceof GenericArrayData) {
- return Arrays.asList(((GenericArrayData) array).toObjectArray());
+ values = Arrays.asList(((GenericArrayData) array).toObjectArray());
+ } else if (array instanceof BinaryArrayData) {
+ values = Arrays.asList(((BinaryArrayData) array).toObjectArray(elementType));
+ } else {
+ throw new UnsupportedOperationException("Unsupported array data: " + array.getClass());
}
- if (array instanceof BinaryArrayData) {
- LogicalType elementType = ((ArrayType) type).getElementType();
- List<Object> values =
- Arrays.asList(((BinaryArrayData) array).toObjectArray(elementType));
- if (LogicalTypeRoot.DATE.equals(elementType.getTypeRoot())) {
- return values.stream()
- .map(date -> Date.valueOf(LocalDate.ofEpochDay((Integer) date)))
- .collect(Collectors.toList());
- }
- if (LogicalTypeRoot.ARRAY.equals(elementType.getTypeRoot())) {
- return values.stream()
- .map(arr -> convertArrayData((ArrayData) arr, elementType))
- .collect(Collectors.toList());
- }
- return values;
+
+ // Elements can be null; every branch below keeps a null element as null rather than
+ // unboxing or dereferencing it.
+ if (LogicalTypeRoot.DATE.equals(elementType.getTypeRoot())) {
+ return values.stream()
+ .map(
+ date ->
+ date == null
+ ? null
+ : Date.valueOf(LocalDate.ofEpochDay((Integer) date)))
+ .collect(Collectors.toList());
}
- throw new UnsupportedOperationException("Unsupported array data: " + array.getClass());
+ if (LogicalTypeRoot.ARRAY.equals(elementType.getTypeRoot())) {
+ return values.stream()
+ .map(
+ element ->
+ element == null
+ ? null
+ : convertArrayData((ArrayData) element, elementType))
+ .collect(Collectors.toList());
+ }
+ if (LogicalTypeRoot.MAP.equals(elementType.getTypeRoot())) {
+ return values.stream()
+ .map(
+ element ->
+ element == null
+ ? null
+ : writeValueAsString(
+ convertMapData((MapData) element, elementType)))
+ .collect(Collectors.toList());
+ }
+ // Wrap each ROW element in a one-field row so convertRowData can read it at index 0.
+ if (LogicalTypeRoot.ROW.equals(elementType.getTypeRoot())) {
+ return values.stream()
+ .map(
+ element ->
+ element == null
+ ? null
+ : writeValueAsString(
+ convertRowData(
+ GenericRowData.of(element), 0, elementType)))
+ .collect(Collectors.toList());
+ }
+ return values;
}
private static Object convertMapData(MapData map, LogicalType type) {
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java
index 46e47d5..100356b 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java
@@ -21,6 +21,7 @@
import org.apache.flink.table.catalog.Column;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
import org.apache.flink.table.data.GenericMapData;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
@@ -366,4 +367,60 @@
Column.physical(
"f16", DataTypes.MAP(DataTypes.VARCHAR(256), DataTypes.VARCHAR(256))));
}
+
+ @Test
+ public void testArrayExternalConvert() { // convertExternal on ARRAY columns: string, array, row, map and date elements
+ ResolvedSchema schema =
+ ResolvedSchema.of(
+ // f1: array of strings
+ Column.physical("f1", DataTypes.ARRAY(DataTypes.STRING())),
+ // f2: array of string arrays (nested ARRAY)
+ Column.physical("f2", DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.STRING()))),
+ // f3: array of ROW(l1 STRING, l2 INT)
+ Column.physical(
+ "f3",
+ DataTypes.ARRAY(
+ DataTypes.ROW(
+ DataTypes.FIELD("l1", DataTypes.STRING()),
+ DataTypes.FIELD("l2", DataTypes.INT())))),
+ // f4: array of MAP(STRING, INT)
+ Column.physical(
+ "f4",
+ DataTypes.ARRAY(
+ DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()))),
+ // f5: array of dates
+ Column.physical("f5", DataTypes.ARRAY(DataTypes.DATE())));
+
+ DorisRowConverter converter =
+ new DorisRowConverter((RowType) schema.toPhysicalRowDataType().getLogicalType());
+
+ Map<String, Integer> mapData1 = createMapAndPut(new HashMap<>(), "hello", 1);
+ Map<String, Integer> mapData2 = createMapAndPut(new HashMap<>(), "world", 2);
+ GenericRowData rowData =
+ GenericRowData.of(
+ new GenericArrayData(new String[] {"1", "2", "3"}),
+ new GenericArrayData(
+ new GenericArrayData[] {
+ new GenericArrayData(new String[] {"1", "2", "3"}),
+ new GenericArrayData(new String[] {"4", "5", "6"})
+ }),
+ new GenericArrayData(
+ new GenericRowData[] {
+ GenericRowData.of(StringData.fromString("on"), 1),
+ GenericRowData.of(StringData.fromString("off"), 2)
+ }),
+ new GenericArrayData(
+ new GenericMapData[] {
+ new GenericMapData(mapData1), new GenericMapData(mapData2)
+ }),
+ new GenericArrayData(new int[] {1, 2, 3})); // f5 values; expected below as 1970-01-02..1970-01-04
+
+ List<Object> row = new ArrayList<>();
+ for (int i = 0; i < rowData.getArity(); i++) {
+ row.add(converter.convertExternal(rowData, i)); // convert each top-level field on its own
+ }
+ String expected = // ROW and MAP elements are expected as JSON text, with numeric values quoted
+ "[[1, 2, 3], [[1, 2, 3], [4, 5, 6]], [{\"l1\":\"on\",\"l2\":\"1\"}, {\"l1\":\"off\",\"l2\":\"2\"}], [{\"hello\":\"1\"}, {\"world\":\"2\"}], [1970-01-02, 1970-01-03, 1970-01-04]]";
+ Assert.assertEquals(expected, row.toString());
+ }
}