[ISSUE-344] Deserialize json in kafka source (#354)

* [ISSUE-344] Support deserializing json in kafka source

* [ISSUE-344] Support deserializing json in kafka source

* [ISSUE-344] Support deserializing json in kafka source

* [ISSUE-344] Support deserializing json in kafka source

* [ISSUE-344] Support deserializing json in kafka source

* [ISSUE-344] Support deserializing json in kafka source
diff --git a/geaflow/geaflow-common/src/main/java/com/antgroup/geaflow/common/config/keys/ConnectorConfigKeys.java b/geaflow/geaflow-common/src/main/java/com/antgroup/geaflow/common/config/keys/ConnectorConfigKeys.java
index a69cf03..2a9eebb 100644
--- a/geaflow/geaflow-common/src/main/java/com/antgroup/geaflow/common/config/keys/ConnectorConfigKeys.java
+++ b/geaflow/geaflow-common/src/main/java/com/antgroup/geaflow/common/config/keys/ConnectorConfigKeys.java
@@ -49,6 +49,24 @@
         .description("Specifies the starting unix timestamp for reading the data table. Format "
             + "must be 'yyyy-MM-dd HH:mm:ss'.");
 
+    public static final ConfigKey GEAFLOW_DSL_CONNECTOR_FORMAT = ConfigKeys
+            .key("geaflow.dsl.connector.format")
+            .defaultValue("text")
+            .description("Specifies the deserialization format for reading from external source like kafka, "
+                    + "possible option currently: json/text");
+
+
+    public static final ConfigKey GEAFLOW_DSL_CONNECTOR_FORMAT_JSON_IGNORE_PARSE_ERROR = ConfigKeys
+            .key("geaflow.dsl.connector.format.json.ignore-parse-error")
+            .defaultValue(false)
+            .description("for json format, skip fields and rows with parse errors instead of failing. "
+                    + "Fields are set to null in case of errors.");
+
+    public static final ConfigKey GEAFLOW_DSL_CONNECTOR_FORMAT_JSON_FAIL_ON_MISSING_FIELD = ConfigKeys
+            .key("geaflow.dsl.connector.format.json.fail-on-missing-field")
+            .defaultValue(false)
+            .description("for json format, whether to fail if a field is missing or not.");
+
     /*************************************************
      *  FILE Connector Parameters.
      *************************************************/
@@ -72,3 +90,5 @@
         .defaultValue(false)
         .description("Whether skip the header for csv format.");
 }
