blob: 7dd2143ed305fa4b1d4e13dd44d555c20793f161 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.admin
import java.io.{ByteArrayOutputStream, PrintStream}
import java.nio.charset.StandardCharsets
import kafka.server.BaseRequestTest
import kafka.utils.Exit
import kafka.utils.TestUtils
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
class UserScramCredentialsCommandTest extends BaseRequestTest {
override def brokerCount = 1
var exitStatus: Option[Int] = None
var exitMessage: Option[String] = None
case class ConfigCommandResult(stdout: String, exitStatus: Option[Int] = None)
private def runConfigCommandViaBroker(args: Array[String]) : ConfigCommandResult = {
val byteArrayOutputStream = new ByteArrayOutputStream()
val utf8 = StandardCharsets.UTF_8.name
val printStream = new PrintStream(byteArrayOutputStream, true, utf8)
var exitStatus: Option[Int] = None
Exit.setExitProcedure { (status, _) =>
exitStatus = Some(status)
throw new RuntimeException
}
val commandArgs = Array("--bootstrap-server", bootstrapServers()) ++ args
try {
Console.withOut(printStream) {
ConfigCommand.main(commandArgs)
}
ConfigCommandResult(byteArrayOutputStream.toString(utf8))
} catch {
case e: Exception => {
debug(s"Exception running ConfigCommand ${commandArgs.mkString(" ")}", e)
ConfigCommandResult("", exitStatus)
}
} finally {
printStream.close
Exit.resetExitProcedure()
}
}
@ParameterizedTest
@ValueSource(strings = Array("kraft", "zk"))
def testUserScramCredentialsRequests(quorum: String): Unit = {
val user1 = "user1"
// create and describe a credential
var result = runConfigCommandViaBroker(Array("--user", user1, "--alter", "--add-config", "SCRAM-SHA-256=[iterations=4096,password=foo-secret]"))
val alterConfigsUser1Out = s"Completed updating config for user $user1.\n"
assertEquals(alterConfigsUser1Out, result.stdout)
val scramCredentialConfigsUser1Out = s"SCRAM credential configs for user-principal '$user1' are SCRAM-SHA-256=iterations=4096\n"
TestUtils.waitUntilTrue(() => runConfigCommandViaBroker(Array("--user", user1, "--describe")).stdout ==
scramCredentialConfigsUser1Out, s"Failed to describe SCRAM credential change '$user1'")
// create a user quota and describe the user again
result = runConfigCommandViaBroker(Array("--user", user1, "--alter", "--add-config", "consumer_byte_rate=20000"))
assertEquals(alterConfigsUser1Out, result.stdout)
val quotaConfigsUser1Out = s"Quota configs for user-principal '$user1' are consumer_byte_rate=20000.0\n"
TestUtils.waitUntilTrue(() => runConfigCommandViaBroker(Array("--user", user1, "--describe")).stdout ==
s"$quotaConfigsUser1Out$scramCredentialConfigsUser1Out", s"Failed to describe Quota change for '$user1'")
// now do the same thing for user2
val user2 = "user2"
// create and describe a credential
result = runConfigCommandViaBroker(Array("--user", user2, "--alter", "--add-config", "SCRAM-SHA-256=[iterations=4096,password=foo-secret]"))
val alterConfigsUser2Out = s"Completed updating config for user $user2.\n"
assertEquals(alterConfigsUser2Out, result.stdout)
val scramCredentialConfigsUser2Out = s"SCRAM credential configs for user-principal '$user2' are SCRAM-SHA-256=iterations=4096\n"
TestUtils.waitUntilTrue(() => runConfigCommandViaBroker(Array("--user", user2, "--describe")).stdout ==
scramCredentialConfigsUser2Out, s"Failed to describe SCRAM credential change '$user2'")
// create a user quota and describe the user again
result = runConfigCommandViaBroker(Array("--user", user2, "--alter", "--add-config", "consumer_byte_rate=20000"))
assertEquals(alterConfigsUser2Out, result.stdout)
val quotaConfigsUser2Out = s"Quota configs for user-principal '$user2' are consumer_byte_rate=20000.0\n"
TestUtils.waitUntilTrue(() => runConfigCommandViaBroker(Array("--user", user2, "--describe")).stdout ==
s"$quotaConfigsUser2Out$scramCredentialConfigsUser2Out", s"Failed to describe Quota change for '$user2'")
// describe both
result = runConfigCommandViaBroker(Array("--entity-type", "users", "--describe"))
// we don't know the order that quota or scram users come out, so we have 2 possibilities for each, 4 total
val quotaPossibilityAOut = s"$quotaConfigsUser1Out$quotaConfigsUser2Out"
val quotaPossibilityBOut = s"$quotaConfigsUser2Out$quotaConfigsUser1Out"
val scramPossibilityAOut = s"$scramCredentialConfigsUser1Out$scramCredentialConfigsUser2Out"
val scramPossibilityBOut = s"$scramCredentialConfigsUser2Out$scramCredentialConfigsUser1Out"
assertTrue(result.stdout.equals(s"$quotaPossibilityAOut$scramPossibilityAOut")
|| result.stdout.equals(s"$quotaPossibilityAOut$scramPossibilityBOut")
|| result.stdout.equals(s"$quotaPossibilityBOut$scramPossibilityAOut")
|| result.stdout.equals(s"$quotaPossibilityBOut$scramPossibilityBOut"))
// now delete configs, in opposite order, for user1 and user2, and describe
result = runConfigCommandViaBroker(Array("--user", user1, "--alter", "--delete-config", "consumer_byte_rate"))
assertEquals(alterConfigsUser1Out, result.stdout)
result = runConfigCommandViaBroker(Array("--user", user2, "--alter", "--delete-config", "SCRAM-SHA-256"))
assertEquals(alterConfigsUser2Out, result.stdout)
TestUtils.waitUntilTrue(() => runConfigCommandViaBroker(Array("--entity-type", "users", "--describe")).stdout ==
s"$quotaConfigsUser2Out$scramCredentialConfigsUser1Out", s"Failed to describe Quota change for '$user2'")
// now delete the rest of the configs, for user1 and user2, and describe
result = runConfigCommandViaBroker(Array("--user", user1, "--alter", "--delete-config", "SCRAM-SHA-256"))
assertEquals(alterConfigsUser1Out, result.stdout)
result = runConfigCommandViaBroker(Array("--user", user2, "--alter", "--delete-config", "consumer_byte_rate"))
assertEquals(alterConfigsUser2Out, result.stdout)
TestUtils.waitUntilTrue(() => runConfigCommandViaBroker(Array("--entity-type", "users", "--describe")).stdout == "",
s"Failed to describe All users deleted")
}
@ParameterizedTest
@ValueSource(strings = Array("kraft", "zk"))
def testAlterWithEmptyPassword(quorum: String): Unit = {
val user1 = "user1"
val result = runConfigCommandViaBroker(Array("--user", user1, "--alter", "--add-config", "SCRAM-SHA-256=[iterations=4096,password=]"))
assertTrue(result.exitStatus.isDefined, "Expected System.exit() to be called with an empty password")
assertEquals(1, result.exitStatus.get, "Expected empty password to cause failure with exit status=1")
}
@ParameterizedTest
@ValueSource(strings = Array("kraft", "zk"))
def testDescribeUnknownUser(quorum: String): Unit = {
val unknownUser = "unknownUser"
val result = runConfigCommandViaBroker(Array("--user", unknownUser, "--describe"))
assertTrue(result.exitStatus.isEmpty, "Expected System.exit() to not be called with an unknown user")
assertEquals("", result.stdout)
}
}