KAFKA-14588 UserScramCredentialsCommandTest rewritten in Java (#15832)


Reviewers: Chia-Ping Tsai <chia7712@gmail.com>, Igor Soarez <soarez@apple.com>
diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml
index a675363..1b44541 100644
--- a/checkstyle/import-control-core.xml
+++ b/checkstyle/import-control-core.xml
@@ -123,6 +123,7 @@
     <allow pkg="kafka.zk"/>
     <allow pkg="org.apache.kafka.security"/>
     <allow pkg="org.apache.kafka.server"/>
+    <allow pkg="org.apache.kafka.test"/>
     <allow pkg="kafka.test"/>
     <allow pkg="kafka.test.annotation"/>
     <allow pkg="kafka.test.junit"/>
diff --git a/core/src/test/java/kafka/admin/UserScramCredentialsCommandTest.java b/core/src/test/java/kafka/admin/UserScramCredentialsCommandTest.java
new file mode 100644
index 0000000..eb89b0e
--- /dev/null
+++ b/core/src/test/java/kafka/admin/UserScramCredentialsCommandTest.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.admin;
+
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import kafka.utils.Exit;
+import org.apache.kafka.test.NoRetryException;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.extension.ExtendWith;
+import scala.Console;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.io.UnsupportedEncodingException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.OptionalInt;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@SuppressWarnings("dontUseSystemExit")
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.ALL)
+public class UserScramCredentialsCommandTest {
+    private static final String USER1 = "user1";
+    private static final String USER2 = "user2";
+
+    private final ClusterInstance cluster;
+
+    public UserScramCredentialsCommandTest(ClusterInstance cluster) {
+        this.cluster = cluster;
+    }
+
+    static class ConfigCommandResult {
+        public final String stdout;
+        public final OptionalInt exitStatus;
+
+        public ConfigCommandResult(String stdout) {
+            this(stdout, OptionalInt.empty());
+        }
+
+        public ConfigCommandResult(String stdout, OptionalInt exitStatus) {
+            this.stdout = stdout;
+            this.exitStatus = exitStatus;
+        }
+    }
+
+    private ConfigCommandResult runConfigCommandViaBroker(String...args) {
+        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+        String utf8 = StandardCharsets.UTF_8.name();
+        PrintStream printStream;
+        try {
+            printStream = new PrintStream(byteArrayOutputStream, true, utf8);
+        } catch (UnsupportedEncodingException e) {
+            throw new RuntimeException(e);
+        }
+        AtomicReference<OptionalInt> exitStatus = new AtomicReference<>(OptionalInt.empty());
+        Exit.setExitProcedure((status, __) -> {
+            exitStatus.set(OptionalInt.of((Integer) status));
+            throw new RuntimeException();
+        });
+
+        List<String> commandArgs = new ArrayList<>(Arrays.asList("--bootstrap-server", cluster.bootstrapServers()));
+        commandArgs.addAll(Arrays.asList(args));
+        try {
+            Console.withOut(printStream, () -> {
+                ConfigCommand.main(commandArgs.toArray(new String[0]));
+                return null;
+            });
+            return new ConfigCommandResult(byteArrayOutputStream.toString(utf8));
+        } catch (Exception e) {
+            return new ConfigCommandResult("", exitStatus.get());
+        } finally {
+            printStream.close();
+            Exit.resetExitProcedure();
+        }
+    }
+
+    @ClusterTest
+    public void testUserScramCredentialsRequests() throws Exception {
+        createAndAlterUser(USER1);
+        // now do the same thing for user2
+        createAndAlterUser(USER2);
+
+        // describe both
+        // we don't know the order that quota or scram users come out, so we have 2 possibilities for each, 4 total
+        String quotaPossibilityAOut = quotaMessage(USER1) + quotaMessage(USER2);
+        String quotaPossibilityBOut = quotaMessage(USER2) + quotaMessage(USER1);
+        String scramPossibilityAOut = describeUserMessage(USER1) + describeUserMessage(USER2);
+        String scramPossibilityBOut = describeUserMessage(USER2) + describeUserMessage(USER1);
+        describeUsers(
+            quotaPossibilityAOut + scramPossibilityAOut,
+            quotaPossibilityAOut + scramPossibilityBOut,
+            quotaPossibilityBOut + scramPossibilityAOut,
+            quotaPossibilityBOut + scramPossibilityBOut);
+
+        // now delete configs, in opposite order, for user1 and user2, and describe
+        deleteConfig(USER1, "consumer_byte_rate");
+        deleteConfig(USER2, "SCRAM-SHA-256");
+        describeUsers(quotaMessage(USER2) + describeUserMessage(USER1));
+
+        // now delete the rest of the configs, for user1 and user2, and describe
+        deleteConfig(USER1, "SCRAM-SHA-256");
+        deleteConfig(USER2, "consumer_byte_rate");
+        describeUsers("");
+    }
+
+    @ClusterTest
+    public void testAlterWithEmptyPassword() {
+        String user1 = "user1";
+        ConfigCommandResult result = runConfigCommandViaBroker("--user", user1, "--alter", "--add-config", "SCRAM-SHA-256=[iterations=4096,password=]");
+        assertTrue(result.exitStatus.isPresent(), "Expected System.exit() to be called with an empty password");
+        assertEquals(1, result.exitStatus.getAsInt(), "Expected empty password to cause failure with exit status=1");
+    }
+
+    @ClusterTest
+    public void testDescribeUnknownUser() {
+        String unknownUser = "unknownUser";
+        ConfigCommandResult result = runConfigCommandViaBroker("--user", unknownUser, "--describe");
+        assertFalse(result.exitStatus.isPresent(), "Expected System.exit() to not be called with an unknown user");
+        assertEquals("", result.stdout);
+    }
+
+    private void createAndAlterUser(String user) throws InterruptedException {
+        // create and describe a credential
+        ConfigCommandResult result = runConfigCommandViaBroker("--user", user, "--alter", "--add-config", "SCRAM-SHA-256=[iterations=4096,password=foo-secret]");
+        assertEquals(updateUserMessage(user), result.stdout);
+        TestUtils.waitForCondition(
+            () -> {
+                try {
+                    return Objects.equals(runConfigCommandViaBroker("--user", user, "--describe").stdout, describeUserMessage(user));
+                } catch (Exception e) {
+                    throw new NoRetryException(e);
+                }
+            },
+            () -> "Failed to describe SCRAM credential change '" + user + "'");
+        // create a user quota and describe the user again
+        result = runConfigCommandViaBroker("--user", user, "--alter", "--add-config", "consumer_byte_rate=20000");
+        assertEquals(updateUserMessage(user), result.stdout);
+        TestUtils.waitForCondition(
+            () -> {
+                try {
+                    return Objects.equals(runConfigCommandViaBroker("--user", user, "--describe").stdout, quotaMessage(user) + describeUserMessage(user));
+                } catch (Exception e) {
+                    throw new NoRetryException(e);
+                }
+            },
+            () -> "Failed to describe Quota change for '" + user + "'");
+    }
+
+    private void deleteConfig(String user, String config) {
+        ConfigCommandResult result = runConfigCommandViaBroker("--user", user, "--alter", "--delete-config", config);
+        assertEquals(updateUserMessage(user), result.stdout);
+    }
+
+    private void describeUsers(String... msgs) throws InterruptedException {
+        TestUtils.waitForCondition(
+            () -> {
+                try {
+                    String output = runConfigCommandViaBroker("--entity-type", "users", "--describe").stdout;
+                    return Arrays.asList(msgs).contains(output);
+                } catch (Exception e) {
+                    throw new NoRetryException(e);
+                }
+            },
+            () -> "Failed to describe config");
+    }
+
+    private static String describeUserMessage(String user) {
+        return "SCRAM credential configs for user-principal '" + user + "' are SCRAM-SHA-256=iterations=4096\n";
+    }
+
+    private static String updateUserMessage(String user) {
+        return "Completed updating config for user " + user + ".\n";
+    }
+
+    private static String quotaMessage(String user) {
+        return "Quota configs for user-principal '" + user + "' are consumer_byte_rate=20000.0\n";
+    }
+}
diff --git a/core/src/test/scala/unit/kafka/admin/UserScramCredentialsCommandTest.scala b/core/src/test/scala/unit/kafka/admin/UserScramCredentialsCommandTest.scala
deleted file mode 100644
index 7dd2143..0000000
--- a/core/src/test/scala/unit/kafka/admin/UserScramCredentialsCommandTest.scala
+++ /dev/null
@@ -1,142 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.admin
-
-import java.io.{ByteArrayOutputStream, PrintStream}
-import java.nio.charset.StandardCharsets
-
-import kafka.server.BaseRequestTest
-import kafka.utils.Exit
-import kafka.utils.TestUtils
-import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.ValueSource
-
-class UserScramCredentialsCommandTest extends BaseRequestTest {
-  override def brokerCount = 1
-  var exitStatus: Option[Int] = None
-  var exitMessage: Option[String] = None
-
-  case class ConfigCommandResult(stdout: String, exitStatus: Option[Int] = None)
-
-  private def runConfigCommandViaBroker(args: Array[String]) : ConfigCommandResult = {
-    val byteArrayOutputStream = new ByteArrayOutputStream()
-    val utf8 = StandardCharsets.UTF_8.name
-    val printStream = new PrintStream(byteArrayOutputStream, true, utf8)
-    var exitStatus: Option[Int] = None
-    Exit.setExitProcedure { (status, _) =>
-      exitStatus = Some(status)
-      throw new RuntimeException
-    }
-    val commandArgs = Array("--bootstrap-server", bootstrapServers()) ++ args
-    try {
-      Console.withOut(printStream) {
-        ConfigCommand.main(commandArgs)
-      }
-      ConfigCommandResult(byteArrayOutputStream.toString(utf8))
-    } catch {
-      case e: Exception => {
-        debug(s"Exception running ConfigCommand ${commandArgs.mkString(" ")}", e)
-        ConfigCommandResult("", exitStatus)
-      }
-    } finally {
-      printStream.close
-      Exit.resetExitProcedure()
-    }
-  }
-
-  @ParameterizedTest
-  @ValueSource(strings = Array("kraft", "zk"))
-  def testUserScramCredentialsRequests(quorum: String): Unit = {
-    val user1 = "user1"
-    // create and describe a credential
-    var result = runConfigCommandViaBroker(Array("--user", user1, "--alter", "--add-config", "SCRAM-SHA-256=[iterations=4096,password=foo-secret]"))
-    val alterConfigsUser1Out = s"Completed updating config for user $user1.\n"
-    assertEquals(alterConfigsUser1Out, result.stdout)
-    val scramCredentialConfigsUser1Out = s"SCRAM credential configs for user-principal '$user1' are SCRAM-SHA-256=iterations=4096\n"
-    TestUtils.waitUntilTrue(() => runConfigCommandViaBroker(Array("--user", user1, "--describe")).stdout ==
-      scramCredentialConfigsUser1Out, s"Failed to describe SCRAM credential change '$user1'")
-    // create a user quota and describe the user again
-    result = runConfigCommandViaBroker(Array("--user", user1, "--alter", "--add-config", "consumer_byte_rate=20000"))
-    assertEquals(alterConfigsUser1Out, result.stdout)
-    val quotaConfigsUser1Out = s"Quota configs for user-principal '$user1' are consumer_byte_rate=20000.0\n"
-    TestUtils.waitUntilTrue(() => runConfigCommandViaBroker(Array("--user", user1, "--describe")).stdout ==
-      s"$quotaConfigsUser1Out$scramCredentialConfigsUser1Out", s"Failed to describe Quota change for '$user1'")
-
-    // now do the same thing for user2
-    val user2 = "user2"
-    // create and describe a credential
-    result = runConfigCommandViaBroker(Array("--user", user2, "--alter", "--add-config", "SCRAM-SHA-256=[iterations=4096,password=foo-secret]"))
-    val alterConfigsUser2Out = s"Completed updating config for user $user2.\n"
-    assertEquals(alterConfigsUser2Out, result.stdout)
-    val scramCredentialConfigsUser2Out = s"SCRAM credential configs for user-principal '$user2' are SCRAM-SHA-256=iterations=4096\n"
-    TestUtils.waitUntilTrue(() => runConfigCommandViaBroker(Array("--user", user2, "--describe")).stdout ==
-      scramCredentialConfigsUser2Out, s"Failed to describe SCRAM credential change '$user2'")
-    // create a user quota and describe the user again
-    result = runConfigCommandViaBroker(Array("--user", user2, "--alter", "--add-config", "consumer_byte_rate=20000"))
-    assertEquals(alterConfigsUser2Out, result.stdout)
-    val quotaConfigsUser2Out = s"Quota configs for user-principal '$user2' are consumer_byte_rate=20000.0\n"
-    TestUtils.waitUntilTrue(() => runConfigCommandViaBroker(Array("--user", user2, "--describe")).stdout ==
-      s"$quotaConfigsUser2Out$scramCredentialConfigsUser2Out", s"Failed to describe Quota change for '$user2'")
-
-    // describe both
-    result = runConfigCommandViaBroker(Array("--entity-type", "users", "--describe"))
-    // we don't know the order that quota or scram users come out, so we have 2 possibilities for each, 4 total
-    val quotaPossibilityAOut = s"$quotaConfigsUser1Out$quotaConfigsUser2Out"
-    val quotaPossibilityBOut = s"$quotaConfigsUser2Out$quotaConfigsUser1Out"
-    val scramPossibilityAOut = s"$scramCredentialConfigsUser1Out$scramCredentialConfigsUser2Out"
-    val scramPossibilityBOut = s"$scramCredentialConfigsUser2Out$scramCredentialConfigsUser1Out"
-    assertTrue(result.stdout.equals(s"$quotaPossibilityAOut$scramPossibilityAOut")
-      || result.stdout.equals(s"$quotaPossibilityAOut$scramPossibilityBOut")
-      || result.stdout.equals(s"$quotaPossibilityBOut$scramPossibilityAOut")
-      || result.stdout.equals(s"$quotaPossibilityBOut$scramPossibilityBOut"))
-
-    // now delete configs, in opposite order, for user1 and user2, and describe
-    result = runConfigCommandViaBroker(Array("--user", user1, "--alter", "--delete-config", "consumer_byte_rate"))
-    assertEquals(alterConfigsUser1Out, result.stdout)
-    result = runConfigCommandViaBroker(Array("--user", user2, "--alter", "--delete-config", "SCRAM-SHA-256"))
-    assertEquals(alterConfigsUser2Out, result.stdout)
-    TestUtils.waitUntilTrue(() => runConfigCommandViaBroker(Array("--entity-type", "users", "--describe")).stdout ==
-      s"$quotaConfigsUser2Out$scramCredentialConfigsUser1Out", s"Failed to describe Quota change for '$user2'")
-
-    // now delete the rest of the configs, for user1 and user2, and describe
-    result = runConfigCommandViaBroker(Array("--user", user1, "--alter", "--delete-config", "SCRAM-SHA-256"))
-    assertEquals(alterConfigsUser1Out, result.stdout)
-    result = runConfigCommandViaBroker(Array("--user", user2, "--alter", "--delete-config", "consumer_byte_rate"))
-    assertEquals(alterConfigsUser2Out, result.stdout)
-    TestUtils.waitUntilTrue(() => runConfigCommandViaBroker(Array("--entity-type", "users", "--describe")).stdout == "",
-      s"Failed to describe All users deleted")
-  }
-
-  @ParameterizedTest
-  @ValueSource(strings = Array("kraft", "zk"))
-  def testAlterWithEmptyPassword(quorum: String): Unit = {
-    val user1 = "user1"
-    val result = runConfigCommandViaBroker(Array("--user", user1, "--alter", "--add-config", "SCRAM-SHA-256=[iterations=4096,password=]"))
-    assertTrue(result.exitStatus.isDefined, "Expected System.exit() to be called with an empty password")
-    assertEquals(1, result.exitStatus.get, "Expected empty password to cause failure with exit status=1")
-  }
-
-  @ParameterizedTest
-  @ValueSource(strings = Array("kraft", "zk"))
-  def testDescribeUnknownUser(quorum: String): Unit = {
-    val unknownUser = "unknownUser"
-    val result = runConfigCommandViaBroker(Array("--user", unknownUser, "--describe"))
-    assertTrue(result.exitStatus.isEmpty, "Expected System.exit() to not be called with an unknown user")
-    assertEquals("", result.stdout)
-  }
-}