Fix deserialization of kafka producer json config in the kafka-reporter-plugin. (#542)

diff --git a/CHANGES.md b/CHANGES.md
index b517839..f5ebc46 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -19,6 +19,7 @@
 * Fix possible IllegalStateException when using Micrometer.
 * Support Grizzly Work ThreadPool Metric Monitor
 * Fix the gson dependency in the kafka-reporter-plugin.
+* Fix deserialization of kafka producer json config in the kafka-reporter-plugin.
 * Support to config custom decode methods for kafka configurations
 
 #### Documentation
diff --git a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java
index 5bb66e4..31dc848 100644
--- a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java
+++ b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java
@@ -18,8 +18,8 @@
 
 package org.apache.skywalking.apm.agent.core.kafka;
 
+import com.google.gson.reflect.TypeToken;
 import com.google.gson.Gson;
-
 import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.HashSet;
@@ -34,7 +34,6 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
-
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.DescribeTopicsResult;
 import org.apache.kafka.clients.producer.KafkaProducer;
@@ -107,11 +106,7 @@
         Properties properties = new Properties();
         properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Kafka.BOOTSTRAP_SERVERS);
 
-        if (StringUtil.isNotEmpty(Kafka.PRODUCER_CONFIG_JSON)) {
-            Gson gson = new Gson();
-            Map<String, String> config = (Map<String, String>) gson.fromJson(Kafka.PRODUCER_CONFIG_JSON, Map.class);
-            decode(config).forEach(properties::setProperty);
-        }
+        setPropertiesFromJsonConfig(properties);
         decode(Kafka.PRODUCER_CONFIG).forEach(properties::setProperty);
 
         try (AdminClient adminClient = AdminClient.create(properties)) {
@@ -131,12 +126,12 @@
                     })
                     .filter(Objects::nonNull)
                     .collect(Collectors.toSet());
-    
+
             if (!topics.isEmpty()) {
                 LOGGER.warn("kafka topics {} is not exist, connect to kafka cluster abort", topics);
                 return;
             }
-    
+
             try {
                 producer = new KafkaProducer<>(properties, new StringSerializer(), new BytesSerializer());
             } catch (Exception e) {
@@ -149,6 +144,15 @@
         }
     }
 
+    void setPropertiesFromJsonConfig(Properties properties) {
+        if (StringUtil.isNotEmpty(Kafka.PRODUCER_CONFIG_JSON)) {
+            Gson gson = new Gson();
+            Map<String, String> config = gson.fromJson(Kafka.PRODUCER_CONFIG_JSON,
+                    new TypeToken<Map<String, String>>() { }.getType());
+            decode(config).forEach(properties::setProperty);
+        }
+    }
+
     private void notifyListeners(KafkaConnectionStatus status) {
         for (KafkaConnectionStatusListener listener : listeners) {
             listener.onStatusChanged(status);
diff --git a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/test/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManagerTest.java b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/test/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManagerTest.java
index 4317fb0..3a6b7c2 100644
--- a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/test/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManagerTest.java
+++ b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/test/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManagerTest.java
@@ -18,16 +18,15 @@
 
 package org.apache.skywalking.apm.agent.core.kafka;
 
-import org.junit.Test;
-
+import static org.junit.Assert.assertEquals;
 import java.lang.reflect.Method;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.Test;
 import java.nio.charset.StandardCharsets;
 import java.util.Base64;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.junit.Assert.assertEquals;
 
 public class KafkaProducerManagerTest {
     @Test
@@ -39,8 +38,8 @@
             kafkaProducerManager.addListener(new MockListener(counter));
         }
         Method notifyListeners = kafkaProducerManager
-            .getClass()
-            .getDeclaredMethod("notifyListeners", KafkaConnectionStatus.class);
+                .getClass()
+                .getDeclaredMethod("notifyListeners", KafkaConnectionStatus.class);
         notifyListeners.setAccessible(true);
         notifyListeners.invoke(kafkaProducerManager, KafkaConnectionStatus.CONNECTED);
 
@@ -61,6 +60,17 @@
     }
 
     @Test
+    public void testSetPropertiesFromJsonConfig() {
+        KafkaProducerManager kafkaProducerManager = new KafkaProducerManager();
+        Properties properties = new Properties();
+
+        KafkaReporterPluginConfig.Plugin.Kafka.PRODUCER_CONFIG_JSON = "{\"batch.size\":32768}";
+        kafkaProducerManager.setPropertiesFromJsonConfig(properties);
+
+        assertEquals(properties.get("batch.size"), "32768");
+    }
+
+    @Test
     public void testDecode() throws Exception {
         KafkaReporterPluginConfig.Plugin.Kafka.DECODE_CLASS = "org.apache.skywalking.apm.agent.core.kafka.KafkaProducerManagerTest$DecodeTool";
         KafkaProducerManager kafkaProducerManager = new KafkaProducerManager();