[Feature] Support to configure custom decode method for kafka configurations (#540)

diff --git a/CHANGES.md b/CHANGES.md
index a0b3338..b517839 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -19,6 +19,7 @@
 * Fix possible IllegalStateException when using Micrometer.
 * Support Grizzly Work ThreadPool Metric Monitor
 * Fix the gson dependency in the kafka-reporter-plugin.
+* Support to config custom decode methods for kafka configurations
 
 #### Documentation
 
diff --git a/apm-sniffer/config/agent.config b/apm-sniffer/config/agent.config
index fa172b7..14bf708 100755
--- a/apm-sniffer/config/agent.config
+++ b/apm-sniffer/config/agent.config
@@ -263,6 +263,8 @@
 plugin.kafka.topic_logging=${SW_PLUGIN_KAFKA_TOPIC_LOGGING:skywalking-logs}
 #  isolate multi OAP server when using same Kafka cluster (final topic name will append namespace before Kafka topics with `-` ).
 plugin.kafka.namespace=${SW_KAFKA_NAMESPACE:}
+# Specify which class to decode encoded configuration of kafka.You can set encoded information in `plugin.kafka.producer_config_json` or `plugin.kafka.producer_config` if you need.
+plugin.kafka.decode_class=${SW_KAFKA_DECODE_CLASS:}
 #   Match spring beans with regular expression for the class name. Multiple expressions could be separated by a comma. This only works when `Spring annotation plugin` has been activated. 
 plugin.springannotation.classname_match_regex=${SW_SPRINGANNOTATION_CLASSNAME_MATCH_REGEX:}
 #  Whether or not to transmit logged data as formatted or un-formatted. 
diff --git a/apm-sniffer/optional-reporter-plugins/kafka-config-extension/pom.xml b/apm-sniffer/optional-reporter-plugins/kafka-config-extension/pom.xml
new file mode 100644
index 0000000..a048ed2
--- /dev/null
+++ b/apm-sniffer/optional-reporter-plugins/kafka-config-extension/pom.xml
@@ -0,0 +1,32 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  ~
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>optional-reporter-plugins</artifactId>
+        <groupId>org.apache.skywalking</groupId>
+        <version>8.16.0-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>kafka-config-extension</artifactId>
+    <packaging>jar</packaging>
+
+    <url>http://maven.apache.org</url>
+</project>
\ No newline at end of file
diff --git a/apm-sniffer/optional-reporter-plugins/kafka-config-extension/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaConfigExtension.java b/apm-sniffer/optional-reporter-plugins/kafka-config-extension/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaConfigExtension.java
new file mode 100644
index 0000000..5ed0b9a
--- /dev/null
+++ b/apm-sniffer/optional-reporter-plugins/kafka-config-extension/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaConfigExtension.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.skywalking.apm.agent.core.kafka;
+
+import java.util.Map;
+
+public interface KafkaConfigExtension {
+    Map<String, String> decode(Map<String, String> config);
+}
diff --git a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/pom.xml b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/pom.xml
index ec7d4cb..d18f203 100644
--- a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/pom.xml
+++ b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/pom.xml
@@ -34,6 +34,11 @@
             <artifactId>kafka-clients</artifactId>
             <version>${kafka-clients.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.skywalking</groupId>
+            <artifactId>kafka-config-extension</artifactId>
+            <version>${project.version}</version>
+        </dependency>
     </dependencies>
 
 
diff --git a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java
index 9bf52f9..5bb66e4 100644
--- a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java
+++ b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManager.java
@@ -19,6 +19,8 @@
 package org.apache.skywalking.apm.agent.core.kafka;
 
 import com.google.gson.Gson;
+
+import java.lang.reflect.InvocationTargetException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -108,9 +110,9 @@
         if (StringUtil.isNotEmpty(Kafka.PRODUCER_CONFIG_JSON)) {
             Gson gson = new Gson();
             Map<String, String> config = (Map<String, String>) gson.fromJson(Kafka.PRODUCER_CONFIG_JSON, Map.class);
-            config.forEach(properties::setProperty);
+            decode(config).forEach(properties::setProperty);
         }
-        Kafka.PRODUCER_CONFIG.forEach(properties::setProperty);
+        decode(Kafka.PRODUCER_CONFIG).forEach(properties::setProperty);
 
         try (AdminClient adminClient = AdminClient.create(properties)) {
             DescribeTopicsResult topicsResult = adminClient.describeTopics(topics);
@@ -153,6 +155,22 @@
         }
     }
 
+    private Map<String, String> decode(Map<String, String> config) {
+        if (StringUtil.isBlank(Kafka.DECODE_CLASS)) {
+            return config;
+        }
+        try {
+            Object decodeTool = Class.forName(Kafka.DECODE_CLASS).getDeclaredConstructor().newInstance();
+            if (decodeTool instanceof KafkaConfigExtension) {
+                return ((KafkaConfigExtension) decodeTool).decode(config);
+            }
+        } catch (ClassNotFoundException | NoSuchMethodException | InstantiationException | IllegalAccessException | InvocationTargetException e) {
+            // ignore
+            LOGGER.warn("The decode class {} does not exist, exception:{}.", Kafka.DECODE_CLASS, e);
+        }
+        return config;
+    }
+
     /**
      * Get the KafkaProducer instance to send data to Kafka broker.
      * @return Kafka producer
diff --git a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaReporterPluginConfig.java b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaReporterPluginConfig.java
index e3e43d6..23a0bb9 100644
--- a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaReporterPluginConfig.java
+++ b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/main/java/org/apache/skywalking/apm/agent/core/kafka/KafkaReporterPluginConfig.java
@@ -63,6 +63,10 @@
              * Timeout period of reading topics from the Kafka server, the unit is second.
              */
             public static int GET_TOPIC_TIMEOUT = 10;
+            /**
+             * Class name of decoding encoded information in kafka configuration.
+             */
+            public static String DECODE_CLASS = "";
         }
     }
 }