+
+
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-api/src/main/java/com/antgroup/geaflow/dsl/connector/api/serde/DeserializerFactory.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-api/src/main/java/com/antgroup/geaflow/dsl/connector/api/serde/DeserializerFactory.java
new file mode 100644
index 0000000..7816867
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-api/src/main/java/com/antgroup/geaflow/dsl/connector/api/serde/DeserializerFactory.java
@@ -0,0 +1,30 @@
+package com.antgroup.geaflow.dsl.connector.api.serde;
+
+import com.antgroup.geaflow.common.config.Configuration;
+import com.antgroup.geaflow.common.config.keys.ConnectorConfigKeys;
+import com.antgroup.geaflow.dsl.connector.api.serde.impl.JsonDeserializer;
+import com.antgroup.geaflow.dsl.connector.api.serde.impl.RowTableDeserializer;
+import com.antgroup.geaflow.dsl.connector.api.serde.impl.TextDeserializer;
+import com.antgroup.geaflow.dsl.connector.api.util.ConnectorConstants;
+
+public class DeserializerFactory {
+
+    public static <IN> TableDeserializer<IN> loadDeserializer(Configuration conf) {
+        String connectorFormat = conf.getString(ConnectorConfigKeys.GEAFLOW_DSL_CONNECTOR_FORMAT,
+                (String) ConnectorConfigKeys.GEAFLOW_DSL_CONNECTOR_FORMAT.getDefaultValue());
+        if (connectorFormat.equals(ConnectorConstants.CONNECTOR_FORMAT_JSON)) {
+            return (TableDeserializer<IN>) new JsonDeserializer();
+        } else {
+            return (TableDeserializer<IN>) new TextDeserializer();
+        }
+    }
+
+    public static <IN> TableDeserializer<IN> loadRowTableDeserializer() {
+        return (TableDeserializer<IN>) new RowTableDeserializer();
+    }
+
+    public static <IN> TableDeserializer<IN> loadTextDeserializer() {
+        return (TableDeserializer<IN>) new TextDeserializer();
+    }
+
+}
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-api/src/main/java/com/antgroup/geaflow/dsl/connector/api/serde/impl/JsonDeserializer.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-api/src/main/java/com/antgroup/geaflow/dsl/connector/api/serde/impl/JsonDeserializer.java
new file mode 100644
index 0000000..0db4cf2
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-api/src/main/java/com/antgroup/geaflow/dsl/connector/api/serde/impl/JsonDeserializer.java
@@ -0,0 +1,79 @@
+package com.antgroup.geaflow.dsl.connector.api.serde.impl;
+
+import com.antgroup.geaflow.common.config.Configuration;
+import com.antgroup.geaflow.common.config.keys.ConnectorConfigKeys;
+import com.antgroup.geaflow.common.exception.GeaflowRuntimeException;
+import com.antgroup.geaflow.common.type.IType;
+import com.antgroup.geaflow.dsl.common.data.Row;
+import com.antgroup.geaflow.dsl.common.data.impl.ObjectRow;
+import com.antgroup.geaflow.dsl.common.types.StructType;
+import com.antgroup.geaflow.dsl.common.util.TypeCastUtil;
+import com.antgroup.geaflow.dsl.connector.api.serde.TableDeserializer;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+public class JsonDeserializer implements TableDeserializer<String> {
+
+    private StructType schema;
+
+    private ObjectMapper mapper;
+
+    private boolean ignoreParseError;
+
+    private boolean failOnMissingField;
+
+
+    @Override
+    public void init(Configuration conf, StructType schema) {
+        this.schema = Objects.requireNonNull(schema);
+        this.mapper = new ObjectMapper();
+        this.ignoreParseError = conf.getBoolean(ConnectorConfigKeys.GEAFLOW_DSL_CONNECTOR_FORMAT_JSON_IGNORE_PARSE_ERROR);
+        this.failOnMissingField = conf.getBoolean(ConnectorConfigKeys.GEAFLOW_DSL_CONNECTOR_FORMAT_JSON_FAIL_ON_MISSING_FIELD);
+
+    }
+
+    @Override
+    public List<Row> deserialize(String record) {
+        if (record == null || record.isEmpty()) {
+            return Collections.emptyList();
+        }
+        Object[] values = new Object[schema.size()];
+        JsonNode jsonNode = null;
+        try {
+            jsonNode = mapper.readTree(record);
+        } catch (JsonProcessingException e) {
+            // handle exception according to configuration
+            if (ignoreParseError) {
+                // return empty list
+                return Collections.emptyList();
+            } else {
+                throw new GeaflowRuntimeException("fail to deserialize record " + record , e);
+            }
+        }
+        // if json node is null
+        for (int i = 0 ; i < schema.size() ; i++) {
+            String fieldName = schema.getFieldNames().get(i);
+            if (failOnMissingField) {
+                if (!jsonNode.has(fieldName)) {
+                    throw new GeaflowRuntimeException("fail to deserialize record " + record + " due to  missing field " + fieldName );
+                }
+            }
+            JsonNode value = jsonNode.get(fieldName);
+            IType<?> type = schema.getType(i);
+            // cast the value to the type defined in the schema.
+            if (value != null) {
+                values[i] = TypeCastUtil.cast(value.asText(), type);
+            } else {
+                values[i] = null;
+            }
+
+        }
+        return  Collections.singletonList(ObjectRow.create(values));
+    }
+
+}
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-api/src/main/java/com/antgroup/geaflow/dsl/connector/api/util/ConnectorConstants.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-api/src/main/java/com/antgroup/geaflow/dsl/connector/api/util/ConnectorConstants.java
index 0d7bd6d..1c9afee 100644
--- a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-api/src/main/java/com/antgroup/geaflow/dsl/connector/api/util/ConnectorConstants.java
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-api/src/main/java/com/antgroup/geaflow/dsl/connector/api/util/ConnectorConstants.java
@@ -17,5 +17,7 @@
 public class ConnectorConstants {
 
     public static final String START_TIME_FORMAT = "yyyy-MM-dd HH:mm:ss";
+    public static final String CONNECTOR_FORMAT_JSON = "json";
+    public static final String CONNECTOR_FORMAT_TEXT = "text";
 
 }
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-api/src/test/java/com/antgroup/geaflow/dsl/connector/api/JsonDeserializerTest.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-api/src/test/java/com/antgroup/geaflow/dsl/connector/api/JsonDeserializerTest.java
new file mode 100644
index 0000000..2821caa
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-api/src/test/java/com/antgroup/geaflow/dsl/connector/api/JsonDeserializerTest.java
@@ -0,0 +1,101 @@
+package com.antgroup.geaflow.dsl.connector.api;
+
+import com.antgroup.geaflow.common.config.Configuration;
+import com.antgroup.geaflow.common.exception.GeaflowRuntimeException;
+import com.antgroup.geaflow.common.type.primitive.BinaryStringType;
+import com.antgroup.geaflow.common.type.primitive.IntegerType;
+import com.antgroup.geaflow.dsl.common.data.Row;
+import com.antgroup.geaflow.dsl.common.types.StructType;
+import com.antgroup.geaflow.dsl.common.types.TableField;
+import com.antgroup.geaflow.dsl.connector.api.serde.impl.JsonDeserializer;
+import org.junit.Test;
+import org.testng.Assert;
+
+import java.util.Collections;
+import java.util.List;
+
+public class JsonDeserializerTest {
+
+    @Test
+    public void testDeserialize() {
+        JsonDeserializer deserializer = new JsonDeserializer();
+        StructType dataSchema = new StructType(
+                new TableField("id", IntegerType.INSTANCE, false),
+                new TableField("name", BinaryStringType.INSTANCE, true),
+                new TableField("age", IntegerType.INSTANCE, false)
+        );
+        deserializer.init(new Configuration(), dataSchema);
+        List<Row> row = deserializer.deserialize("{\"id\":1, \"name\":\"amy\", \"age\":10}");
+        List<Row>  rowWithNull = deserializer.deserialize("{\"id\":1, \"name\":\"amy\"}");
+        Assert.assertEquals(row.get(0).getField(0, IntegerType.INSTANCE), 1);
+        Assert.assertEquals(row.get(0).getField(1, BinaryStringType.INSTANCE).toString(), "amy");
+        Assert.assertEquals(row.get(0).getField(2, IntegerType.INSTANCE), 10);
+        Assert.assertEquals(rowWithNull.get(0).getField(0, IntegerType.INSTANCE), 1);
+        Assert.assertEquals(rowWithNull.get(0).getField(1, BinaryStringType.INSTANCE).toString(), "amy");
+        Assert.assertEquals(rowWithNull.get(0).getField(2, IntegerType.INSTANCE), null);
+
+    }
+
+
+    @Test
+    public void testDeserializeEmptyString() {
+        JsonDeserializer deserializer = new JsonDeserializer();
+        StructType dataSchema = new StructType(
+                new TableField("id", IntegerType.INSTANCE, false),
+                new TableField("name", BinaryStringType.INSTANCE, true),
+                new TableField("age", IntegerType.INSTANCE, false)
+        );
+        deserializer.init(new Configuration(), dataSchema);
+        List<Row> rows = deserializer.deserialize("");
+        List<Row> testNullRows = deserializer.deserialize(null);
+        Assert.assertEquals(rows, Collections.emptyList());
+        Assert.assertEquals(testNullRows, Collections.emptyList());
+
+    }
+
+    @Test(expected = GeaflowRuntimeException.class)
+    public void testDeserializeParseError() {
+        JsonDeserializer deserializer = new JsonDeserializer();
+        StructType dataSchema = new StructType(
+                new TableField("id", IntegerType.INSTANCE, false),
+                new TableField("name", BinaryStringType.INSTANCE, true),
+                new TableField("age", IntegerType.INSTANCE, false)
+        );
+        deserializer.init(new Configuration(), dataSchema);
+        List<Row> rows = deserializer.deserialize("test");
+    }
+
+    @Test
+    public void testDeserializeIgnoreParseError() {
+        JsonDeserializer deserializer = new JsonDeserializer();
+        StructType dataSchema = new StructType(
+                new TableField("id", IntegerType.INSTANCE, false),
+                new TableField("name", BinaryStringType.INSTANCE, true),
+                new TableField("age", IntegerType.INSTANCE, false)
+        );
+        Configuration conf = new Configuration();
+        conf.put("geaflow.dsl.connector.format.json.ignore-parse-error", "true");
+        deserializer.init(conf, dataSchema);
+        List<Row> rows = deserializer.deserialize("test");
+        Assert.assertEquals(rows, Collections.emptyList());
+    }
+
+    @Test(expected = GeaflowRuntimeException.class)
+    public void testDeserializeFailOnMissingField() {
+        JsonDeserializer deserializer = new JsonDeserializer();
+        StructType dataSchema = new StructType(
+                new TableField("id", IntegerType.INSTANCE, false),
+                new TableField("name", BinaryStringType.INSTANCE, true),
+                new TableField("age", IntegerType.INSTANCE, false)
+        );
+        Configuration conf = new Configuration();
+        conf.put("geaflow.dsl.connector.format.json.fail-on-missing-field", "true");
+        deserializer.init(conf, dataSchema);
+        List<Row>  rowWithMissingField = deserializer.deserialize("{\"id\":1, \"name\":\"amy\"}");
+
+    }
+
+
+
+
+}
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-hive/src/main/java/com/antgroup/geaflow/dsl/connector/hive/HiveTableSource.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-hive/src/main/java/com/antgroup/geaflow/dsl/connector/hive/HiveTableSource.java
index 56ed5df..e4a526d 100644
--- a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-hive/src/main/java/com/antgroup/geaflow/dsl/connector/hive/HiveTableSource.java
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-hive/src/main/java/com/antgroup/geaflow/dsl/connector/hive/HiveTableSource.java
@@ -27,8 +27,8 @@
 import com.antgroup.geaflow.dsl.connector.api.Offset;
 import com.antgroup.geaflow.dsl.connector.api.Partition;
 import com.antgroup.geaflow.dsl.connector.api.TableSource;
