polish(serial) add the generic type, Map<String, String>, into SimpleKeyValueDeserializationSchema
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueDeserializationSchema.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueDeserializationSchema.java
index 3c9ae95..1177f76 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueDeserializationSchema.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueDeserializationSchema.java
@@ -21,8 +21,9 @@
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
-public class SimpleKeyValueDeserializationSchema implements KeyValueDeserializationSchema<Map> {
+public class SimpleKeyValueDeserializationSchema implements KeyValueDeserializationSchema<Map<String, String>> {
public static final String DEFAULT_KEY_FIELD = "key";
public static final String DEFAULT_VALUE_FIELD = "value";
@@ -45,8 +46,8 @@
}
@Override
- public Map deserializeKeyAndValue(byte[] key, byte[] value) {
- HashMap map = new HashMap(2);
+ public Map<String, String> deserializeKeyAndValue(byte[] key, byte[] value) {
+ HashMap<String, String> map = new HashMap<>(2);
if (keyField != null) {
String k = key != null ? new String(key, StandardCharsets.UTF_8) : null;
map.put(keyField, k);
@@ -59,7 +60,7 @@
}
@Override
- public TypeInformation<Map> getProducedType() {
- return TypeInformation.of(Map.class);
+ public TypeInformation<Map<String, String>> getProducedType() {
+ return new MapTypeInfo<>(String.class, String.class);
}
}
diff --git a/src/test/java/org/apache/rocketmq/flink/legacy/RocketMQSourceTest.java b/src/test/java/org/apache/rocketmq/flink/legacy/RocketMQSourceTest.java
index 9c5042c..9e78190 100644
--- a/src/test/java/org/apache/rocketmq/flink/legacy/RocketMQSourceTest.java
+++ b/src/test/java/org/apache/rocketmq/flink/legacy/RocketMQSourceTest.java
@@ -18,6 +18,7 @@
package org.apache.rocketmq.flink.legacy;
+import java.util.Map;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.consumer.PullStatus;
@@ -50,16 +51,16 @@
@Ignore
public class RocketMQSourceTest {
- private RocketMQSourceFunction rocketMQSource;
+ private RocketMQSourceFunction<Map<String, String>> rocketMQSource;
private DefaultLitePullConsumer consumer;
- private KeyValueDeserializationSchema deserializationSchema;
+ private KeyValueDeserializationSchema<Map<String, String>> deserializationSchema;
private String topic = "tpc";
@Before
public void setUp() throws Exception {
deserializationSchema = new SimpleKeyValueDeserializationSchema();
Properties props = new Properties();
- rocketMQSource = new RocketMQSourceFunction(deserializationSchema, props);
+ rocketMQSource = new RocketMQSourceFunction<>(deserializationSchema, props);
setFieldValue(rocketMQSource, "topic", topic);
setFieldValue(rocketMQSource, "runningChecker", new SingleRunningCheck());
@@ -89,7 +90,7 @@
rocketMQSource.run(context);
// schedule the pull task
- Set<MessageQueue> set = new HashSet();
+ Set<MessageQueue> set = new HashSet<>();
set.add(new MessageQueue(topic, "brk", 1));
MessageExt msg = pullResult.getMsgFoundList().get(0);
diff --git a/src/test/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueSerializationSchemaTest.java b/src/test/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueSerializationSchemaTest.java
index 7e2e0d9..e27fdf5 100644
--- a/src/test/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueSerializationSchemaTest.java
+++ b/src/test/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueSerializationSchemaTest.java
@@ -33,7 +33,7 @@
SimpleKeyValueDeserializationSchema deserializationSchema =
new SimpleKeyValueDeserializationSchema("id", "name");
- Map tuple = new HashMap();
+ Map<String, String> tuple = new HashMap<>();
tuple.put("id", "x001");
tuple.put("name", "vesense");