[Improve][SeaTunnel-Schema] Support parse row type from config file (#2771)
* [Hotfix][SeaTunnel-Schema] Fix the bug that can not parse decimal
* [Improve][SeaTunnel-Schema] Support parse row type from config file
diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/SeaTunnelSchema.java b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/SeaTunnelSchema.java
index 9a9e035..144a02c 100644
--- a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/SeaTunnelSchema.java
+++ b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/SeaTunnelSchema.java
@@ -33,7 +33,12 @@
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeType;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+
import java.io.Serializable;
+import java.util.LinkedHashMap;
import java.util.Map;
public class SeaTunnelSchema implements Serializable {
@@ -108,6 +113,11 @@
String valueGenericType = "";
// convert type to uppercase
type = type.toUpperCase();
+ String originContent = type;
+ if (type.contains("{") || type.contains("}")) {
+ // Row type
+ type = SqlType.ROW.name();
+ }
if (type.contains("<") || type.contains(">")) {
// Map type or Array type
if (type.startsWith(SqlType.MAP.name())) {
@@ -188,26 +198,34 @@
case TIMESTAMP:
return LocalTimeType.LOCAL_DATE_TIME_TYPE;
default:
- throw new RuntimeException("Not support [row] type now");
+ return mapToSeaTunnelRowType(convertJsonToMap(originContent));
}
}
- private static Map<String, String> convertConfig2Map(Config config) {
+ private static Map<String, String> convertConfigToMap(Config config) {
// Because the entrySet in typesafe config couldn't keep key-value order
// So use jackson parsing schema information into a map to keep key-value order
ConfigRenderOptions options = ConfigRenderOptions.concise();
String schema = config.root().render(options);
- return JsonUtils.toMap(schema);
+ return convertJsonToMap(schema);
}
- public static SeaTunnelSchema buildWithConfig(Config schemaConfig) {
- CheckResult checkResult = CheckConfigUtil.checkAllExists(schemaConfig, FIELD_KEY);
- if (!checkResult.isSuccess()) {
- String errorMsg = String.format("Schema config need option [%s], please correct your config first", FIELD_KEY);
- throw new RuntimeException(errorMsg);
- }
- Config fields = schemaConfig.getConfig(FIELD_KEY);
- Map<String, String> fieldsMap = convertConfig2Map(fields);
+ private static Map<String, String> convertJsonToMap(String json) {
+ ObjectNode jsonNodes = JsonUtils.parseObject(json);
+ LinkedHashMap<String, String> fieldsMap = new LinkedHashMap<>();
+ jsonNodes.fields().forEachRemaining(field -> {
+ String key = field.getKey();
+ JsonNode value = field.getValue();
+ if (value.getNodeType() == JsonNodeType.OBJECT) {
+ fieldsMap.put(key, value.toString());
+ } else {
+ fieldsMap.put(key, value.textValue());
+ }
+ });
+ return fieldsMap;
+ }
+
+ private static SeaTunnelRowType mapToSeaTunnelRowType(Map<String, String> fieldsMap) {
int fieldsNum = fieldsMap.size();
int i = 0;
String[] fieldsName = new String[fieldsNum];
@@ -220,8 +238,18 @@
seaTunnelDataTypes[i] = dataType;
i++;
}
- SeaTunnelRowType seaTunnelRowType = new SeaTunnelRowType(fieldsName, seaTunnelDataTypes);
- return new SeaTunnelSchema(seaTunnelRowType);
+ return new SeaTunnelRowType(fieldsName, seaTunnelDataTypes);
+ }
+
+ public static SeaTunnelSchema buildWithConfig(Config schemaConfig) {
+ CheckResult checkResult = CheckConfigUtil.checkAllExists(schemaConfig, FIELD_KEY);
+ if (!checkResult.isSuccess()) {
+ String errorMsg = String.format("Schema config need option [%s], please correct your config first", FIELD_KEY);
+ throw new RuntimeException(errorMsg);
+ }
+ Config fields = schemaConfig.getConfig(FIELD_KEY);
+ Map<String, String> fieldsMap = convertConfigToMap(fields);
+ return new SeaTunnelSchema(mapToSeaTunnelRowType(fieldsMap));
}
public static SeaTunnelRowType buildSimpleTextSchema() {
diff --git a/seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connector/common/schema/SchemaParseTest.java b/seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connector/common/schema/SchemaParseTest.java
index 1baad8f..c413392 100644
--- a/seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connector/common/schema/SchemaParseTest.java
+++ b/seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connector/common/schema/SchemaParseTest.java
@@ -23,6 +23,7 @@
import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.table.type.SqlType;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -73,6 +74,7 @@
new MapType<>(BasicType.STRING_TYPE, new MapType<>(BasicType.STRING_TYPE, BasicType.STRING_TYPE)));
Assertions.assertEquals(seaTunnelRowType.getFieldType(1),
new MapType<>(BasicType.STRING_TYPE, new MapType<>(BasicType.STRING_TYPE, ArrayType.INT_ARRAY_TYPE)));
+ Assertions.assertEquals(seaTunnelRowType.getFieldType(17).getSqlType(), SqlType.ROW);
}
public static String getTestConfigFile(String configFile) throws FileNotFoundException, URISyntaxException {
diff --git a/seatunnel-connectors-v2/connector-common/src/test/resources/complex.schema.conf b/seatunnel-connectors-v2/connector-common/src/test/resources/complex.schema.conf
index 6a06dbf..d71d87c 100644
--- a/seatunnel-connectors-v2/connector-common/src/test/resources/complex.schema.conf
+++ b/seatunnel-connectors-v2/connector-common/src/test/resources/complex.schema.conf
@@ -34,5 +34,43 @@
date = date
time = time
timestamp = timestamp
+ row = {
+ map = "map<string, map<string, string>>"
+ map_array = "map<string, map<string, array<int>>>"
+ array = "array<tinyint>"
+ string = string
+ boolean = boolean
+ tinyint = tinyint
+ smallint = smallint
+ int = int
+ bigint = bigint
+ float = float
+ double = double
+ decimal = "decimal(30, 8)"
+ null = "null"
+ bytes = bytes
+ date = date
+ time = time
+ timestamp = timestamp
+ row = {
+ map = "map<string, map<string, string>>"
+ map_array = "map<string, map<string, array<int>>>"
+ array = "array<tinyint>"
+ string = string
+ boolean = boolean
+ tinyint = tinyint
+ smallint = smallint
+ int = int
+ bigint = bigint
+ float = float
+ double = double
+ decimal = "decimal(30, 8)"
+ null = "null"
+ bytes = bytes
+ date = date
+ time = time
+ timestamp = timestamp
+ }
+ }
}
}
\ No newline at end of file