+import com.antgroup.geaflow.dsl.connector.api.serde.DeserializerFactory;
 import com.antgroup.geaflow.dsl.connector.api.serde.TableDeserializer;
-import com.antgroup.geaflow.dsl.connector.api.serde.impl.RowTableDeserializer;
 import com.antgroup.geaflow.dsl.connector.hive.adapter.HiveVersionAdapter;
 import com.antgroup.geaflow.dsl.connector.hive.adapter.HiveVersionAdapters;
 import com.antgroup.geaflow.dsl.connector.hive.util.HiveUtils;
@@ -172,7 +172,7 @@
     @SuppressWarnings("unchecked")
     @Override
     public <IN> TableDeserializer<IN> getDeserializer(Configuration conf) {
-        return (TableDeserializer<IN>) new RowTableDeserializer();
+        return DeserializerFactory.loadRowTableDeserializer();
     }
 
     @SuppressWarnings("unchecked")
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-jdbc/src/main/java/com/antgroup/geaflow/dsl/connector/jdbc/JDBCTableSource.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-jdbc/src/main/java/com/antgroup/geaflow/dsl/connector/jdbc/JDBCTableSource.java
index c2b03bb..4e83a29 100644
--- a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-jdbc/src/main/java/com/antgroup/geaflow/dsl/connector/jdbc/JDBCTableSource.java
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-jdbc/src/main/java/com/antgroup/geaflow/dsl/connector/jdbc/JDBCTableSource.java
@@ -25,8 +25,8 @@
 import com.antgroup.geaflow.dsl.connector.api.Offset;
 import com.antgroup.geaflow.dsl.connector.api.Partition;
 import com.antgroup.geaflow.dsl.connector.api.TableSource;