diff --git a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/test/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManagerTest.java b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/test/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManagerTest.java
index 20dc593..4317fb0 100644
--- a/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/test/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManagerTest.java
+++ b/apm-sniffer/optional-reporter-plugins/kafka-reporter-plugin/src/test/java/org/apache/skywalking/apm/agent/core/kafka/KafkaProducerManagerTest.java
@@ -18,11 +18,17 @@
 
 package org.apache.skywalking.apm.agent.core.kafka;
 
-import static org.junit.Assert.assertEquals;
-import java.lang.reflect.Method;
-import java.util.concurrent.atomic.AtomicInteger;
 import org.junit.Test;
 
+import java.lang.reflect.Method;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.Assert.assertEquals;
+
 public class KafkaProducerManagerTest {
     @Test
     public void testAddListener() throws Exception {
@@ -54,6 +60,22 @@
         assertEquals(KafkaReporterPluginConfig.Plugin.Kafka.TOPIC_METRICS, value);
     }
 
+    @Test
+    public void testDecode() throws Exception {
+        KafkaReporterPluginConfig.Plugin.Kafka.DECODE_CLASS = "org.apache.skywalking.apm.agent.core.kafka.KafkaProducerManagerTest$DecodeTool";
+        KafkaProducerManager kafkaProducerManager = new KafkaProducerManager();
+
+        Map<String, String> config = new HashMap<>();
+        String value = "test.99998888";
+        config.put("test.password", Base64.getEncoder().encodeToString(value.getBytes(StandardCharsets.UTF_8)));
+
+        Method decodeMethod = kafkaProducerManager.getClass().getDeclaredMethod("decode", Map.class);
+        decodeMethod.setAccessible(true);
+        Map<String, String> decodeConfig = (Map<String, String>) decodeMethod.invoke(kafkaProducerManager, config);
+
+        assertEquals(value, decodeConfig.get("test.password"));
+    }
+
     static class MockListener implements KafkaConnectionStatusListener {
 
         private AtomicInteger counter;
@@ -68,4 +90,14 @@
         }
     }
 
+    static class DecodeTool implements KafkaConfigExtension {
+        @Override
+        public Map<String, String> decode(Map<String, String> config) {
+            if (config.containsKey("test.password")) {
+                config.put("test.password", new String(Base64.getDecoder().decode(config.get("test.password")), StandardCharsets.UTF_8));
+            }
+            return config;
+        }
+    }
+
 }
diff --git a/apm-sniffer/optional-reporter-plugins/pom.xml b/apm-sniffer/optional-reporter-plugins/pom.xml
index f21f0be..57b2e67 100644
--- a/apm-sniffer/optional-reporter-plugins/pom.xml
+++ b/apm-sniffer/optional-reporter-plugins/pom.xml
@@ -30,6 +30,7 @@
 
     <modules>
         <module>kafka-reporter-plugin</module>
+        <module>kafka-config-extension</module>
     </modules>
 
     <properties>
diff --git a/docs/en/setup/service-agent/java-agent/advanced-reporters.md b/docs/en/setup/service-agent/java-agent/advanced-reporters.md
index 47f571e..44264d4 100644
--- a/docs/en/setup/service-agent/java-agent/advanced-reporters.md
+++ b/docs/en/setup/service-agent/java-agent/advanced-reporters.md
@@ -33,6 +33,59 @@
 
 Currently, there are 2 ways to configure advanced configurations below. Notice that, the new way, configured in JSON format, will be overridden by `plugin.kafka.producer_config[key]=value` when they have the duplication keys.
 
