[ISSUE-344] Deserialize json in kafka source (#354)
* [ISSUE-344] Support deserializing json in kafka source
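
A minimal usage sketch of the new option (illustrative only, built on the classes this patch introduces; the example class name is hypothetical): setting geaflow.dsl.connector.format switches the kafka source from plain text to json deserialization.

```java
import com.antgroup.geaflow.common.config.Configuration;
import com.antgroup.geaflow.dsl.connector.api.serde.DeserializerFactory;
import com.antgroup.geaflow.dsl.connector.api.serde.TableDeserializer;

public class JsonFormatExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Switch the kafka source from the default "text" format to "json".
        conf.put("geaflow.dsl.connector.format", "json");
        // The kafka source now resolves its deserializer through the factory.
        TableDeserializer<String> deserializer = DeserializerFactory.loadDeserializer(conf);
        // After deserializer.init(conf, schema), a record such as
        // {"id":1,"name":"amy","age":10} becomes one Row following the table schema.
    }
}
```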
diff --git a/geaflow/geaflow-common/src/main/java/com/antgroup/geaflow/common/config/keys/ConnectorConfigKeys.java b/geaflow/geaflow-common/src/main/java/com/antgroup/geaflow/common/config/keys/ConnectorConfigKeys.java
index a69cf03..2a9eebb 100644
--- a/geaflow/geaflow-common/src/main/java/com/antgroup/geaflow/common/config/keys/ConnectorConfigKeys.java
+++ b/geaflow/geaflow-common/src/main/java/com/antgroup/geaflow/common/config/keys/ConnectorConfigKeys.java
@@ -49,6 +49,24 @@
.description("Specifies the starting unix timestamp for reading the data table. Format "
+ "must be 'yyyy-MM-dd HH:mm:ss'.");
+    public static final ConfigKey GEAFLOW_DSL_CONNECTOR_FORMAT = ConfigKeys
+        .key("geaflow.dsl.connector.format")
+        .defaultValue("text")
+        .description("Specifies the deserialization format for reading from an external source "
+            + "such as kafka. Currently supported options: json, text.");
+
+ public static final ConfigKey GEAFLOW_DSL_CONNECTOR_FORMAT_JSON_IGNORE_PARSE_ERROR = ConfigKeys
+ .key("geaflow.dsl.connector.format.json.ignore-parse-error")
+ .defaultValue(false)
+ .description("for json format, skip fields and rows with parse errors instead of failing. "
+ + "Fields are set to null in case of errors.");
+
+ public static final ConfigKey GEAFLOW_DSL_CONNECTOR_FORMAT_JSON_FAIL_ON_MISSING_FIELD = ConfigKeys
+ .key("geaflow.dsl.connector.format.json.fail-on-missing-field")
+ .defaultValue(false)
+ .description("for json format, whether to fail if a field is missing or not.");
+
/*************************************************
* FILE Connector Parameters.
*************************************************/
@@ -72,3 +90,3 @@
.defaultValue(false)
.description("Whether skip the header for csv format.");
}
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-api/src/main/java/com/antgroup/geaflow/dsl/connector/api/serde/DeserializerFactory.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-api/src/main/java/com/antgroup/geaflow/dsl/connector/api/serde/DeserializerFactory.java
new file mode 100644
index 0000000..7816867
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-api/src/main/java/com/antgroup/geaflow/dsl/connector/api/serde/DeserializerFactory.java
@@ -0,0 +1,33 @@
+package com.antgroup.geaflow.dsl.connector.api.serde;
+
+import com.antgroup.geaflow.common.config.Configuration;
+import com.antgroup.geaflow.common.config.keys.ConnectorConfigKeys;
+import com.antgroup.geaflow.dsl.connector.api.serde.impl.JsonDeserializer;
+import com.antgroup.geaflow.dsl.connector.api.serde.impl.RowTableDeserializer;
+import com.antgroup.geaflow.dsl.connector.api.serde.impl.TextDeserializer;
+import com.antgroup.geaflow.dsl.connector.api.util.ConnectorConstants;
+
+public class DeserializerFactory {
+
+    @SuppressWarnings("unchecked")
+    public static <IN> TableDeserializer<IN> loadDeserializer(Configuration conf) {
+        String connectorFormat = conf.getString(ConnectorConfigKeys.GEAFLOW_DSL_CONNECTOR_FORMAT,
+            (String) ConnectorConfigKeys.GEAFLOW_DSL_CONNECTOR_FORMAT.getDefaultValue());
+        if (ConnectorConstants.CONNECTOR_FORMAT_JSON.equals(connectorFormat)) {
+            return (TableDeserializer<IN>) new JsonDeserializer();
+        } else {
+            // Any other value falls back to the text deserializer.
+            return (TableDeserializer<IN>) new TextDeserializer();
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    public static <IN> TableDeserializer<IN> loadRowTableDeserializer() {
+        return (TableDeserializer<IN>) new RowTableDeserializer();
+    }
+
+    @SuppressWarnings("unchecked")
+    public static <IN> TableDeserializer<IN> loadTextDeserializer() {
+        return (TableDeserializer<IN>) new TextDeserializer();
+    }
+}
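
A small sketch of the fallback behavior (illustrative, example class name is hypothetical): when no format is configured, the factory returns the text deserializer, so existing kafka jobs are unaffected by this patch.

```java
import com.antgroup.geaflow.common.config.Configuration;
import com.antgroup.geaflow.dsl.connector.api.serde.DeserializerFactory;
import com.antgroup.geaflow.dsl.connector.api.serde.TableDeserializer;
import com.antgroup.geaflow.dsl.connector.api.serde.impl.TextDeserializer;

public class DefaultFormatExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration(); // geaflow.dsl.connector.format not set
        TableDeserializer<String> deserializer = DeserializerFactory.loadDeserializer(conf);
        // "text" is the key's default value, so jobs that never set the format
        // keep getting a TextDeserializer.
        System.out.println(deserializer instanceof TextDeserializer); // true
    }
}
```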
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-api/src/main/java/com/antgroup/geaflow/dsl/connector/api/serde/impl/JsonDeserializer.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-api/src/main/java/com/antgroup/geaflow/dsl/connector/api/serde/impl/JsonDeserializer.java
new file mode 100644
index 0000000..0db4cf2
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-api/src/main/java/com/antgroup/geaflow/dsl/connector/api/serde/impl/JsonDeserializer.java
@@ -0,0 +1,76 @@
+package com.antgroup.geaflow.dsl.connector.api.serde.impl;
+
+import com.antgroup.geaflow.common.config.Configuration;
+import com.antgroup.geaflow.common.config.keys.ConnectorConfigKeys;
+import com.antgroup.geaflow.common.exception.GeaflowRuntimeException;
+import com.antgroup.geaflow.common.type.IType;
+import com.antgroup.geaflow.dsl.common.data.Row;
+import com.antgroup.geaflow.dsl.common.data.impl.ObjectRow;
+import com.antgroup.geaflow.dsl.common.types.StructType;
+import com.antgroup.geaflow.dsl.common.util.TypeCastUtil;
+import com.antgroup.geaflow.dsl.connector.api.serde.TableDeserializer;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+public class JsonDeserializer implements TableDeserializer<String> {
+
+ private StructType schema;
+
+ private ObjectMapper mapper;
+
+ private boolean ignoreParseError;
+
+ private boolean failOnMissingField;
+
+    @Override
+ public void init(Configuration conf, StructType schema) {
+ this.schema = Objects.requireNonNull(schema);
+ this.mapper = new ObjectMapper();
+ this.ignoreParseError = conf.getBoolean(ConnectorConfigKeys.GEAFLOW_DSL_CONNECTOR_FORMAT_JSON_IGNORE_PARSE_ERROR);
+ this.failOnMissingField = conf.getBoolean(ConnectorConfigKeys.GEAFLOW_DSL_CONNECTOR_FORMAT_JSON_FAIL_ON_MISSING_FIELD);
+    }
+
+ @Override
+ public List<Row> deserialize(String record) {
+ if (record == null || record.isEmpty()) {
+ return Collections.emptyList();
+ }
+ Object[] values = new Object[schema.size()];
+ JsonNode jsonNode = null;
+        try {
+            jsonNode = mapper.readTree(record);
+        } catch (JsonProcessingException e) {
+            // Handle the parse error according to the configuration.
+            if (ignoreParseError) {
+                // Skip the whole record.
+                return Collections.emptyList();
+            } else {
+                throw new GeaflowRuntimeException("Failed to deserialize record: " + record, e);
+            }
+        }
+        // Map each field of the schema to the corresponding json property.
+        for (int i = 0; i < schema.size(); i++) {
+            String fieldName = schema.getFieldNames().get(i);
+            if (failOnMissingField && !jsonNode.has(fieldName)) {
+                throw new GeaflowRuntimeException("Failed to deserialize record: " + record
+                    + ", missing field: " + fieldName);
+            }
+            JsonNode value = jsonNode.get(fieldName);
+            IType<?> type = schema.getType(i);
+            // Cast the value to the type defined in the schema; a json null
+            // (NullNode) is mapped to a java null rather than the string "null".
+            if (value != null && !value.isNull()) {
+                values[i] = TypeCastUtil.cast(value.asText(), type);
+            } else {
+                values[i] = null;
+            }
+        }
+ return Collections.singletonList(ObjectRow.create(values));
+ }
+
+}
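
To make the two error-handling switches concrete, an illustrative sketch that mirrors the tests below (the class name and records are examples, not part of the patch):

```java
import com.antgroup.geaflow.common.config.Configuration;
import com.antgroup.geaflow.common.type.primitive.BinaryStringType;
import com.antgroup.geaflow.common.type.primitive.IntegerType;
import com.antgroup.geaflow.dsl.common.types.StructType;
import com.antgroup.geaflow.dsl.common.types.TableField;
import com.antgroup.geaflow.dsl.connector.api.serde.impl.JsonDeserializer;

public class JsonErrorHandlingExample {
    public static void main(String[] args) {
        StructType schema = new StructType(
            new TableField("id", IntegerType.INSTANCE, false),
            new TableField("name", BinaryStringType.INSTANCE, true),
            new TableField("age", IntegerType.INSTANCE, false));

        JsonDeserializer deserializer = new JsonDeserializer();
        Configuration conf = new Configuration();
        // Without this flag, a malformed record throws GeaflowRuntimeException.
        conf.put("geaflow.dsl.connector.format.json.ignore-parse-error", "true");
        deserializer.init(conf, schema);

        // Malformed json: the record is skipped and an empty list is returned.
        System.out.println(deserializer.deserialize("{bad json").isEmpty()); // true
        // Missing "age": one row [1, "amy", null], since
        // geaflow.dsl.connector.format.json.fail-on-missing-field defaults to false.
        System.out.println(deserializer.deserialize("{\"id\":1, \"name\":\"amy\"}").size()); // 1
    }
}
```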
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-api/src/main/java/com/antgroup/geaflow/dsl/connector/api/util/ConnectorConstants.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-api/src/main/java/com/antgroup/geaflow/dsl/connector/api/util/ConnectorConstants.java
index 0d7bd6d..1c9afee 100644
--- a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-api/src/main/java/com/antgroup/geaflow/dsl/connector/api/util/ConnectorConstants.java
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-api/src/main/java/com/antgroup/geaflow/dsl/connector/api/util/ConnectorConstants.java
@@ -17,5 +17,7 @@
public class ConnectorConstants {
public static final String START_TIME_FORMAT = "yyyy-MM-dd HH:mm:ss";
+ public static final String CONNECTOR_FORMAT_JSON = "json";
+ public static final String CONNECTOR_FORMAT_TEXT = "text";
}
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-api/src/test/java/com/antgroup/geaflow/dsl/connector/api/JsonDeserializerTest.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-api/src/test/java/com/antgroup/geaflow/dsl/connector/api/JsonDeserializerTest.java
new file mode 100644
index 0000000..2821caa
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-api/src/test/java/com/antgroup/geaflow/dsl/connector/api/JsonDeserializerTest.java
@@ -0,0 +1,92 @@
+package com.antgroup.geaflow.dsl.connector.api;
+
+import com.antgroup.geaflow.common.config.Configuration;
+import com.antgroup.geaflow.common.exception.GeaflowRuntimeException;
+import com.antgroup.geaflow.common.type.primitive.BinaryStringType;
+import com.antgroup.geaflow.common.type.primitive.IntegerType;
+import com.antgroup.geaflow.dsl.common.data.Row;
+import com.antgroup.geaflow.dsl.common.types.StructType;
+import com.antgroup.geaflow.dsl.common.types.TableField;
+import com.antgroup.geaflow.dsl.connector.api.serde.impl.JsonDeserializer;
+import java.util.Collections;
+import java.util.List;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+public class JsonDeserializerTest {
+
+ @Test
+ public void testDeserialize() {
+ JsonDeserializer deserializer = new JsonDeserializer();
+ StructType dataSchema = new StructType(
+ new TableField("id", IntegerType.INSTANCE, false),
+ new TableField("name", BinaryStringType.INSTANCE, true),
+ new TableField("age", IntegerType.INSTANCE, false)
+ );
+ deserializer.init(new Configuration(), dataSchema);
+ List<Row> row = deserializer.deserialize("{\"id\":1, \"name\":\"amy\", \"age\":10}");
+ List<Row> rowWithNull = deserializer.deserialize("{\"id\":1, \"name\":\"amy\"}");
+ Assert.assertEquals(row.get(0).getField(0, IntegerType.INSTANCE), 1);
+ Assert.assertEquals(row.get(0).getField(1, BinaryStringType.INSTANCE).toString(), "amy");
+ Assert.assertEquals(row.get(0).getField(2, IntegerType.INSTANCE), 10);
+ Assert.assertEquals(rowWithNull.get(0).getField(0, IntegerType.INSTANCE), 1);
+ Assert.assertEquals(rowWithNull.get(0).getField(1, BinaryStringType.INSTANCE).toString(), "amy");
+        Assert.assertNull(rowWithNull.get(0).getField(2, IntegerType.INSTANCE));
+    }
+
+ @Test
+ public void testDeserializeEmptyString() {
+ JsonDeserializer deserializer = new JsonDeserializer();
+ StructType dataSchema = new StructType(
+ new TableField("id", IntegerType.INSTANCE, false),
+ new TableField("name", BinaryStringType.INSTANCE, true),
+ new TableField("age", IntegerType.INSTANCE, false)
+ );
+ deserializer.init(new Configuration(), dataSchema);
+ List<Row> rows = deserializer.deserialize("");
+ List<Row> testNullRows = deserializer.deserialize(null);
+        Assert.assertEquals(rows, Collections.emptyList());
+        Assert.assertEquals(testNullRows, Collections.emptyList());
+    }
+
+    @Test(expectedExceptions = GeaflowRuntimeException.class)
+ public void testDeserializeParseError() {
+ JsonDeserializer deserializer = new JsonDeserializer();
+ StructType dataSchema = new StructType(
+ new TableField("id", IntegerType.INSTANCE, false),
+ new TableField("name", BinaryStringType.INSTANCE, true),
+ new TableField("age", IntegerType.INSTANCE, false)
+ );
+ deserializer.init(new Configuration(), dataSchema);
+        deserializer.deserialize("test");
+ }
+
+ @Test
+ public void testDeserializeIgnoreParseError() {
+ JsonDeserializer deserializer = new JsonDeserializer();
+ StructType dataSchema = new StructType(
+ new TableField("id", IntegerType.INSTANCE, false),
+ new TableField("name", BinaryStringType.INSTANCE, true),
+ new TableField("age", IntegerType.INSTANCE, false)
+ );
+ Configuration conf = new Configuration();
+ conf.put("geaflow.dsl.connector.format.json.ignore-parse-error", "true");
+ deserializer.init(conf, dataSchema);
+ List<Row> rows = deserializer.deserialize("test");
+ Assert.assertEquals(rows, Collections.emptyList());
+ }
+
+    @Test(expectedExceptions = GeaflowRuntimeException.class)
+ public void testDeserializeFailOnMissingField() {
+ JsonDeserializer deserializer = new JsonDeserializer();
+ StructType dataSchema = new StructType(
+ new TableField("id", IntegerType.INSTANCE, false),
+ new TableField("name", BinaryStringType.INSTANCE, true),
+ new TableField("age", IntegerType.INSTANCE, false)
+ );
+ Configuration conf = new Configuration();
+ conf.put("geaflow.dsl.connector.format.json.fail-on-missing-field", "true");
+ deserializer.init(conf, dataSchema);
+        deserializer.deserialize("{\"id\":1, \"name\":\"amy\"}");
+    }
+}
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-hive/src/main/java/com/antgroup/geaflow/dsl/connector/hive/HiveTableSource.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-hive/src/main/java/com/antgroup/geaflow/dsl/connector/hive/HiveTableSource.java
index 56ed5df..e4a526d 100644
--- a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-hive/src/main/java/com/antgroup/geaflow/dsl/connector/hive/HiveTableSource.java
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-hive/src/main/java/com/antgroup/geaflow/dsl/connector/hive/HiveTableSource.java
@@ -27,8 +27,8 @@
import com.antgroup.geaflow.dsl.connector.api.Offset;
import com.antgroup.geaflow.dsl.connector.api.Partition;
import com.antgroup.geaflow.dsl.connector.api.TableSource;
+import com.antgroup.geaflow.dsl.connector.api.serde.DeserializerFactory;
import com.antgroup.geaflow.dsl.connector.api.serde.TableDeserializer;
-import com.antgroup.geaflow.dsl.connector.api.serde.impl.RowTableDeserializer;
import com.antgroup.geaflow.dsl.connector.hive.adapter.HiveVersionAdapter;
import com.antgroup.geaflow.dsl.connector.hive.adapter.HiveVersionAdapters;
import com.antgroup.geaflow.dsl.connector.hive.util.HiveUtils;
@@ -172,7 +172,7 @@
@SuppressWarnings("unchecked")
@Override
public <IN> TableDeserializer<IN> getDeserializer(Configuration conf) {
- return (TableDeserializer<IN>) new RowTableDeserializer();
+ return DeserializerFactory.loadRowTableDeserializer();
}
@SuppressWarnings("unchecked")
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-jdbc/src/main/java/com/antgroup/geaflow/dsl/connector/jdbc/JDBCTableSource.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-jdbc/src/main/java/com/antgroup/geaflow/dsl/connector/jdbc/JDBCTableSource.java
index c2b03bb..4e83a29 100644
--- a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-jdbc/src/main/java/com/antgroup/geaflow/dsl/connector/jdbc/JDBCTableSource.java
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-jdbc/src/main/java/com/antgroup/geaflow/dsl/connector/jdbc/JDBCTableSource.java
@@ -25,8 +25,8 @@
import com.antgroup.geaflow.dsl.connector.api.Offset;
import com.antgroup.geaflow.dsl.connector.api.Partition;
import com.antgroup.geaflow.dsl.connector.api.TableSource;
+import com.antgroup.geaflow.dsl.connector.api.serde.DeserializerFactory;
import com.antgroup.geaflow.dsl.connector.api.serde.TableDeserializer;
-import com.antgroup.geaflow.dsl.connector.api.serde.impl.RowTableDeserializer;
import com.antgroup.geaflow.dsl.connector.jdbc.util.JDBCUtils;
import java.io.IOException;
import java.sql.Connection;
@@ -137,7 +137,7 @@
@Override
public <IN> TableDeserializer<IN> getDeserializer(Configuration conf) {
- return (TableDeserializer<IN>) new RowTableDeserializer();
+ return DeserializerFactory.loadRowTableDeserializer();
}
@Override
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-kafka/src/main/java/com/antgroup/geaflow/dsl/connector/kafka/KafkaTableSource.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-kafka/src/main/java/com/antgroup/geaflow/dsl/connector/kafka/KafkaTableSource.java
index 6fb71f2..4bd6707 100644
--- a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-kafka/src/main/java/com/antgroup/geaflow/dsl/connector/kafka/KafkaTableSource.java
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-kafka/src/main/java/com/antgroup/geaflow/dsl/connector/kafka/KafkaTableSource.java
@@ -11,9 +11,7 @@
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
-
package com.antgroup.geaflow.dsl.connector.kafka;
-
import com.antgroup.geaflow.api.context.RuntimeContext;
import com.antgroup.geaflow.common.config.Configuration;
import com.antgroup.geaflow.common.config.keys.ConnectorConfigKeys;
@@ -26,8 +24,8 @@
import com.antgroup.geaflow.dsl.connector.api.Offset;
import com.antgroup.geaflow.dsl.connector.api.Partition;
import com.antgroup.geaflow.dsl.connector.api.TableSource;
+import com.antgroup.geaflow.dsl.connector.api.serde.DeserializerFactory;
import com.antgroup.geaflow.dsl.connector.api.serde.TableDeserializer;
-import com.antgroup.geaflow.dsl.connector.api.serde.impl.TextDeserializer;
import com.antgroup.geaflow.dsl.connector.api.util.ConnectorConstants;
import com.antgroup.geaflow.dsl.connector.kafka.utils.KafkaConstants;
import java.io.IOException;
@@ -50,6 +48,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
public class KafkaTableSource implements TableSource {
private static final Logger LOGGER = LoggerFactory.getLogger(KafkaTableSource.class);
@@ -63,6 +62,8 @@
private long windowSize;
private int startTime;
private Properties props;
+ private String connectorFormat;
+ private TableSchema schema;
private transient KafkaConsumer<String, String> consumer;
@@ -90,6 +91,9 @@
startTime = DateTimeUtil.toUnixTime(startTimeStr, ConnectorConstants.START_TIME_FORMAT);
}
+        this.schema = tableSchema;
+        this.connectorFormat = conf.getString(ConnectorConfigKeys.GEAFLOW_DSL_CONNECTOR_FORMAT,
+            (String) ConnectorConfigKeys.GEAFLOW_DSL_CONNECTOR_FORMAT.getDefaultValue());
this.props = new Properties();
props.setProperty(KafkaConstants.KAFKA_BOOTSTRAP_SERVERS, servers);
props.setProperty(KafkaConstants.KAFKA_KEY_DESERIALIZER,
@@ -99,7 +103,7 @@
props.setProperty(KafkaConstants.KAFKA_MAX_POLL_RECORDS,
String.valueOf(windowSize));
props.setProperty(KafkaConstants.KAFKA_GROUP_ID, groupId);
- LOGGER.info("open kafka, servers is: {}, topic is:{}, config is:{}, schema is: {}",
+ LOGGER.info("open kafka, servers is: {}, topic is:{}, config is:{}, schema is: {}, connector format is : {}",
servers, topic, conf, tableSchema);
}
@@ -121,7 +125,7 @@
@Override
public <IN> TableDeserializer<IN> getDeserializer(Configuration conf) {
- return (TableDeserializer<IN>) new TextDeserializer();
+ return DeserializerFactory.loadDeserializer(conf);
}
@Override
@@ -184,6 +188,7 @@
}
Iterator<ConsumerRecord<String, String>> recordIterator = consumer.poll(POLL_TIMEOUT).iterator();
+
List<String> dataList = new ArrayList<>();
long responseMaxTimestamp = -1;
while (recordIterator.hasNext()) {
@@ -214,6 +219,7 @@
LOGGER.info("close");
}
+
public static class KafkaPartition implements Partition {
private final String topic;
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-kafka/src/test/java/com/antgroup/geaflow/dsl/connector/kafka/KafkaTableConnectorTest.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-kafka/src/test/java/com/antgroup/geaflow/dsl/connector/kafka/KafkaTableConnectorTest.java
index 8e7ecd2..c165d32 100644
--- a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-kafka/src/test/java/com/antgroup/geaflow/dsl/connector/kafka/KafkaTableConnectorTest.java
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-kafka/src/test/java/com/antgroup/geaflow/dsl/connector/kafka/KafkaTableConnectorTest.java
@@ -15,7 +15,11 @@
package com.antgroup.geaflow.dsl.connector.kafka;
import com.alibaba.fastjson.JSON;
+import com.antgroup.geaflow.common.config.Configuration;
import com.antgroup.geaflow.dsl.connector.api.function.OffsetStore.ConsoleOffset;
+import com.antgroup.geaflow.dsl.connector.api.serde.TableDeserializer;
+import com.antgroup.geaflow.dsl.connector.api.serde.impl.JsonDeserializer;
+import com.antgroup.geaflow.dsl.connector.api.serde.impl.TextDeserializer;
import com.antgroup.geaflow.dsl.connector.kafka.KafkaTableSource.KafkaOffset;
import com.antgroup.geaflow.dsl.connector.kafka.KafkaTableSource.KafkaPartition;
import java.io.IOException;
@@ -54,4 +58,20 @@
Assert.assertEquals(kvMap.get("type"), "TIMESTAMP");
Assert.assertTrue(Long.parseLong(kvMap.get("writeTime")) > 0L);
}
+
+ @Test
+ public void testJsonDeserializer() {
+ KafkaTableSource kafkaTableSource = new KafkaTableSource();
+ Configuration conf = new Configuration();
+ conf.put("geaflow.dsl.connector.format","json");
+ TableDeserializer<Object> deserializer = kafkaTableSource.getDeserializer(conf);
+ Assert.assertEquals(deserializer.getClass(), JsonDeserializer.class);
+ }
+
+ @Test
+ public void testTextDeserializer() {
+ KafkaTableSource kafkaTableSource = new KafkaTableSource();
+ TableDeserializer<Object> deserializer = kafkaTableSource.getDeserializer(new Configuration());
+ Assert.assertEquals(deserializer.getClass(), TextDeserializer.class);
+ }
}
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-pulsar/src/main/java/com/antgroup/geaflow/dsl/connector/pulsar/PulsarTableSource.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-pulsar/src/main/java/com/antgroup/geaflow/dsl/connector/pulsar/PulsarTableSource.java
index e2c2423..6e8d0a9 100644
--- a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-pulsar/src/main/java/com/antgroup/geaflow/dsl/connector/pulsar/PulsarTableSource.java
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-pulsar/src/main/java/com/antgroup/geaflow/dsl/connector/pulsar/PulsarTableSource.java
@@ -11,8 +11,8 @@
import com.antgroup.geaflow.dsl.connector.api.Offset;
import com.antgroup.geaflow.dsl.connector.api.Partition;
import com.antgroup.geaflow.dsl.connector.api.TableSource;
+import com.antgroup.geaflow.dsl.connector.api.serde.DeserializerFactory;
import com.antgroup.geaflow.dsl.connector.api.serde.TableDeserializer;
-import com.antgroup.geaflow.dsl.connector.api.serde.impl.TextDeserializer;
import com.antgroup.geaflow.dsl.connector.api.util.ConnectorConstants;
import com.antgroup.geaflow.dsl.connector.pulsar.utils.PulsarConstants;
@@ -128,7 +128,7 @@
@Override
public <IN> TableDeserializer<IN> getDeserializer(Configuration conf) {
- return (TableDeserializer<IN>) new TextDeserializer();
+ return DeserializerFactory.loadTextDeserializer();
}
@Override
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-socket/src/main/java/com/antgroup/geaflow/dsl/connector/socket/SocketTableSource.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-socket/src/main/java/com/antgroup/geaflow/dsl/connector/socket/SocketTableSource.java
index 2e87c9d..897548a 100644
--- a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-socket/src/main/java/com/antgroup/geaflow/dsl/connector/socket/SocketTableSource.java
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-socket/src/main/java/com/antgroup/geaflow/dsl/connector/socket/SocketTableSource.java
@@ -25,8 +25,8 @@
import com.antgroup.geaflow.dsl.connector.api.Offset;
import com.antgroup.geaflow.dsl.connector.api.Partition;
import com.antgroup.geaflow.dsl.connector.api.TableSource;
+import com.antgroup.geaflow.dsl.connector.api.serde.DeserializerFactory;
import com.antgroup.geaflow.dsl.connector.api.serde.TableDeserializer;
-import com.antgroup.geaflow.dsl.connector.api.serde.impl.TextDeserializer;
import com.antgroup.geaflow.dsl.connector.socket.server.NettySourceClient;
import java.io.IOException;
import java.util.ArrayList;
@@ -75,7 +75,7 @@
@Override
public <IN> TableDeserializer<IN> getDeserializer(Configuration conf) {
- return (TableDeserializer<IN>) new TextDeserializer();
+ return DeserializerFactory.loadTextDeserializer();
}
@Override