blob: f07bf84f43854f1f5797dc59f9b455ed6110efa0 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka
import java.io.File
import java.nio.file.Files
import java.util
import kafka.server.KafkaConfig
import kafka.utils.Exit
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.internals.FatalExitError
import org.junit.{After, Before, Test}
import org.junit.Assert._
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
class KafkaTest {
@Before
def setUp(): Unit = Exit.setExitProcedure((status, _) => throw new FatalExitError(status))
@After
def tearDown(): Unit = Exit.resetExitProcedure()
@Test
def testGetKafkaConfigFromArgs(): Unit = {
val propertiesFile = prepareDefaultConfig()
// We should load configuration file without any arguments
val config1 = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile)))
assertEquals(1, config1.brokerId)
// We should be able to override given property on command line
val config2 = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", "broker.id=2")))
assertEquals(2, config2.brokerId)
// We should be also able to set completely new property
val config3 = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", "log.cleanup.policy=compact")))
assertEquals(1, config3.brokerId)
assertEquals(util.Arrays.asList("compact"), config3.logCleanupPolicy)
// We should be also able to set several properties
val config4 = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", "log.cleanup.policy=compact,delete", "--override", "broker.id=2")))
assertEquals(2, config4.brokerId)
assertEquals(util.Arrays.asList("compact","delete"), config4.logCleanupPolicy)
}
@Test(expected = classOf[FatalExitError])
def testGetKafkaConfigFromArgsNonArgsAtTheEnd(): Unit = {
val propertiesFile = prepareDefaultConfig()
KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", "broker.id=1", "broker.id=2")))
}
@Test(expected = classOf[FatalExitError])
def testGetKafkaConfigFromArgsNonArgsOnly(): Unit = {
val propertiesFile = prepareDefaultConfig()
KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "broker.id=1", "broker.id=2")))
}
@Test(expected = classOf[FatalExitError])
def testGetKafkaConfigFromArgsNonArgsAtTheBegging(): Unit = {
val propertiesFile = prepareDefaultConfig()
KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "broker.id=1", "--override", "broker.id=2")))
}
@Test
def testKafkaSslPasswords(): Unit = {
val propertiesFile = prepareDefaultConfig()
val config = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", "ssl.keystore.password=keystore_password",
"--override", "ssl.key.password=key_password",
"--override", "ssl.truststore.password=truststore_password")))
assertEquals(Password.HIDDEN, config.getPassword(KafkaConfig.SslKeyPasswordProp).toString)
assertEquals(Password.HIDDEN, config.getPassword(KafkaConfig.SslKeystorePasswordProp).toString)
assertEquals(Password.HIDDEN, config.getPassword(KafkaConfig.SslTruststorePasswordProp).toString)
assertEquals("key_password", config.getPassword(KafkaConfig.SslKeyPasswordProp).value)
assertEquals("keystore_password", config.getPassword(KafkaConfig.SslKeystorePasswordProp).value)
assertEquals("truststore_password", config.getPassword(KafkaConfig.SslTruststorePasswordProp).value)
}
@Test
def testKafkaSslPasswordsWithSymbols(): Unit = {
val password = "=!#-+!?*/\"\'^%$=\\.,@:;="
val propertiesFile = prepareDefaultConfig()
val config = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile,
"--override", "ssl.keystore.password=" + password,
"--override", "ssl.key.password=" + password,
"--override", "ssl.truststore.password=" + password)))
assertEquals(Password.HIDDEN, config.getPassword(KafkaConfig.SslKeyPasswordProp).toString)
assertEquals(Password.HIDDEN, config.getPassword(KafkaConfig.SslKeystorePasswordProp).toString)
assertEquals(Password.HIDDEN, config.getPassword(KafkaConfig.SslTruststorePasswordProp).toString)
assertEquals(password, config.getPassword(KafkaConfig.SslKeystorePasswordProp).value)
assertEquals(password, config.getPassword(KafkaConfig.SslKeyPasswordProp).value)
assertEquals(password, config.getPassword(KafkaConfig.SslTruststorePasswordProp).value)
}
@Test
def testConnectionsMaxReauthMsDefault(): Unit = {
val propertiesFile = prepareDefaultConfig()
val config = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile)))
assertEquals(0L, config.valuesWithPrefixOverride("sasl_ssl.oauthbearer.").get(BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS).asInstanceOf[Long])
}
@Test
def testConnectionsMaxReauthMsExplicit(): Unit = {
val propertiesFile = prepareDefaultConfig()
val expected = 3600000
val config = KafkaConfig.fromProps(Kafka.getPropsFromArgs(Array(propertiesFile, "--override", s"sasl_ssl.oauthbearer.connections.max.reauth.ms=${expected}")))
assertEquals(expected, config.valuesWithPrefixOverride("sasl_ssl.oauthbearer.").get(BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS).asInstanceOf[Long])
}
def prepareDefaultConfig(): String = {
prepareConfig(Array("broker.id=1", "zookeeper.connect=somewhere"))
}
def prepareConfig(lines : Array[String]): String = {
val file = File.createTempFile("kafkatest", ".properties")
file.deleteOnExit()
val writer = Files.newOutputStream(file.toPath)
try {
lines.foreach { l =>
writer.write(l.getBytes)
writer.write("\n".getBytes)
}
file.getAbsolutePath
} finally writer.close()
}
}