[catalog] add array map json type for flink catalog (#291)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java
index 99ca0a4..96518d3 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java
@@ -283,7 +283,6 @@
                 String columnType = resultSet.getString("DATA_TYPE");
                 long columnSize = resultSet.getLong("COLUMN_SIZE");
                 long columnDigit = resultSet.getLong("DECIMAL_DIGITS");
-
                 DataType flinkType =
                         DorisTypeMapper.toFlinkType(
                                 columnName, columnType, (int) columnSize, (int) columnDigit);
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java
index c388700..cc5fe4b 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java
@@ -40,6 +40,7 @@
 
 import org.apache.doris.flink.catalog.doris.DorisType;
 
+import static org.apache.doris.flink.catalog.doris.DorisType.ARRAY;
 import static org.apache.doris.flink.catalog.doris.DorisType.BIGINT;
 import static org.apache.doris.flink.catalog.doris.DorisType.BOOLEAN;
 import static org.apache.doris.flink.catalog.doris.DorisType.CHAR;
@@ -52,10 +53,13 @@
 import static org.apache.doris.flink.catalog.doris.DorisType.DOUBLE;
 import static org.apache.doris.flink.catalog.doris.DorisType.FLOAT;
 import static org.apache.doris.flink.catalog.doris.DorisType.INT;
+import static org.apache.doris.flink.catalog.doris.DorisType.JSON;
 import static org.apache.doris.flink.catalog.doris.DorisType.JSONB;
 import static org.apache.doris.flink.catalog.doris.DorisType.LARGEINT;
+import static org.apache.doris.flink.catalog.doris.DorisType.MAP;
 import static org.apache.doris.flink.catalog.doris.DorisType.SMALLINT;
 import static org.apache.doris.flink.catalog.doris.DorisType.STRING;
+import static org.apache.doris.flink.catalog.doris.DorisType.STRUCT;
 import static org.apache.doris.flink.catalog.doris.DorisType.TINYINT;
 import static org.apache.doris.flink.catalog.doris.DorisType.VARCHAR;
 
@@ -101,6 +105,12 @@
             case LARGEINT:
             case STRING:
             case JSONB:
+            case JSON:
+                // Currently, the subtype of the generic cannot be obtained,
+                // so it is mapped to string
+            case ARRAY:
+            case MAP:
+            case STRUCT:
                 return DataTypes.STRING();
             case DATE:
             case DATE_V2:
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisType.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisType.java
index d242320..3779143 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisType.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisType.java
@@ -39,4 +39,7 @@
     public static final String BITMAP = "BITMAP";
     public static final String ARRAY = "ARRAY";
     public static final String JSONB = "JSONB";
+    public static final String JSON = "JSON";
+    public static final String MAP = "MAP";
+    public static final String STRUCT = "STRUCT";
 }
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/CatalogExample.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/CatalogExample.java
new file mode 100644
index 0000000..1d7cf1d
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/CatalogExample.java
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.catalog;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+
+public class CatalogExample {
+
+    public static void main(String[] args) throws Exception {
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+        final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+        tEnv.executeSql(
+                "CREATE CATALOG doris_catalog WITH(\n"
+                        + "'type' = 'doris',\n"
+                        + "'default-database' = 'test',\n"
+                        + "'username' = 'root',\n"
+                        + "'password' = '',\n"
+                        + "'fenodes' = '1127.0.0.1:8030',\n"
+                        + "'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',\n"
+                        + "'sink.label-prefix' = 'label'\n"
+                        + ")");
+        // define a dynamic aggregating query
+        final Table result = tEnv.sqlQuery("SELECT * from doris_catalog.test.type_test");
+
+        // print the result to the console
+        tEnv.toRetractStream(result, Row.class).print();
+        env.execute();
+    }
+}