MINOR: Improve some requests/responses toString method (#20775)
Improve some requests/responses toString method to log only the
required
info, including the request.Builder toString methods.
1. AlterConfigsRequest
2. AlterUserScramCredentialsRequest
3. ExpireDelegationTokenRequest
4. IncrementalAlterConfigsRequest
5. RenewDelegationTokenRequest
6. SaslAuthenticateRequest
7. createDelegationTokenResponse
8. describeDelegationTokenResponse
9. SaslAuthenticateResponse
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
Co-authored-by: Luke Chen <showuon@gmail.com>
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java
index b4d35d5..df3ce5e 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterConfigsRequest.java
@@ -88,6 +88,11 @@
         public AlterConfigsRequest build(short version) {
             return new AlterConfigsRequest(data, version);
         }
+
+        @Override
+        public String toString() {
+            return maskData(data);
+        }
     }
 
     private final AlterConfigsRequestData data;
@@ -135,4 +140,20 @@
     public static AlterConfigsRequest parse(ByteBuffer buffer, short version) {
         return new AlterConfigsRequest(new AlterConfigsRequestData(new ByteBufferAccessor(buffer), version), version);
     }
+
+    // It is not safe to print all config values
+    private static String maskData(AlterConfigsRequestData data) {
+        AlterConfigsRequestData tempData = data.duplicate();
+        tempData.resources().forEach(resource -> {
+            resource.configs().forEach(config -> {
+                config.setValue("REDACTED");
+            });
+        });
+        return tempData.toString();
+    }
+
+    @Override
+    public String toString() {
+        return maskData(data);
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AlterUserScramCredentialsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AlterUserScramCredentialsRequest.java
index c779f6d..a51aa7d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/AlterUserScramCredentialsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/AlterUserScramCredentialsRequest.java
@@ -17,14 +17,10 @@
 package org.apache.kafka.common.requests;
 
 import org.apache.kafka.common.message.AlterUserScramCredentialsRequestData;
-import org.apache.kafka.common.message.AlterUserScramCredentialsRequestDataJsonConverter;
 import org.apache.kafka.common.message.AlterUserScramCredentialsResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
 import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Set;
@@ -48,7 +44,7 @@
 
         @Override
         public String toString() {
-            return data.toString();
+            return maskData(data);
         }
     }
 
@@ -87,15 +83,18 @@
         return new AlterUserScramCredentialsResponse(new AlterUserScramCredentialsResponseData().setResults(results));
     }
 
