Fix deserialization of kafka producer json config in the kafka-reporter-plugin. (#542)
diff --git a/CHANGES.md b/CHANGES.md
index b517839..f5ebc46 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -19,6 +19,7 @@
* Fix possible IllegalStateException when using Micrometer.
* Support Grizzly Work ThreadPool Metric Monitor
* Fix the gson dependency in the kafka-reporter-plugin.
+* Fix deserialization of kafka producer json config in the kafka-reporter-plugin.
* Support to config custom decode methods for kafka configurations
#### Documentation
diff --git a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java
index 5bb66e4..31dc848 100644
--- a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java
+++ b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java
@@ -18,8 +18,8 @@
package org.apache.skywalking.apm.agent.core.kafka;
+import com.google.gson.reflect.TypeToken;
import com.google.gson.Gson;
-
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.HashSet;
@@ -34,7 +34,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
-
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeTopicsResult;
import org.apache.kafka.clients.producer.KafkaProducer;
@@ -107,11 +106,7 @@
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Kafka.BOOTSTRAP_SERVERS);
- if (StringUtil.isNotEmpty(Kafka.PRODUCER_CONFIG_JSON)) {
- Gson gson = new Gson();
- Map<String, String> config = (Map<String, String>) gson.fromJson(Kafka.PRODUCER_CONFIG_JSON, Map.class);
- decode(config).forEach(properties::setProperty);
- }
+ setPropertiesFromJsonConfig(properties);
decode(Kafka.PRODUCER_CONFIG).forEach(properties::setProperty);
try (AdminClient adminClient = AdminClient.create(properties)) {
@@ -131,12 +126,12 @@
})
.filter(Objects::nonNull)
.collect(Collectors.toSet());
-
+
if (!topics.isEmpty()) {
LOGGER.warn("kafka topics {} is not exist, connect to kafka cluster abort", topics);
return;
}
-
+
try {
producer = new KafkaProducer<>(properties, new StringSerializer(), new BytesSerializer());
} catch (Exception e) {
@@ -149,6 +144,15 @@
}
}
+ void setPropertiesFromJsonConfig(Properties properties) {
+ if (StringUtil.isNotEmpty(Kafka.PRODUCER_CONFIG_JSON)) {
+ Gson gson = new Gson();
+ Map<String, String> config = gson.fromJson(Kafka.PRODUCER_CONFIG_JSON,
+ new TypeToken<Map<String, String>>() { }.getType());
+ decode(config).forEach(properties::setProperty);
+ }
+ }
+
private void notifyListeners(KafkaConnectionStatus status) {
for (KafkaConnectionStatusListener listener : listeners) {
listener.onStatusChanged(status);
diff --git a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/test/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManagerTest.java b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/test/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManagerTest.java
index 4317fb0..3a6b7c2 100644
--- a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/test/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManagerTest.java
+++ b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/test/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManagerTest.java
@@ -18,16 +18,15 @@
package org.apache.skywalking.apm.agent.core.kafka;
-import org.junit.Test;
-
+import static org.junit.Assert.assertEquals;
import java.lang.reflect.Method;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.junit.Test;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.junit.Assert.assertEquals;
public class KafkaProducerManagerTest {
@Test
@@ -39,8 +38,8 @@
kafkaProducerManager.addListener(new MockListener(counter));
}
Method notifyListeners = kafkaProducerManager
- .getClass()
- .getDeclaredMethod("notifyListeners", KafkaConnectionStatus.class);
+ .getClass()
+ .getDeclaredMethod("notifyListeners", KafkaConnectionStatus.class);
notifyListeners.setAccessible(true);
notifyListeners.invoke(kafkaProducerManager, KafkaConnectionStatus.CONNECTED);
@@ -61,6 +60,17 @@
}
@Test
+ public void testSetPropertiesFromJsonConfig() {
+ KafkaProducerManager kafkaProducerManager = new KafkaProducerManager();
+ Properties properties = new Properties();
+
+ KafkaReporterPluginConfig.Plugin.Kafka.PRODUCER_CONFIG_JSON = "{\"batch.size\":32768}";
+ kafkaProducerManager.setPropertiesFromJsonConfig(properties);
+
+ assertEquals(properties.get("batch.size"), "32768");
+ }
+
+ @Test
public void testDecode() throws Exception {
KafkaReporterPluginConfig.Plugin.Kafka.DECODE_CLASS = "org.apache.skywalking.apm.agent.core.kafka.KafkaProducerManagerTest$DecodeTool";
KafkaProducerManager kafkaProducerManager = new KafkaProducerManager();