[improve]support string type record in one topic to multiple tables (#59)
diff --git a/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java b/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
index 7870b9d..36f78fe 100644
--- a/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
+++ b/src/main/java/org/apache/doris/kafka/connector/service/DorisDefaultSinkService.java
@@ -20,8 +20,11 @@
 package org.apache.doris.kafka.connector.service;
 
 import com.codahale.metrics.MetricRegistry;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.annotations.VisibleForTesting;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -39,6 +42,7 @@
 import org.apache.doris.kafka.connector.writer.load.LoadModel;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -60,9 +64,11 @@
     private final DorisOptions dorisOptions;
     private final MetricsJmxReporter metricsJmxReporter;
     private final DorisConnectMonitor connectMonitor;
+    private final ObjectMapper objectMapper;
 
     DorisDefaultSinkService(Map<String, String> config) {
         this.dorisOptions = new DorisOptions(config);
+        this.objectMapper = new ObjectMapper();
         this.writer = new HashMap<>();
         this.conn = new JdbcConnectionProvider(dorisOptions);
         MetricRegistry metricRegistry = new MetricRegistry();
@@ -202,19 +208,50 @@
         if (StringUtils.isEmpty(field)) {
             return defaultTableName;
         }
-        if (!(record.value() instanceof Map)) {
+        return parseRecordTableName(defaultTableName, field, record);
+    }
+
+    /**
+     * Resolves the destination table name from the value of {@code record}.
+     *
+     * <p>Map values are read directly and String values are parsed as a JSON object. Struct
+     * values, unparseable strings, and records whose field is absent or null all resolve to
+     * {@code defaultTableName}.
+     *
+     * @param defaultTableName table name to use when the record does not name one
+     * @param tableNameField key of the record value that holds the table name
+     * @param record sink record to inspect
+     * @return the table name found in the record, or {@code defaultTableName}
+     */
+    private String parseRecordTableName(
+            String defaultTableName, String tableNameField, SinkRecord record) {
+        Object recordValue = record.value();
+        Map<String, Object> recordMap = Collections.emptyMap();
+        if (recordValue instanceof Struct) {
             LOG.warn(
-                    "Only Map objects supported for The 'record.tablename.field' configuration, field={}, record type={}",
-                    field,
-                    record.value().getClass().getName());
+                    "The Struct type record not supported for The 'record.tablename.field' configuration, field={}",
+                    tableNameField);
             return defaultTableName;
+        } else if (recordValue instanceof Map) {
+            recordMap = (Map<String, Object>) recordValue;
+        } else if (recordValue instanceof String) {
+            try {
+                recordMap = objectMapper.readValue((String) recordValue, Map.class);
+            } catch (JsonProcessingException e) {
+                LOG.warn(
+                        "The String type record failed to parse record value to map. record={}, field={}",
+                        recordValue,
+                        tableNameField,
+                        e);
+            }
         }
-        Map<String, Object> map = (Map<String, Object>) record.value();
         // if the field is not found in the record, use the table name in the config
-        if (map.get(field) == null) {
+        // a JSON "null" document or a null field value also falls back to the default
+        Object tableName = recordMap == null ? null : recordMap.get(tableNameField);
+        if (tableName == null) {
             return defaultTableName;
         }
-        return map.get(field).toString();
+        return tableName.toString();
     }
 
     private static String getNameIndex(String topic, int partition) {
diff --git a/src/test/java/org/apache/doris/kafka/connector/service/TestDorisSinkService.java b/src/test/java/org/apache/doris/kafka/connector/service/TestDorisSinkService.java
index b947405..7ea1c2a 100644
--- a/src/test/java/org/apache/doris/kafka/connector/service/TestDorisSinkService.java
+++ b/src/test/java/org/apache/doris/kafka/connector/service/TestDorisSinkService.java
@@ -21,12 +21,16 @@
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 import org.apache.doris.kafka.connector.cfg.DorisSinkConnectorConfig;
 import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.json.JsonConverter;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.junit.Assert;
 import org.junit.Before;
@@ -35,6 +39,7 @@
 public class TestDorisSinkService {
 
     private DorisDefaultSinkService dorisDefaultSinkService;
+    private JsonConverter jsonConverter = new JsonConverter();
 
     @Before
     public void init() throws IOException {
@@ -49,6 +54,7 @@
         props.put("name", "sink-connector-test");
         props.put("record.tablename.field", "table_name");
         dorisDefaultSinkService = new DorisDefaultSinkService((Map) props);
+        jsonConverter.configure(new HashMap<>(), false);
     }
 
     @Test
@@ -81,5 +87,49 @@
                         1);
         Assert.assertEquals(
                 "appoint_table", dorisDefaultSinkService.getSinkDorisTableName(record2));
+
+        String recordValue3 = "{\"id\":1,\"name\":\"bob\",\"age\":12}";
+        SinkRecord record3 =
+                new SinkRecord(
+                        "topic_test",
+                        0,
+                        Schema.OPTIONAL_STRING_SCHEMA,
+                        "key",
+                        Schema.OPTIONAL_STRING_SCHEMA,
+                        recordValue3,
+                        3);
+        Assert.assertEquals(
+                "test_kafka_tbl", dorisDefaultSinkService.getSinkDorisTableName(record3));
+
+        String recordValue4 =
+                "{\"id\":12,\"name\":\"jack\",\"age\":13,\"table_name\":\"appoint_table2\"}";
+        SinkRecord record4 =
+                new SinkRecord(
+                        "topic_test",
+                        0,
+                        Schema.OPTIONAL_STRING_SCHEMA,
+                        "key",
+                        Schema.OPTIONAL_STRING_SCHEMA,
+                        recordValue4,
+                        3);
+        Assert.assertEquals(
+                "appoint_table2", dorisDefaultSinkService.getSinkDorisTableName(record4));
+
+        String structMsg =
+                "{\"schema\":{\"type\":\"struct\",\"fields\":[{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"}],\"optional\":true,\"name\":\"normal.test.test_sink_normal.Value\",\"field\":\"before\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"int32\",\"optional\":false,\"field\":\"id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"name\"}],\"optional\":true,\"name\":\"normal.test.test_sink_normal.Value\",\"field\":\"after\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"version\"},{\"type\":\"string\",\"optional\":false,\"field\":\"connector\"},{\"type\":\"string\",\"optional\":false,\"field\":\"name\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"ts_ms\"},{\"type\":\"string\",\"optional\":true,\"name\":\"io.debezium.data.Enum\",\"version\":1,\"parameters\":{\"allowed\":\"true,last,false,incremental\"},\"default\":\"false\",\"field\":\"snapshot\"},{\"type\":\"string\",\"optional\":false,\"field\":\"db\"},{\"type\":\"string\",\"optional\":true,\"field\":\"sequence\"},{\"type\":\"string\",\"optional\":true,\"field\":\"table\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"server_id\"},{\"type\":\"string\",\"optional\":true,\"field\":\"gtid\"},{\"type\":\"string\",\"optional\":false,\"field\":\"file\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"pos\"},{\"type\":\"int32\",\"optional\":false,\"field\":\"row\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"thread\"},{\"type\":\"string\",\"optional\":true,\"field\":\"query\"}],\"optional\":false,\"name\":\"io.debezium.connector.mysql.Source\",\"field\":\"source\"},{\"type\":\"string\",\"optional\":false,\"field\":\"op\"},{\"type\":\"int64\",\"optional\":true,\"field\":\"ts_ms\"},{\"type\":\"struct\",\"fields\":[{\"type\":\"string\",\"optional\":false,\"field\":\"id\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"total_order\"},{\"type\":\"int64\",\"optional\":false,\"field\":\"data_collection_order\"}],\"optional\":true,\"name\":\"event.block\",\"version\":1,\"field\":\"transaction\"}],\"optional\":false,\"name\":\"normal.test.test_sink_normal.Envelope\",\"version\":1},\"payload\":{\"before\":null,\"after\":{\"id\":19,\"name\":\"fff\"},\"source\":{\"version\":\"2.5.4.Final\",\"connector\":\"mysql\",\"name\":\"normal\",\"ts_ms\":1712543697000,\"snapshot\":\"false\",\"db\":\"test\",\"sequence\":null,\"table\":\"test_sink_normal\",\"server_id\":1,\"gtid\":null,\"file\":\"binlog.000061\",\"pos\":5320,\"row\":0,\"thread\":260,\"query\":null},\"op\":\"c\",\"ts_ms\":1712543697062,\"transaction\":null}}";
+        SchemaAndValue schemaAndValue =
+                jsonConverter.toConnectData(
+                        "topic_test", structMsg.getBytes(StandardCharsets.UTF_8));
+        SinkRecord record5 =
+                new SinkRecord(
+                        "topic_test",
+                        0,
+                        Schema.OPTIONAL_STRING_SCHEMA,
+                        "key",
+                        Schema.OPTIONAL_STRING_SCHEMA,
+                        new Struct(schemaAndValue.schema()),
+                        3);
+        Assert.assertEquals(
+                "test_kafka_tbl", dorisDefaultSinkService.getSinkDorisTableName(record5));
     }
 }