+    private static String maskData(AlterUserScramCredentialsRequestData data) {
+        AlterUserScramCredentialsRequestData tempData = data.duplicate();
+        tempData.upsertions().forEach(upsertion -> {
+            upsertion.setSalt(new byte[0]);
+            upsertion.setSaltedPassword(new byte[0]);
+        });
+        return tempData.toString();
+    }
+
     // Do not print salt or saltedPassword
     @Override
     public String toString() {
-        JsonNode json = AlterUserScramCredentialsRequestDataJsonConverter.write(data, version()).deepCopy();
-
-        for (JsonNode upsertion : json.get("upsertions")) {
-            ((ObjectNode) upsertion).put("salt", "");
-            ((ObjectNode) upsertion).put("saltedPassword", "");
-        }
-        return AlterUserScramCredentialsRequestDataJsonConverter.read(json, version()).toString();
+        return maskData(data);
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenResponse.java
index 0a9f9a8..287013d 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/CreateDelegationTokenResponse.java
@@ -103,4 +103,13 @@
     public boolean shouldClientThrottle(short version) {
         return version >= 1;
     }
+
+    // Do not print tokenId and Hmac, overwrite a temp copy of the data with empty content
+    @Override
+    public String toString() {
+        CreateDelegationTokenResponseData tempData = data.duplicate();
+        tempData.setTokenId("REDACTED");
+        tempData.setHmac(new byte[0]);
+        return tempData.toString();
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java
index a922f05..87b3582 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeDelegationTokenResponse.java
@@ -128,4 +128,15 @@
     public boolean shouldClientThrottle(short version) {
         return version >= 1;
     }
+
+    // Do not print tokenId and Hmac, overwrite a temp copy of the data with empty content
+    @Override
+    public String toString() {
+        DescribeDelegationTokenResponseData tempData = data.duplicate();
+        tempData.tokens().forEach(token -> {
+            token.setTokenId("REDACTED");
+            token.setHmac(new byte[0]);
+        });
+        return tempData.toString();
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java
index 3660a45..ab4bcd9 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ExpireDelegationTokenRequest.java
@@ -74,7 +74,19 @@
 
         @Override
         public String toString() {
-            return data.toString();
+            return maskData(data);
         }
     }
+
+    private static String maskData(ExpireDelegationTokenRequestData data) {
+        ExpireDelegationTokenRequestData tempData = data.duplicate();
+        tempData.setHmac(new byte[0]);
+        return tempData.toString();
+    }
+
+    // Do not print Hmac, overwrite a temp copy of the data with empty content
+    @Override
+    public String toString() {
+        return maskData(data);
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java
index 5454083..bfac0e40 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/IncrementalAlterConfigsRequest.java
@@ -21,15 +21,11 @@
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData;
 import org.apache.kafka.common.message.IncrementalAlterConfigsRequestData.AlterConfigsResource;
-import org.apache.kafka.common.message.IncrementalAlterConfigsRequestDataJsonConverter;
 import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData;
 import org.apache.kafka.common.message.IncrementalAlterConfigsResponseData.AlterConfigsResourceResponse;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
 
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ObjectNode;
-
 import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.Map;
@@ -77,7 +73,7 @@
 
         @Override
         public String toString() {
-            return data.toString();
+            return maskData(data);
         }
     }
 
@@ -113,14 +109,18 @@
     }
 
     // It is not safe to print all config values
+    private static String maskData(IncrementalAlterConfigsRequestData data) {
+        IncrementalAlterConfigsRequestData tempData = data.duplicate();
+        tempData.resources().forEach(resource -> {
+            resource.configs().forEach(config -> {
+                config.setValue("REDACTED");
+            });
+        });
+        return tempData.toString();
+    }
+
     @Override
     public String toString() {
-        JsonNode json = IncrementalAlterConfigsRequestDataJsonConverter.write(data, version()).deepCopy();
-        for (JsonNode resource : json.get("resources")) {
-            for (JsonNode config : resource.get("configs")) {
-                ((ObjectNode) config).put("value", "REDACTED");
-            }
-        }
-        return IncrementalAlterConfigsRequestDataJsonConverter.read(json, version()).toString();
+        return maskData(data);
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenRequest.java
index 9630970..deeb0db 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/RenewDelegationTokenRequest.java
@@ -66,7 +66,19 @@
 
         @Override
         public String toString() {
-            return data.toString();
+            return maskData(data);
         }
     }
+
+    private static String maskData(RenewDelegationTokenRequestData data) {
+        RenewDelegationTokenRequestData tempData = data.duplicate();
+        tempData.setHmac(new byte[0]);
+        return tempData.toString();
+    }
+
+    // Do not print Hmac, overwrite a temp copy of the data with empty content
+    @Override
+    public String toString() {
+        return maskData(data);
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateRequest.java
index 47dd5fd..0816744 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateRequest.java
@@ -78,4 +78,12 @@
         return new SaslAuthenticateRequest(new SaslAuthenticateRequestData(new ByteBufferAccessor(buffer), version),
             version);
     }
+
+    // Do not print authBytes, overwrite a temp copy of the data with empty bytes
+    @Override
+    public String toString() {
+        SaslAuthenticateRequestData tempData = data.duplicate();
+        tempData.setAuthBytes(new byte[0]);
+        return tempData.toString();
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java
index d6ca8c1..8105952 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/SaslAuthenticateResponse.java
@@ -80,4 +80,12 @@
     public static SaslAuthenticateResponse parse(ByteBuffer buffer, short version) {
         return new SaslAuthenticateResponse(new SaslAuthenticateResponseData(new ByteBufferAccessor(buffer), version));
     }
+
+    // Do not print authBytes, overwrite a temp copy of the data with empty bytes
+    @Override
+    public String toString() {
+        SaslAuthenticateResponseData tempData = data.duplicate();
+        tempData.setAuthBytes(new byte[0]);
+        return tempData.toString();
+    }
 }
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index ad52482..008e60d 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -305,7 +305,6 @@
 import static org.apache.kafka.common.protocol.ApiKeys.PRODUCE;
 import static org.apache.kafka.common.protocol.ApiKeys.SASL_AUTHENTICATE;
 import static org.apache.kafka.common.protocol.ApiKeys.SYNC_GROUP;
-import static org.apache.kafka.common.protocol.ApiKeys.WRITE_TXN_MARKERS;
 import static org.apache.kafka.common.requests.EndTxnRequest.LAST_STABLE_VERSION_BEFORE_TRANSACTION_V2;
 import static org.apache.kafka.common.requests.FetchMetadata.INVALID_SESSION_ID;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -2989,7 +2988,7 @@
     }
 
     private AlterConfigsRequest createAlterConfigsRequest(short version) {
-        Map<ConfigResource, AlterConfigsRequest.Config> configs = new HashMap<>();
+        Map<ConfigResource, AlterConfigsRequest.Config> configs = new LinkedHashMap<>();
         List<AlterConfigsRequest.ConfigEntry> configEntries = asList(
                 new AlterConfigsRequest.ConfigEntry("config_name", "config_value"),
                 new AlterConfigsRequest.ConfigEntry("another_name", "another value")
@@ -2997,7 +2996,19 @@
         configs.put(new ConfigResource(ConfigResource.Type.BROKER, "0"), new AlterConfigsRequest.Config(configEntries));
         configs.put(new ConfigResource(ConfigResource.Type.TOPIC, "topic"),
                 new AlterConfigsRequest.Config(emptyList()));
-        return new AlterConfigsRequest.Builder(configs, false).build(version);
+        AlterConfigsRequest alterConfigsRequest = new AlterConfigsRequest.Builder(configs, false).build(version);
+        assertEquals(
+                "AlterConfigsRequestData(resources=[" +
+                        "AlterConfigsResource(resourceType=" + ConfigResource.Type.BROKER.id() + ", " +
+                        "resourceName='0', " +
+                        "configs=[AlterableConfig(name='config_name', value='REDACTED'), " +
+                        "AlterableConfig(name='another_name', value='REDACTED')]), " +
+                        "AlterConfigsResource(resourceType=" + ConfigResource.Type.TOPIC.id() + ", " +
+                        "resourceName='topic', configs=[])], " +
+                        "validateOnly=false)",
+                alterConfigsRequest.toString()
+        );
+        return alterConfigsRequest;
     }
 
     private AlterConfigsResponse createAlterConfigsResponse() {
@@ -3100,7 +3111,12 @@
                 .setMaxTimestampMs(System.currentTimeMillis())
                 .setTokenId("token1")
                 .setHmac("test".getBytes());
-        return new CreateDelegationTokenResponse(data);
+        var response = new CreateDelegationTokenResponse(data);
+
+        String responseStr = response.toString();
+        assertTrue(responseStr.contains("tokenId='REDACTED'"));
+        assertTrue(responseStr.contains("hmac=[]"));
+        return response;
     }
 
     private RenewDelegationTokenRequest createRenewTokenRequest(short version) {
@@ -3156,7 +3172,14 @@
         tokenList.add(new DelegationToken(tokenInfo1, "test".getBytes()));
         tokenList.add(new DelegationToken(tokenInfo2, "test".getBytes()));
 
-        return new DescribeDelegationTokenResponse(version, 20, Errors.NONE, tokenList);
+        var response = new DescribeDelegationTokenResponse(version, 20, Errors.NONE, tokenList);
+
+        String responseStr = response.toString();
+        String[] parts = responseStr.split(",");
+        // The 2 token info should both be redacted
+        assertEquals(2, Arrays.stream(parts).filter(s -> s.trim().contains("tokenId='REDACTED'")).count());
+        assertEquals(2, Arrays.stream(parts).filter(s -> s.trim().contains("hmac=[]")).count());
+        return response;
     }
 
     private ElectLeadersRequest createElectLeadersRequestNullPartitions() {
@@ -3773,4 +3796,26 @@
                 parseRequest(SASL_AUTHENTICATE, SASL_AUTHENTICATE.latestVersion(), accessor.buffer())).getMessage();
         assertEquals("Error reading byte array of 32767 byte(s): only 3 byte(s) available", msg);
     }
