[Improve] support nested type fields in ArrayData (#606)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java
index 130d236..ff2ee81 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/deserialization/converter/DorisRowConverter.java
@@ -339,26 +339,40 @@
     }
 
     private static List<Object> convertArrayData(ArrayData array, LogicalType type) {
+        LogicalType elementType = ((ArrayType) type).getElementType();
+        List<Object> values;
         if (array instanceof GenericArrayData) {
-            return Arrays.asList(((GenericArrayData) array).toObjectArray());
+            values = Arrays.asList(((GenericArrayData) array).toObjectArray());
+        } else if (array instanceof BinaryArrayData) {
+            values = Arrays.asList(((BinaryArrayData) array).toObjectArray(elementType));
+        } else {
+            throw new UnsupportedOperationException("Unsupported array data: " + array.getClass());
         }
-        if (array instanceof BinaryArrayData) {
-            LogicalType elementType = ((ArrayType) type).getElementType();
-            List<Object> values =
-                    Arrays.asList(((BinaryArrayData) array).toObjectArray(elementType));
-            if (LogicalTypeRoot.DATE.equals(elementType.getTypeRoot())) {
-                return values.stream()
-                        .map(date -> Date.valueOf(LocalDate.ofEpochDay((Integer) date)))
-                        .collect(Collectors.toList());
-            }
-            if (LogicalTypeRoot.ARRAY.equals(elementType.getTypeRoot())) {
-                return values.stream()
-                        .map(arr -> convertArrayData((ArrayData) arr, elementType))
-                        .collect(Collectors.toList());
-            }
-            return values;
+
+        if (LogicalTypeRoot.DATE.equals(elementType.getTypeRoot())) {
+            return values.stream()
+                    .map(date -> Date.valueOf(LocalDate.ofEpochDay((Integer) date)))
+                    .collect(Collectors.toList());
         }
-        throw new UnsupportedOperationException("Unsupported array data: " + array.getClass());
+        if (LogicalTypeRoot.ARRAY.equals(elementType.getTypeRoot())) {
+            return values.stream()
+                    .map(arr -> convertArrayData((ArrayData) arr, elementType))
+                    .collect(Collectors.toList());
+        }
+        if (LogicalTypeRoot.MAP.equals(elementType.getTypeRoot())) {
+            return values.stream()
+                    .map(arr -> writeValueAsString(convertMapData((MapData) arr, elementType)))
+                    .collect(Collectors.toList());
+        }
+        if (LogicalTypeRoot.ROW.equals(elementType.getTypeRoot())) {
+            return values.stream()
+                    .map(
+                            arr ->
+                                    writeValueAsString(
+                                            convertRowData(GenericRowData.of(arr), 0, elementType)))
+                    .collect(Collectors.toList());
+        }
+        return values;
     }
 
     private static Object convertMapData(MapData map, LogicalType type) {
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java
index 46e47d5..100356b 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/deserialization/convert/DorisRowConverterTest.java
@@ -21,6 +21,7 @@
 import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.data.DecimalData;
+import org.apache.flink.table.data.GenericArrayData;
 import org.apache.flink.table.data.GenericMapData;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.StringData;
@@ -366,4 +367,60 @@
                 Column.physical(
                         "f16", DataTypes.MAP(DataTypes.VARCHAR(256), DataTypes.VARCHAR(256))));
     }
+
+    @Test
+    public void testArrayExternalConvert() {
+        ResolvedSchema schema =
+                ResolvedSchema.of(
+                        // list with string
+                        Column.physical("f1", DataTypes.ARRAY(DataTypes.STRING())),
+                        // list with list
+                        Column.physical("f2", DataTypes.ARRAY(DataTypes.ARRAY(DataTypes.STRING()))),
+                        // list with row
+                        Column.physical(
+                                "f3",
+                                DataTypes.ARRAY(
+                                        DataTypes.ROW(
+                                                DataTypes.FIELD("l1", DataTypes.STRING()),
+                                                DataTypes.FIELD("l2", DataTypes.INT())))),
+                        // list with map
+                        Column.physical(
+                                "f4",
+                                DataTypes.ARRAY(
+                                        DataTypes.MAP(DataTypes.STRING(), DataTypes.INT()))),
+                        // list with date
+                        Column.physical("f5", DataTypes.ARRAY(DataTypes.DATE())));
+
+        DorisRowConverter converter =
+                new DorisRowConverter((RowType) schema.toPhysicalRowDataType().getLogicalType());
+
+        Map<String, Integer> mapData1 = createMapAndPut(new HashMap<>(), "hello", 1);
+        Map<String, Integer> mapData2 = createMapAndPut(new HashMap<>(), "world", 2);
+        GenericRowData rowData =
+                GenericRowData.of(
+                        new GenericArrayData(new String[] {"1", "2", "3"}),
+                        new GenericArrayData(
+                                new GenericArrayData[] {
+                                    new GenericArrayData(new String[] {"1", "2", "3"}),
+                                    new GenericArrayData(new String[] {"4", "5", "6"})
+                                }),
+                        new GenericArrayData(
+                                new GenericRowData[] {
+                                    GenericRowData.of(StringData.fromString("on"), 1),
+                                    GenericRowData.of(StringData.fromString("off"), 2)
+                                }),
+                        new GenericArrayData(
+                                new GenericMapData[] {
+                                    new GenericMapData(mapData1), new GenericMapData(mapData2)
+                                }),
+                        new GenericArrayData(new int[] {1, 2, 3}));
+
+        List<Object> row = new ArrayList<>();
+        for (int i = 0; i < rowData.getArity(); i++) {
+            row.add(converter.convertExternal(rowData, i));
+        }
+        String expected =
+                "[[1, 2, 3], [[1, 2, 3], [4, 5, 6]], [{\"l1\":\"on\",\"l2\":\"1\"}, {\"l1\":\"off\",\"l2\":\"2\"}], [{\"hello\":\"1\"}, {\"world\":\"2\"}], [1970-01-02, 1970-01-03, 1970-01-04]]";
+        Assert.assertEquals(expected, row.toString());
+    }
 }