+import com.antgroup.geaflow.dsl.connector.api.serde.DeserializerFactory;
 import com.antgroup.geaflow.dsl.connector.api.serde.TableDeserializer;
-import com.antgroup.geaflow.dsl.connector.api.serde.impl.RowTableDeserializer;
 import com.antgroup.geaflow.dsl.connector.jdbc.util.JDBCUtils;
 import java.io.IOException;
 import java.sql.Connection;
@@ -137,7 +137,7 @@
 
     @Override
     public <IN> TableDeserializer<IN> getDeserializer(Configuration conf) {
-        return (TableDeserializer<IN>) new RowTableDeserializer();
+        return DeserializerFactory.loadRowTableDeserializer();
     }
 
     @Override
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-kafka/src/main/java/com/antgroup/geaflow/dsl/connector/kafka/KafkaTableSource.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-kafka/src/main/java/com/antgroup/geaflow/dsl/connector/kafka/KafkaTableSource.java
index 6fb71f2..4bd6707 100644
--- a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-kafka/src/main/java/com/antgroup/geaflow/dsl/connector/kafka/KafkaTableSource.java
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-kafka/src/main/java/com/antgroup/geaflow/dsl/connector/kafka/KafkaTableSource.java
@@ -11,9 +11,7 @@
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  */
-
 package com.antgroup.geaflow.dsl.connector.kafka;