+
+    @Test
+    public void testSaslAuthenticateRequestResponseToStringMasksSensitiveData() {
+        byte[] sensitiveAuthBytes = "sensitive-auth-token-123".getBytes(StandardCharsets.UTF_8);
+        SaslAuthenticateRequestData requestData = new SaslAuthenticateRequestData().setAuthBytes(sensitiveAuthBytes);
+        SaslAuthenticateRequest request = new SaslAuthenticateRequest(requestData, (short) 2);
+
+        String requestString = request.toString();
+
+        // Verify that the authBytes field is present but empty in the output
+        assertTrue(requestString.contains("authBytes=[]"),
+                "authBytes field should be empty in toString() output");
+
+        SaslAuthenticateResponseData responseData = new SaslAuthenticateResponseData().setAuthBytes(sensitiveAuthBytes);
+        SaslAuthenticateResponse response = new SaslAuthenticateResponse(responseData);
+
+        String responseString = response.toString();
+
+        // Verify that the authBytes field is present but empty in the output
+        assertTrue(responseString.contains("authBytes=[]"),
+                "authBytes field should be empty in toString() output");
+    }
 }
diff --git a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
index ecea412..dacabd1 100644
--- a/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
+++ b/core/src/test/scala/unit/kafka/network/RequestChannelTest.scala
@@ -47,6 +47,7 @@
 import java.io.IOException
 import java.net.InetAddress
 import java.nio.ByteBuffer
