Merge pull request #86 from ni-ze/main

[ISSUE #85] Add the generic type to SimpleKeyValueDeserializationSchema
diff --git a/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueDeserializationSchema.java b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueDeserializationSchema.java
index 3c9ae95..1177f76 100644
--- a/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueDeserializationSchema.java
+++ b/src/main/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueDeserializationSchema.java
@@ -21,8 +21,9 @@
 import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
+import org.apache.flink.api.java.typeutils.MapTypeInfo;
 
-public class SimpleKeyValueDeserializationSchema implements KeyValueDeserializationSchema<Map> {
+public class SimpleKeyValueDeserializationSchema implements KeyValueDeserializationSchema<Map<String, String>> {
     public static final String DEFAULT_KEY_FIELD = "key";
     public static final String DEFAULT_VALUE_FIELD = "value";
 
@@ -45,8 +46,8 @@
     }
 
     @Override
-    public Map deserializeKeyAndValue(byte[] key, byte[] value) {
-        HashMap map = new HashMap(2);
+    public Map<String, String> deserializeKeyAndValue(byte[] key, byte[] value) {
+        HashMap<String, String> map = new HashMap<>(2);
         if (keyField != null) {
             String k = key != null ? new String(key, StandardCharsets.UTF_8) : null;
             map.put(keyField, k);
@@ -59,7 +60,7 @@
     }
 
     @Override
-    public TypeInformation<Map> getProducedType() {
-        return TypeInformation.of(Map.class);
+    public TypeInformation<Map<String, String>> getProducedType() {
+        return new MapTypeInfo<>(String.class, String.class);
     }
 }
diff --git a/src/test/java/org/apache/rocketmq/flink/legacy/RocketMQSourceTest.java b/src/test/java/org/apache/rocketmq/flink/legacy/RocketMQSourceTest.java
index 9c5042c..9e78190 100644
--- a/src/test/java/org/apache/rocketmq/flink/legacy/RocketMQSourceTest.java
+++ b/src/test/java/org/apache/rocketmq/flink/legacy/RocketMQSourceTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.rocketmq.flink.legacy;
 
+import java.util.Map;
 import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
 import org.apache.rocketmq.client.consumer.PullResult;
 import org.apache.rocketmq.client.consumer.PullStatus;
@@ -50,16 +51,16 @@
 @Ignore
 public class RocketMQSourceTest {
 
-    private RocketMQSourceFunction rocketMQSource;
+    private RocketMQSourceFunction<Map<String, String>> rocketMQSource;
     private DefaultLitePullConsumer consumer;
-    private KeyValueDeserializationSchema deserializationSchema;
+    private KeyValueDeserializationSchema<Map<String, String>> deserializationSchema;
     private String topic = "tpc";
 
     @Before
     public void setUp() throws Exception {
         deserializationSchema = new SimpleKeyValueDeserializationSchema();
         Properties props = new Properties();
-        rocketMQSource = new RocketMQSourceFunction(deserializationSchema, props);
+        rocketMQSource = new RocketMQSourceFunction<>(deserializationSchema, props);
 
         setFieldValue(rocketMQSource, "topic", topic);
         setFieldValue(rocketMQSource, "runningChecker", new SingleRunningCheck());
@@ -89,7 +90,7 @@
         rocketMQSource.run(context);
 
         // schedule the pull task
-        Set<MessageQueue> set = new HashSet();
+        Set<MessageQueue> set = new HashSet<>();
         set.add(new MessageQueue(topic, "brk", 1));
 
         MessageExt msg = pullResult.getMsgFoundList().get(0);
diff --git a/src/test/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueSerializationSchemaTest.java b/src/test/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueSerializationSchemaTest.java
index 7e2e0d9..e27fdf5 100644
--- a/src/test/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueSerializationSchemaTest.java
+++ b/src/test/java/org/apache/rocketmq/flink/legacy/common/serialization/SimpleKeyValueSerializationSchemaTest.java
@@ -33,7 +33,7 @@
         SimpleKeyValueDeserializationSchema deserializationSchema =
                 new SimpleKeyValueDeserializationSchema("id", "name");
 
-        Map tuple = new HashMap();
+        Map<String, String> tuple = new HashMap<>();
         tuple.put("id", "x001");
         tuple.put("name", "vesense");