-
 import com.antgroup.geaflow.api.context.RuntimeContext;
 import com.antgroup.geaflow.common.config.Configuration;
 import com.antgroup.geaflow.common.config.keys.ConnectorConfigKeys;
@@ -26,8 +24,8 @@
 import com.antgroup.geaflow.dsl.connector.api.Offset;
 import com.antgroup.geaflow.dsl.connector.api.Partition;
 import com.antgroup.geaflow.dsl.connector.api.TableSource;
+import com.antgroup.geaflow.dsl.connector.api.serde.DeserializerFactory;
 import com.antgroup.geaflow.dsl.connector.api.serde.TableDeserializer;
-import com.antgroup.geaflow.dsl.connector.api.serde.impl.TextDeserializer;
 import com.antgroup.geaflow.dsl.connector.api.util.ConnectorConstants;
 import com.antgroup.geaflow.dsl.connector.kafka.utils.KafkaConstants;
 import java.io.IOException;
@@ -50,6 +48,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+
 public class KafkaTableSource implements TableSource {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(KafkaTableSource.class);
@@ -63,6 +62,8 @@
     private long windowSize;
     private int startTime;
     private Properties props;
+    private String connectorFormat;
+    private TableSchema schema;
 
     private transient KafkaConsumer<String, String> consumer;
 
@@ -90,6 +91,7 @@
             startTime = DateTimeUtil.toUnixTime(startTimeStr, ConnectorConstants.START_TIME_FORMAT);
         }
 
+        this.schema = tableSchema;
         this.props = new Properties();
         props.setProperty(KafkaConstants.KAFKA_BOOTSTRAP_SERVERS, servers);
         props.setProperty(KafkaConstants.KAFKA_KEY_DESERIALIZER,
@@ -99,7 +101,7 @@
         props.setProperty(KafkaConstants.KAFKA_MAX_POLL_RECORDS,
             String.valueOf(windowSize));
         props.setProperty(KafkaConstants.KAFKA_GROUP_ID, groupId);
-        LOGGER.info("open kafka, servers is: {}, topic is:{}, config is:{}, schema is: {}",
+        LOGGER.info("open kafka, servers is: {}, topic is:{}, config is:{}, schema is: {}, connector format is : {}",
             servers, topic, conf, tableSchema);
     }
 
@@ -121,7 +123,7 @@
 
     @Override
     public <IN> TableDeserializer<IN> getDeserializer(Configuration conf) {
-        return (TableDeserializer<IN>) new TextDeserializer();
+        return DeserializerFactory.loadDeserializer(conf);
     }
 
     @Override
@@ -184,6 +186,7 @@
         }
 
         Iterator<ConsumerRecord<String, String>> recordIterator = consumer.poll(POLL_TIMEOUT).iterator();
+
         List<String> dataList = new ArrayList<>();
         long responseMaxTimestamp = -1;
         while (recordIterator.hasNext()) {
@@ -214,6 +217,7 @@
         LOGGER.info("close");
     }
 
+
     public static class KafkaPartition implements Partition {
 
         private final String topic;
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-kafka/src/test/java/com/antgroup/geaflow/dsl/connector/kafka/KafkaTableConnectorTest.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-kafka/src/test/java/com/antgroup/geaflow/dsl/connector/kafka/KafkaTableConnectorTest.java
index 8e7ecd2..c165d32 100644
--- a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-kafka/src/test/java/com/antgroup/geaflow/dsl/connector/kafka/KafkaTableConnectorTest.java
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-kafka/src/test/java/com/antgroup/geaflow/dsl/connector/kafka/KafkaTableConnectorTest.java
@@ -15,7 +15,11 @@
 package com.antgroup.geaflow.dsl.connector.kafka;
 
 import com.alibaba.fastjson.JSON;
+import com.antgroup.geaflow.common.config.Configuration;
 import com.antgroup.geaflow.dsl.connector.api.function.OffsetStore.ConsoleOffset;
+import com.antgroup.geaflow.dsl.connector.api.serde.TableDeserializer;
+import com.antgroup.geaflow.dsl.connector.api.serde.impl.JsonDeserializer;
+import com.antgroup.geaflow.dsl.connector.api.serde.impl.TextDeserializer;
 import com.antgroup.geaflow.dsl.connector.kafka.KafkaTableSource.KafkaOffset;
 import com.antgroup.geaflow.dsl.connector.kafka.KafkaTableSource.KafkaPartition;
 import java.io.IOException;
@@ -54,4 +58,20 @@
         Assert.assertEquals(kvMap.get("type"), "TIMESTAMP");
         Assert.assertTrue(Long.parseLong(kvMap.get("writeTime")) > 0L);
     }