+import java.util
 import java.util.Collections
 import java.util.concurrent.atomic.AtomicReference
 import scala.collection.{Map, Seq}
@@ -65,13 +66,23 @@
 
     val sensitiveValue = "secret"
     def verifyConfig(resource: ConfigResource, entries: Seq[ConfigEntry], expectedValues: Map[String, String]): Unit = {
-      val alterConfigs = request(new AlterConfigsRequest.Builder(
-          Collections.singletonMap(resource, new Config(entries.asJavaCollection)), true).build())
+      val alterConfigs = new AlterConfigsRequest.Builder(
+          util.Map.of(resource, new Config(entries.asJavaCollection)), true).build()
 
-      val loggableAlterConfigs = alterConfigs.loggableRequest.asInstanceOf[AlterConfigsRequest]
+      val alterConfigsString = alterConfigs.toString
+      entries.foreach { entry =>
+        if (!alterConfigsString.contains(entry.name())) {
+          fail("Config names should be in the request string")
+        }
+        if (entry.value() != null && alterConfigsString.contains(entry.value())) {
+          fail("Config values should not be in the request string")
+        }
+      }
+      val alterConfigsReq = request(alterConfigs)
+      val loggableAlterConfigs = alterConfigsReq.loggableRequest.asInstanceOf[AlterConfigsRequest]
       val loggedConfig = loggableAlterConfigs.configs.get(resource)
       assertEquals(expectedValues, toMap(loggedConfig))
-      val alterConfigsDesc = RequestConvertToJson.requestDesc(alterConfigs.header, alterConfigs.requestLog.toJava, alterConfigs.isForwarded).toString
+      val alterConfigsDesc = RequestConvertToJson.requestDesc(alterConfigsReq.header, alterConfigsReq.requestLog.toJava, alterConfigsReq.isForwarded).toString
       assertFalse(alterConfigsDesc.contains(sensitiveValue), s"Sensitive config logged $alterConfigsDesc")
     }