[catalog] add array map json type for flink catalog (#291)
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java
index 99ca0a4..96518d3 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisCatalog.java
@@ -283,7 +283,6 @@
String columnType = resultSet.getString("DATA_TYPE");
long columnSize = resultSet.getLong("COLUMN_SIZE");
long columnDigit = resultSet.getLong("DECIMAL_DIGITS");
-
DataType flinkType =
DorisTypeMapper.toFlinkType(
columnName, columnType, (int) columnSize, (int) columnDigit);
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java
index c388700..cc5fe4b 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/DorisTypeMapper.java
@@ -40,6 +40,7 @@
import org.apache.doris.flink.catalog.doris.DorisType;
+import static org.apache.doris.flink.catalog.doris.DorisType.ARRAY;
import static org.apache.doris.flink.catalog.doris.DorisType.BIGINT;
import static org.apache.doris.flink.catalog.doris.DorisType.BOOLEAN;
import static org.apache.doris.flink.catalog.doris.DorisType.CHAR;
@@ -52,10 +53,13 @@
import static org.apache.doris.flink.catalog.doris.DorisType.DOUBLE;
import static org.apache.doris.flink.catalog.doris.DorisType.FLOAT;
import static org.apache.doris.flink.catalog.doris.DorisType.INT;
+import static org.apache.doris.flink.catalog.doris.DorisType.JSON;
import static org.apache.doris.flink.catalog.doris.DorisType.JSONB;
import static org.apache.doris.flink.catalog.doris.DorisType.LARGEINT;
+import static org.apache.doris.flink.catalog.doris.DorisType.MAP;
import static org.apache.doris.flink.catalog.doris.DorisType.SMALLINT;
import static org.apache.doris.flink.catalog.doris.DorisType.STRING;
+import static org.apache.doris.flink.catalog.doris.DorisType.STRUCT;
import static org.apache.doris.flink.catalog.doris.DorisType.TINYINT;
import static org.apache.doris.flink.catalog.doris.DorisType.VARCHAR;
@@ -101,6 +105,12 @@
case LARGEINT:
case STRING:
case JSONB:
+ case JSON:
+ // Currently, the subtype of the generic cannot be obtained,
+ // so it is mapped to string
+ case ARRAY:
+ case MAP:
+ case STRUCT:
return DataTypes.STRING();
case DATE:
case DATE_V2:
diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisType.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisType.java
index d242320..3779143 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisType.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/catalog/doris/DorisType.java
@@ -39,4 +39,7 @@
public static final String BITMAP = "BITMAP";
public static final String ARRAY = "ARRAY";
public static final String JSONB = "JSONB";
+ public static final String JSON = "JSON";
+ public static final String MAP = "MAP";
+ public static final String STRUCT = "STRUCT";
}
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/CatalogExample.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/CatalogExample.java
new file mode 100644
index 0000000..1d7cf1d
--- /dev/null
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/catalog/CatalogExample.java
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.flink.catalog;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.Table;
+import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
+import org.apache.flink.types.Row;
+
+public class CatalogExample {
+
+ public static void main(String[] args) throws Exception {
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+ env.setRuntimeMode(RuntimeExecutionMode.BATCH);
+ final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
+ tEnv.executeSql(
+ "CREATE CATALOG doris_catalog WITH(\n"
+ + "'type' = 'doris',\n"
+ + "'default-database' = 'test',\n"
+ + "'username' = 'root',\n"
+ + "'password' = '',\n"
+ + "'fenodes' = '1127.0.0.1:8030',\n"
+ + "'jdbc-url' = 'jdbc:mysql://127.0.0.1:9030',\n"
+ + "'sink.label-prefix' = 'label'\n"
+ + ")");
+ // define a dynamic aggregating query
+ final Table result = tEnv.sqlQuery("SELECT * from doris_catalog.test.type_test");
+
+ // print the result to the console
+ tEnv.toRetractStream(result, Row.class).print();
+ env.execute();
+ }
+}