+
+    @Test
+    public void testJsonDeserializer() {
+        KafkaTableSource kafkaTableSource = new KafkaTableSource();
+        Configuration conf = new Configuration();
+        conf.put("geaflow.dsl.connector.format","json");
+        TableDeserializer<Object> deserializer = kafkaTableSource.getDeserializer(conf);
+        Assert.assertEquals(deserializer.getClass(), JsonDeserializer.class);
+    }
+
+    @Test
+    public void testTextDeserializer() {
+        KafkaTableSource kafkaTableSource = new KafkaTableSource();
+        TableDeserializer<Object> deserializer = kafkaTableSource.getDeserializer(new Configuration());
+        Assert.assertEquals(deserializer.getClass(), TextDeserializer.class);
+    }
 }
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-pulsar/src/main/java/com/antgroup/geaflow/dsl/connector/pulsar/PulsarTableSource.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-pulsar/src/main/java/com/antgroup/geaflow/dsl/connector/pulsar/PulsarTableSource.java
index e2c2423..6e8d0a9 100644
--- a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-pulsar/src/main/java/com/antgroup/geaflow/dsl/connector/pulsar/PulsarTableSource.java
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-pulsar/src/main/java/com/antgroup/geaflow/dsl/connector/pulsar/PulsarTableSource.java
@@ -11,8 +11,8 @@
 import com.antgroup.geaflow.dsl.connector.api.Offset;
 import com.antgroup.geaflow.dsl.connector.api.Partition;
 import com.antgroup.geaflow.dsl.connector.api.TableSource;
+import com.antgroup.geaflow.dsl.connector.api.serde.DeserializerFactory;
 import com.antgroup.geaflow.dsl.connector.api.serde.TableDeserializer;
-import com.antgroup.geaflow.dsl.connector.api.serde.impl.TextDeserializer;
 import com.antgroup.geaflow.dsl.connector.api.util.ConnectorConstants;
 import com.antgroup.geaflow.dsl.connector.pulsar.utils.PulsarConstants;
 
@@ -128,7 +128,7 @@
 
     @Override
     public <IN> TableDeserializer<IN> getDeserializer(Configuration conf) {
-        return (TableDeserializer<IN>) new TextDeserializer();
+        return DeserializerFactory.loadTextDeserializer();
     }
 
     @Override
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-socket/src/main/java/com/antgroup/geaflow/dsl/connector/socket/SocketTableSource.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-socket/src/main/java/com/antgroup/geaflow/dsl/connector/socket/SocketTableSource.java
index 2e87c9d..897548a 100644
--- a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-socket/src/main/java/com/antgroup/geaflow/dsl/connector/socket/SocketTableSource.java
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-socket/src/main/java/com/antgroup/geaflow/dsl/connector/socket/SocketTableSource.java
@@ -25,8 +25,8 @@
 import com.antgroup.geaflow.dsl.connector.api.Offset;
 import com.antgroup.geaflow.dsl.connector.api.Partition;
 import com.antgroup.geaflow.dsl.connector.api.TableSource;
+import com.antgroup.geaflow.dsl.connector.api.serde.DeserializerFactory;
 import com.antgroup.geaflow.dsl.connector.api.serde.TableDeserializer;
-import com.antgroup.geaflow.dsl.connector.api.serde.impl.TextDeserializer;
 import com.antgroup.geaflow.dsl.connector.socket.server.NettySourceClient;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -75,7 +75,7 @@
 
     @Override
     public <IN> TableDeserializer<IN> getDeserializer(Configuration conf) {
-        return (TableDeserializer<IN>) new TextDeserializer();
+        return DeserializerFactory.loadTextDeserializer();
     }
 
     @Override