+Since 8.16.0, users could implement their decoder for kafka configurations rather than using plain configurations(such as `password`) of Kafka producer,
+Including `plugin.kafka.producer_config_json`,`plugin.kafka.producer_config` or environment variable `SW_PLUGIN_KAFKA_PRODUCER_CONFIG_JSON`.
+
+By doing that, add the `kafka-config-extension` dependency to your decoder project and implement `decode` interface.
+
+- Add the `KafkaConfigExtension` dependency to your project.
+```
+<dependency>
+    <groupId>org.apache.skywalking</groupId>
+    <artifactId>kafka-config-extension</artifactId>
+    <version>${skywalking.version}</version>
+    <scope>provided</scope>
+</dependency>
+```
+
+- Implement your custom decode method.Like this:
+```
+package org.apache.skywalking.apm.agent.sample;
+
+import org.apache.skywalking.apm.agent.core.kafka.KafkaConfigExtension;
+import java.util.Map;
+
+/**
+ * Custom decode class
+ */
+public class DecodeUtil implements KafkaConfigExtension {
+    /**
+     * Custom decode method.
+     * @param config the value of `plugin.kafka.producer_config` or `plugin.kafka.producer_config_json` in `agent.config`.
+     * @return the decoded configuration if you implement your custom decode logic.
+     */
+    public Map<String, String> decode(Map<String, String> config) {
+        /**
+         * implement your custom decode logic
+         * */
+        return config;
+    }
+}
+```
+
+Then, package your decoder project as a jar and move to `agent/plugins`.
+
+**Notice, the jar package should contain all the dependencies required for your custom decode code.**
+
+The last step is to activate the decoder class in `agent.config` like this:
+```
+plugin.kafka.decrypt_class="org.apache.skywalking.apm.agent.sample.DecodeUtil"
+```
+or configure by environment variable
+```
+SW_KAFKA_DECRYPT_CLASS="org.apache.skywalking.apm.agent.sample.DecodeUtil"
+```
+
 ## 3rd party reporters
 There are other reporter implementations from out of the Apache Software Foundation.
 
@@ -40,4 +93,4 @@
 Go to [Pulsar-reporter-plugin](https://github.com/SkyAPM/transporter-plugin-for-skywalking/blob/main/docs/en/pulsar/Pulsar-Reporter.md) for more details.
 
 ### RocketMQ Reporter
-Go to [RocketMQ-reporter-plugin](https://github.com/SkyAPM/transporter-plugin-for-skywalking/blob/main/docs/en/rocketmq/Rocketmq-Reporter.md) for more details.
\ No newline at end of file
+Go to [RocketMQ-reporter-plugin](https://github.com/SkyAPM/transporter-plugin-for-skywalking/blob/main/docs/en/rocketmq/Rocketmq-Reporter.md) for more details.
diff --git a/docs/en/setup/service-agent/java-agent/configurations.md b/docs/en/setup/service-agent/java-agent/configurations.md
index 2df78b1..b55dc44 100644
--- a/docs/en/setup/service-agent/java-agent/configurations.md
+++ b/docs/en/setup/service-agent/java-agent/configurations.md
@@ -103,6 +103,7 @@
 | `plugin.kafka.topic_management`                                 | Specify which Kafka topic name for the register or heartbeat data of Service Instance to report to.                                                                                                                                                                                                                                                                                                                                                                                                                                                    | SW_PLUGIN_KAFKA_TOPIC_MANAGEMENT                                 | `skywalking-managements`                                                                                                                                                                                                                                                                                                                                                                                                                             |
 | `plugin.kafka.topic_logging`                                    | Specify which Kafka topic name for the logging data to report to.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                      | SW_PLUGIN_KAFKA_TOPIC_LOGGING                                    | `skywalking-logging`                                                                                                                                                                                                                                                                                                                                                                                                                                 |
 | `plugin.kafka.namespace`                                        | isolate multi OAP server when using same Kafka cluster (final topic name will append namespace before Kafka topics with `-` ).                                                                                                                                                                                                                                                                                                                                                                                                                         | SW_KAFKA_NAMESPACE                                               | ``                                                                                                                                                                                                                                                                                                                                                                                                                                                   |
+| `plugin.kafka.decode_class`                                     | Specify which class to decode encoded configuration of kafka.You can set encoded information in `plugin.kafka.producer_config_json` or `plugin.kafka.producer_config` if you need.                                                                                                                                                                                                                                                                                                                                                                     | SW_KAFKA_DECODE_CLASS                                            | ``                                                                                                                                                                                                                                                                                                                                                                                                                                                   |
 | `plugin.springannotation.classname_match_regex`                 | Match spring beans with regular expression for the class name. Multiple expressions could be separated by a comma. This only works when `Spring annotation plugin` has been activated.                                                                                                                                                                                                                                                                                                                                                                 | SW_SPRINGANNOTATION_CLASSNAME_MATCH_REGEX                        | `All the spring beans tagged with @Bean,@Service,@Dao, or @Repository.`                                                                                                                                                                                                                                                                                                                                                                              |
 | `plugin.toolkit.log.transmit_formatted`                         | Whether or not to transmit logged data as formatted or un-formatted.                                                                                                                                                                                                                                                                                                                                                                                                                                                                                   | SW_PLUGIN_TOOLKIT_LOG_TRANSMIT_FORMATTED                         | `true`                                                                                                                                                                                                                                                                                                                                                                                                                                               |
 | `plugin.lettuce.trace_redis_parameters`                         | If set to true, the parameters of Redis commands would be collected by Lettuce agent.                                                                                                                                                                                                                                                                                                                                                                                                                                                                  | SW_PLUGIN_LETTUCE_TRACE_REDIS_PARAMETERS                         | `false`                                                                                                                                                                                                                                                                                                                                                                                                                                              |