blob: 9ee3d3289261aedf6c9435f6cf6d89768a4d5faa [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import java.util.Properties
import kafka.api.{ApiVersion, KAFKA_0_8_2}
import kafka.message._
import kafka.utils.{CoreUtils, TestUtils}
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.protocol.SecurityProtocol
import org.junit.Assert._
import org.junit.Test
import org.scalatest.Assertions.intercept
class KafkaConfigTest {
/** A retention given in hours must be exposed as the equivalent number of milliseconds. */
@Test
def testLogRetentionTimeHoursProvided(): Unit = {
  val brokerProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
  brokerProps.put(KafkaConfig.LogRetentionTimeHoursProp, "1")

  val oneHourMs = 60L * 60L * 1000L
  val config = KafkaConfig.fromProps(brokerProps)
  assertEquals(oneHourMs, config.logRetentionTimeMillis)
}
/** A retention given in minutes must be exposed as the equivalent number of milliseconds. */
@Test
def testLogRetentionTimeMinutesProvided(): Unit = {
  val brokerProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
  brokerProps.put(KafkaConfig.LogRetentionTimeMinutesProp, "30")

  val thirtyMinutesMs = 30 * 60L * 1000L
  val config = KafkaConfig.fromProps(brokerProps)
  assertEquals(thirtyMinutesMs, config.logRetentionTimeMillis)
}
/** A retention given directly in milliseconds must be reported unchanged. */
@Test
def testLogRetentionTimeMsProvided(): Unit = {
  val brokerProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
  brokerProps.put(KafkaConfig.LogRetentionTimeMillisProp, "1800000")

  // 1,800,000 ms is exactly thirty minutes.
  val thirtyMinutesMs = 30 * 60L * 1000L
  val config = KafkaConfig.fromProps(brokerProps)
  assertEquals(thirtyMinutesMs, config.logRetentionTimeMillis)
}
/** When this test sets no retention property itself, the retention is expected to be seven days. */
@Test
def testLogRetentionTimeNoConfigProvided(): Unit = {
  val brokerProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)

  val sevenDaysMs = 24 * 7 * 60L * 60L * 1000L
  val config = KafkaConfig.fromProps(brokerProps)
  assertEquals(sevenDaysMs, config.logRetentionTimeMillis)
}
/** With both minutes (30) and hours (1) supplied, the minutes value is the one expected to win. */
@Test
def testLogRetentionTimeBothMinutesAndHoursProvided(): Unit = {
  val brokerProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
  brokerProps.put(KafkaConfig.LogRetentionTimeMinutesProp, "30")
  brokerProps.put(KafkaConfig.LogRetentionTimeHoursProp, "1")

  val thirtyMinutesMs = 30 * 60L * 1000L
  val config = KafkaConfig.fromProps(brokerProps)
  assertEquals(thirtyMinutesMs, config.logRetentionTimeMillis)
}
/** With both milliseconds (1,800,000) and minutes (10) supplied, the millisecond value is expected to win. */
@Test
def testLogRetentionTimeBothMinutesAndMsProvided(): Unit = {
  val brokerProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
  brokerProps.put(KafkaConfig.LogRetentionTimeMillisProp, "1800000")
  brokerProps.put(KafkaConfig.LogRetentionTimeMinutesProp, "10")

  val thirtyMinutesMs = 30 * 60L * 1000L
  val config = KafkaConfig.fromProps(brokerProps)
  assertEquals(thirtyMinutesMs, config.logRetentionTimeMillis)
}
/**
 * A retention of -1 (in ms, minutes or hours) must be reported as -1, also when
 * a positive minutes value is set next to an ms value of -1. A value of 0 must be rejected.
 */
@Test
def testLogRetentionUnlimited(): Unit = {
  // Five independent property sets, all created up front.
  val Seq(msProps, minutesProps, hoursProps, msAndMinutesProps, zeroMsProps) =
    Seq.fill(5)(TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181))

  msProps.put("log.retention.ms", "-1")
  minutesProps.put("log.retention.minutes", "-1")
  hoursProps.put("log.retention.hours", "-1")

  // Build all three configs first, then check them, in the same order as the inputs.
  val unlimitedConfigs = Seq(msProps, minutesProps, hoursProps).map(p => KafkaConfig.fromProps(p))
  unlimitedConfigs.foreach { cfg =>
    assertEquals("Should be -1", -1, cfg.logRetentionTimeMillis)
  }

  // -1 in ms combined with a positive minutes value is still expected to yield -1.
  msAndMinutesProps.put("log.retention.ms", "-1")
  msAndMinutesProps.put("log.retention.minutes", "30")
  val msAndMinutesConfig = KafkaConfig.fromProps(msAndMinutesProps)
  assertEquals("Should be -1", -1, msAndMinutesConfig.logRetentionTimeMillis)

  // Zero is not an acceptable retention.
  zeroMsProps.put("log.retention.ms", "0")
  intercept[IllegalArgumentException] {
    KafkaConfig.fromProps(zeroMsProps)
  }
}
/**
 * A retention of 0 is invalid regardless of the unit it is expressed in:
 * each of the ms, minutes and hours properties must make config construction
 * fail with an IllegalArgumentException.
 */
@Test
def testLogRetentionValid(): Unit = {
  val retentionProps = Seq("log.retention.ms", "log.retention.minutes", "log.retention.hours")

  for (retentionProp <- retentionProps) {
    // A fresh property set per probe so that only one retention setting is ever present.
    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
    props.put(retentionProp, "0")
    intercept[IllegalArgumentException] {
      KafkaConfig.fromProps(props)
    }
  }
}
/**
 * With the listeners entry removed and only host.name/port set, the advertised
 * PLAINTEXT endpoint is expected to carry that host name and port.
 */
@Test
def testAdvertiseDefaults(): Unit = {
  val port = "9999"
  val hostName = "fake-host"

  val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
  // Remove the listeners entry so that host.name and port are the only endpoint inputs.
  props.remove(KafkaConfig.ListenersProp)
  props.put(KafkaConfig.HostNameProp, hostName)
  props.put(KafkaConfig.PortProp, port)

  val serverConfig = KafkaConfig.fromProps(props)
  val endpoint = serverConfig.advertisedListeners(SecurityProtocol.PLAINTEXT)

  // JUnit's signature is assertEquals(expected, actual); keep that order so failure messages read correctly.
  assertEquals(hostName, endpoint.host)
  assertEquals(port.toInt, endpoint.port)
}
/**
 * When both advertised.host.name and advertised.port are set, the advertised
 * PLAINTEXT endpoint is expected to use exactly those values.
 */
@Test
def testAdvertiseConfigured(): Unit = {
  val advertisedHostName = "routable-host"
  val advertisedPort = "1234"

  val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
  props.put(KafkaConfig.AdvertisedHostNameProp, advertisedHostName)
  props.put(KafkaConfig.AdvertisedPortProp, advertisedPort)

  val serverConfig = KafkaConfig.fromProps(props)
  val endpoint = serverConfig.advertisedListeners(SecurityProtocol.PLAINTEXT)

  // JUnit's signature is assertEquals(expected, actual); keep that order so failure messages read correctly.
  assertEquals(advertisedHostName, endpoint.host)
  assertEquals(advertisedPort.toInt, endpoint.port)
}
/**
 * When only advertised.host.name is set, the advertised PLAINTEXT endpoint is
 * expected to use that host together with the regular port setting.
 */
@Test
def testAdvertisePortDefault(): Unit = {
  val advertisedHostName = "routable-host"
  val port = "9999"

  val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
  props.put(KafkaConfig.AdvertisedHostNameProp, advertisedHostName)
  props.put(KafkaConfig.PortProp, port)

  val serverConfig = KafkaConfig.fromProps(props)
  val endpoint = serverConfig.advertisedListeners(SecurityProtocol.PLAINTEXT)

  // JUnit's signature is assertEquals(expected, actual); keep that order so failure messages read correctly.
  assertEquals(advertisedHostName, endpoint.host)
  assertEquals(port.toInt, endpoint.port)
}
/**
 * When only advertised.port is set, the advertised PLAINTEXT endpoint is
 * expected to use that port together with the regular host.name setting.
 */
@Test
def testAdvertiseHostNameDefault(): Unit = {
  val hostName = "routable-host"
  val advertisedPort = "9999"

  val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
  props.put(KafkaConfig.HostNameProp, hostName)
  props.put(KafkaConfig.AdvertisedPortProp, advertisedPort)

  val serverConfig = KafkaConfig.fromProps(props)
  val endpoint = serverConfig.advertisedListeners(SecurityProtocol.PLAINTEXT)

  // JUnit's signature is assertEquals(expected, actual); keep that order so failure messages read correctly.
  assertEquals(hostName, endpoint.host)
  assertEquals(advertisedPort.toInt, endpoint.port)
}
/**
 * Listener lists that reuse a port or a protocol must be rejected, and so must
 * an advertised listener list that reuses a port.
 */
@Test
def testDuplicateListeners(): Unit = {
  val props = new Properties()
  props.put(KafkaConfig.BrokerIdProp, "1")
  props.put(KafkaConfig.ZkConnectProp, "localhost:2181")

  // listeners with duplicate port
  props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9091,TRACE://localhost:9091")
  assertFalse(isValidKafkaConfig(props))

  // listeners with duplicate protocol
  props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9091,PLAINTEXT://localhost:9092")
  assertFalse(isValidKafkaConfig(props))

  // Put a well-formed listener list back before probing advertised listeners. Otherwise the
  // still-invalid duplicate-protocol listeners above would make the next check pass even if
  // advertised listeners were never validated.
  props.put(KafkaConfig.ListenersProp, "PLAINTEXT://localhost:9091,TRACE://localhost:9092")
  assertTrue(isValidKafkaConfig(props))

  // advertised listeners with duplicate port
  props.put(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:9091,TRACE://localhost:9091")
  assertFalse(isValidKafkaConfig(props))
}
/** A listener whose protocol name is unknown ("BAD") must make the configuration invalid. */
@Test
def testBadListenerProtocol(): Unit = {
  val props = new Properties()
  props.put(KafkaConfig.BrokerIdProp, "1")
  props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
  props.put(KafkaConfig.ListenersProp, "BAD://localhost:9091")

  // JUnit's assertFalse instead of Predef.assert: the latter is elidable and can be
  // compiled away, which would turn this test into a no-op.
  assertFalse(isValidKafkaConfig(props))
}
/** Protocol names in the listener list must be accepted regardless of letter case. */
@Test
def testCaseInsensitiveListenerProtocol(): Unit = {
  val props = new Properties()
  props.put(KafkaConfig.BrokerIdProp, "1")
  props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
  props.put(KafkaConfig.ListenersProp, "plaintext://localhost:9091,SsL://localhost:9092")

  // JUnit's assertTrue instead of Predef.assert: the latter is elidable and can be
  // compiled away, which would turn this test into a no-op.
  assertTrue(isValidKafkaConfig(props))
}
/**
 * Checks how listeners and advertised listeners are derived when no explicit
 * listener list is configured: from host.name/port, from port alone, and from
 * advertised.host.name/advertised.port.
 */
@Test
def testListenerDefaults(): Unit = {
  val props = new Properties()
  props.put(KafkaConfig.BrokerIdProp, "1")
  props.put(KafkaConfig.ZkConnectProp, "localhost:2181")

  // configuration with host and port, but no listeners
  props.put(KafkaConfig.HostNameProp, "myhost")
  props.put(KafkaConfig.PortProp, "1111")
  val conf = KafkaConfig.fromProps(props)
  assertEquals(CoreUtils.listenerListToEndPoints("PLAINTEXT://myhost:1111"), conf.listeners)

  // configuration with null host
  props.remove(KafkaConfig.HostNameProp)
  val conf2 = KafkaConfig.fromProps(props)
  assertEquals(CoreUtils.listenerListToEndPoints("PLAINTEXT://:1111"), conf2.listeners)
  assertEquals(CoreUtils.listenerListToEndPoints("PLAINTEXT://:1111"), conf2.advertisedListeners)
  assertNull(conf2.listeners(SecurityProtocol.PLAINTEXT).host)

  // configuration with advertised host and port, and no advertised listeners
  props.put(KafkaConfig.AdvertisedHostNameProp, "otherhost")
  props.put(KafkaConfig.AdvertisedPortProp, "2222")
  val conf3 = KafkaConfig.fromProps(props)
  // Expected value first, matching JUnit's assertEquals(expected, actual) and the assertions above.
  assertEquals(CoreUtils.listenerListToEndPoints("PLAINTEXT://otherhost:2222"), conf3.advertisedListeners)
}
/**
 * The inter-broker protocol version must default to the latest API version,
 * and both "0.8.2.0" and "0.8.2.1" must resolve to KAFKA_0_8_2.
 */
@Test
def testVersionConfiguration(): Unit = {
  val props = new Properties()
  props.put(KafkaConfig.BrokerIdProp, "1")
  props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
  val conf = KafkaConfig.fromProps(props)
  assertEquals(ApiVersion.latestVersion, conf.interBrokerProtocolVersion)

  props.put(KafkaConfig.InterBrokerProtocolVersionProp, "0.8.2.0")
  // We need to set the message format version to make the configuration valid.
  props.put(KafkaConfig.LogMessageFormatVersionProp, "0.8.2.0")
  val conf2 = KafkaConfig.fromProps(props)
  assertEquals(KAFKA_0_8_2, conf2.interBrokerProtocolVersion)

  // check that 0.8.2.0 is the same as 0.8.2.1
  props.put(KafkaConfig.InterBrokerProtocolVersionProp, "0.8.2.1")
  // We need to set the message format version to make the configuration valid.
  props.put(KafkaConfig.LogMessageFormatVersionProp, "0.8.2.1")
  val conf3 = KafkaConfig.fromProps(props)
  assertEquals(KAFKA_0_8_2, conf3.interBrokerProtocolVersion)

  // check that latest is newer than 0.8.2
  // (JUnit's assertTrue rather than the elidable Predef.assert, so the check cannot be compiled away)
  assertTrue(ApiVersion.latestVersion >= conf3.interBrokerProtocolVersion)
}
/**
 * Returns true when a KafkaConfig can be built from `props`, false when the
 * construction fails with an IllegalArgumentException. Any other exception propagates.
 */
private def isValidKafkaConfig(props: Properties): Boolean =
  try {
    KafkaConfig.fromProps(props)
    true
  } catch {
    case _: IllegalArgumentException => false
  }
/** Unclean leader election is expected to be enabled when the test does not configure it. */
@Test
def testUncleanLeaderElectionDefault(): Unit = {
  val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
  val serverConfig = KafkaConfig.fromProps(props)

  // Expected value first, per JUnit's assertEquals(expected, actual).
  assertEquals(true, serverConfig.uncleanLeaderElectionEnable)
}
/** Setting the unclean-leader-election property to "false" must be reflected in the config. */
@Test
def testUncleanElectionDisabled(): Unit = {
  val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
  props.put(KafkaConfig.UncleanLeaderElectionEnableProp, String.valueOf(false))
  val serverConfig = KafkaConfig.fromProps(props)

  // Expected value first, per JUnit's assertEquals(expected, actual).
  assertEquals(false, serverConfig.uncleanLeaderElectionEnable)
}
/** Setting the unclean-leader-election property to "true" must be reflected in the config. */
@Test
def testUncleanElectionEnabled(): Unit = {
  val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
  props.put(KafkaConfig.UncleanLeaderElectionEnableProp, String.valueOf(true))
  val serverConfig = KafkaConfig.fromProps(props)

  // Expected value first, per JUnit's assertEquals(expected, actual).
  assertEquals(true, serverConfig.uncleanLeaderElectionEnable)
}
/** A non-boolean value for the unclean-leader-election property must raise a ConfigException. */
@Test
def testUncleanElectionInvalid(): Unit = {
  val brokerProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
  brokerProps.put(KafkaConfig.UncleanLeaderElectionEnableProp, "invalid")

  intercept[ConfigException] {
    KafkaConfig.fromProps(brokerProps)
  }
}
/** A log roll time given in milliseconds must be reported unchanged. */
@Test
def testLogRollTimeMsProvided(): Unit = {
  val brokerProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
  brokerProps.put(KafkaConfig.LogRollTimeMillisProp, "1800000")

  // 1,800,000 ms is exactly thirty minutes.
  val thirtyMinutesMs = 30 * 60L * 1000L
  val config = KafkaConfig.fromProps(brokerProps)
  assertEquals(thirtyMinutesMs, config.logRollTimeMillis)
}
/** With both milliseconds (1,800,000) and hours (1) supplied, the millisecond value is expected to win. */
@Test
def testLogRollTimeBothMsAndHoursProvided(): Unit = {
  val brokerProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
  brokerProps.put(KafkaConfig.LogRollTimeMillisProp, "1800000")
  brokerProps.put(KafkaConfig.LogRollTimeHoursProp, "1")

  val thirtyMinutesMs = 30 * 60L * 1000L
  val config = KafkaConfig.fromProps(brokerProps)
  assertEquals(thirtyMinutesMs, config.logRollTimeMillis)
}
/** When this test sets no roll-time property itself, the log roll time is expected to be seven days. */
@Test
def testLogRollTimeNoConfigProvided(): Unit = {
  val brokerProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)

  val sevenDaysMs = 24 * 7 * 60L * 60L * 1000L
  val config = KafkaConfig.fromProps(brokerProps)
  assertEquals(sevenDaysMs, config.logRollTimeMillis)
}
/** The compression type is expected to be "producer" when the test does not configure it. */
@Test
def testDefaultCompressionType(): Unit = {
  val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
  val serverConfig = KafkaConfig.fromProps(props)

  // Expected value first, per JUnit's assertEquals(expected, actual).
  assertEquals("producer", serverConfig.compressionType)
}
/** A supported compression type ("gzip") must be accepted and reported back unchanged. */
@Test
def testValidCompressionType(): Unit = {
  val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
  // Use the shared constant, as the invalid-compression-type test does, instead of a hard-coded key.
  props.put(KafkaConfig.CompressionTypeProp, "gzip")
  val serverConfig = KafkaConfig.fromProps(props)

  // Expected value first, per JUnit's assertEquals(expected, actual).
  assertEquals("gzip", serverConfig.compressionType)
}
/** An unknown compression type ("abc") must make config construction fail. */
@Test
def testInvalidCompressionType(): Unit = {
  val brokerProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
  brokerProps.put(KafkaConfig.CompressionTypeProp, "abc")

  intercept[IllegalArgumentException] {
    KafkaConfig.fromProps(brokerProps)
  }
}
/** An inter-broker protocol (PLAINTEXT) that is absent from the listener list (SSL only) must be rejected. */
@Test
def testInvalidInterBrokerSecurityProtocol(): Unit = {
  val brokerProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
  brokerProps.put(KafkaConfig.ListenersProp, "SSL://localhost:0")
  brokerProps.put(KafkaConfig.InterBrokerSecurityProtocolProp, SecurityProtocol.PLAINTEXT.toString)

  intercept[IllegalArgumentException] {
    KafkaConfig.fromProps(brokerProps)
  }
}
/** Advertised listeners identical to the listeners must be accepted; the test passes if nothing is thrown. */
@Test
def testEqualAdvertisedListenersProtocol(): Unit = {
  val endpointList = "PLAINTEXT://localhost:9092,SSL://localhost:9093"

  val brokerProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
  brokerProps.put(KafkaConfig.ListenersProp, endpointList)
  brokerProps.put(KafkaConfig.AdvertisedListenersProp, endpointList)

  KafkaConfig.fromProps(brokerProps)
}
/** An advertised listener protocol (PLAINTEXT) that no listener uses (TRACE, SSL) must be rejected. */
@Test
def testInvalidAdvertisedListenersProtocol(): Unit = {
  val brokerProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
  brokerProps.put(KafkaConfig.ListenersProp, "TRACE://localhost:9091,SSL://localhost:9093")
  brokerProps.put(KafkaConfig.AdvertisedListenersProp, "PLAINTEXT://localhost:9092")

  intercept[IllegalArgumentException] {
    KafkaConfig.fromProps(brokerProps)
  }
}
/**
 * Walks over every name returned by KafkaConfig.configNames() and, for each one
 * that is probed, sets one or more unacceptable values on top of a minimal valid
 * property set, expecting config construction to fail every time.
 * Names not listed explicitly are probed with "not_a_number" and "-1".
 */
@Test
def testFromPropsInvalid(): Unit = {
  // Minimal property set; rebuilt on every call so that probes never share state.
  def baseProps(): Properties = {
    val required = new Properties()
    required.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
    required
  }

  // The baseline itself has to be valid, otherwise the failures below would prove nothing.
  KafkaConfig.fromProps(baseProps())

  for (name <- KafkaConfig.configNames()) {
    name match {
      // Not probed: no invalid value is exercised for these settings.
      case KafkaConfig.ZkConnectProp |
           KafkaConfig.AuthorizerClassNameProp |
           KafkaConfig.HostNameProp |
           KafkaConfig.AdvertisedHostNameProp |
           KafkaConfig.LogDirsProp |
           KafkaConfig.LogDirProp |
           KafkaConfig.MetricReporterClassesProp |
           KafkaConfig.RackProp |
           // SSL configs
           KafkaConfig.PrincipalBuilderClassProp |
           KafkaConfig.SslProtocolProp |
           KafkaConfig.SslProviderProp |
           KafkaConfig.SslEnabledProtocolsProp |
           KafkaConfig.SslKeystoreTypeProp |
           KafkaConfig.SslKeystoreLocationProp |
           KafkaConfig.SslKeystorePasswordProp |
           KafkaConfig.SslKeyPasswordProp |
           KafkaConfig.SslTruststoreTypeProp |
           KafkaConfig.SslTruststorePasswordProp |
           KafkaConfig.SslTruststoreLocationProp |
           KafkaConfig.SslKeyManagerAlgorithmProp |
           KafkaConfig.SslTrustManagerAlgorithmProp |
           KafkaConfig.SslClientAuthProp |
           KafkaConfig.SslEndpointIdentificationAlgorithmProp |
           KafkaConfig.SslSecureRandomImplementationProp |
           KafkaConfig.SslCipherSuitesProp |
           // SASL configs
           KafkaConfig.SaslMechanismInterBrokerProtocolProp |
           KafkaConfig.SaslEnabledMechanismsProp |
           KafkaConfig.SaslKerberosServiceNameProp |
           KafkaConfig.SaslKerberosKinitCmdProp |
           KafkaConfig.SaslKerberosTicketRenewWindowFactorProp |
           KafkaConfig.SaslKerberosTicketRenewJitterProp |
           KafkaConfig.SaslKerberosMinTimeBeforeReloginProp |
           KafkaConfig.SaslKerberosPrincipalToLocalRulesProp =>

      // Probed with a non-numeric string only.
      case KafkaConfig.ZkSessionTimeoutMsProp |
           KafkaConfig.ZkConnectionTimeoutMsProp |
           KafkaConfig.ZkSyncTimeMsProp |
           KafkaConfig.BrokerIdProp |
           KafkaConfig.RequestTimeoutMsProp |
           KafkaConfig.PortProp |
           KafkaConfig.AdvertisedPortProp |
           KafkaConfig.SocketSendBufferBytesProp |
           KafkaConfig.SocketReceiveBufferBytesProp |
           KafkaConfig.ConnectionsMaxIdleMsProp |
           KafkaConfig.LogRetentionBytesProp |
           KafkaConfig.LogCleanerIoMaxBytesPerSecondProp |
           KafkaConfig.LogCleanerDedupeBufferLoadFactorProp |
           KafkaConfig.LogCleanerDeleteRetentionMsProp |
           KafkaConfig.LogCleanerMinCompactionLagMsProp |
           KafkaConfig.LogCleanerMinCleanRatioProp |
           KafkaConfig.LogFlushSchedulerIntervalMsProp |
           KafkaConfig.LogFlushIntervalMsProp |
           KafkaConfig.ControllerSocketTimeoutMsProp |
           KafkaConfig.DefaultReplicationFactorProp |
           KafkaConfig.ReplicaLagTimeMaxMsProp |
           KafkaConfig.ReplicaSocketReceiveBufferBytesProp |
           KafkaConfig.ReplicaFetchMaxBytesProp |
           KafkaConfig.ReplicaFetchWaitMaxMsProp |
           KafkaConfig.ReplicaFetchMinBytesProp |
           KafkaConfig.NumReplicaFetchersProp |
           KafkaConfig.ReplicaHighWatermarkCheckpointIntervalMsProp |
           KafkaConfig.FetchPurgatoryPurgeIntervalRequestsProp |
           KafkaConfig.ProducerPurgatoryPurgeIntervalRequestsProp |
           KafkaConfig.LeaderImbalancePerBrokerPercentageProp |
           KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp |
           KafkaConfig.ControlledShutdownMaxRetriesProp |
           KafkaConfig.ControlledShutdownRetryBackoffMsProp |
           KafkaConfig.GroupMinSessionTimeoutMsProp |
           KafkaConfig.GroupMaxSessionTimeoutMsProp |
           KafkaConfig.OffsetMetadataMaxSizeProp =>
        assertPropertyInvalid(baseProps(), name, "not_a_number")

      // Probed with a non-boolean string only.
      case KafkaConfig.ZkEnableSecureAclsProp |
           KafkaConfig.LogCleanerEnableProp =>
        assertPropertyInvalid(baseProps(), name, "not_a_boolean")

      // Probed with a non-numeric string and with zero.
      case KafkaConfig.NumNetworkThreadsProp |
           KafkaConfig.NumIoThreadsProp |
           KafkaConfig.BackgroundThreadsProp |
           KafkaConfig.QueuedMaxRequestsProp |
           KafkaConfig.NumPartitionsProp |
           KafkaConfig.LogRollTimeMillisProp |
           KafkaConfig.LogRollTimeHoursProp |
           KafkaConfig.LogRetentionTimeMillisProp |
           KafkaConfig.LogRetentionTimeMinutesProp |
           KafkaConfig.LogRetentionTimeHoursProp |
           KafkaConfig.LogCleanupIntervalMsProp |
           KafkaConfig.LogFlushIntervalMessagesProp |
           KafkaConfig.NumRecoveryThreadsPerDataDirProp |
           KafkaConfig.MinInSyncReplicasProp |
           KafkaConfig.OffsetsLoadBufferSizeProp |
           KafkaConfig.OffsetsTopicReplicationFactorProp |
           KafkaConfig.OffsetsTopicPartitionsProp |
           KafkaConfig.OffsetsTopicSegmentBytesProp |
           KafkaConfig.OffsetsRetentionMinutesProp |
           KafkaConfig.OffsetsRetentionCheckIntervalMsProp |
           KafkaConfig.OffsetCommitTimeoutMsProp |
           KafkaConfig.ProducerQuotaBytesPerSecondDefaultProp |
           KafkaConfig.ConsumerQuotaBytesPerSecondDefaultProp |
           KafkaConfig.NumQuotaSamplesProp |
           KafkaConfig.QuotaWindowSizeSecondsProp =>
        assertPropertyInvalid(baseProps(), name, "not_a_number", "0")

      // Probed with a non-boolean string and with "0".
      case KafkaConfig.AutoCreateTopicsEnableProp |
           KafkaConfig.AutoLeaderRebalanceEnableProp |
           KafkaConfig.UncleanLeaderElectionEnableProp |
           KafkaConfig.ControlledShutdownEnableProp |
           KafkaConfig.DeleteTopicEnableProp =>
        assertPropertyInvalid(baseProps(), name, "not_a_boolean", "0")

      // Probed with a non-numeric string, -1 and 0.
      case KafkaConfig.MetricNumSamplesProp |
           KafkaConfig.MetricSampleWindowMsProp =>
        assertPropertyInvalid(baseProps(), name, "not_a_number", "-1", "0")

      // Probed with a non-numeric string and -2.
      case KafkaConfig.ReplicaSocketTimeoutMsProp |
           KafkaConfig.OffsetCommitRequiredAcksProp =>
        assertPropertyInvalid(baseProps(), name, "not_a_number", "-2")

      // Settings with their own, individual invalid values.
      case KafkaConfig.MaxConnectionsPerIpOverridesProp =>
        assertPropertyInvalid(baseProps(), name, "127.0.0.1:not_a_number")
      case KafkaConfig.LogSegmentBytesProp =>
        assertPropertyInvalid(baseProps(), name, "not_a_number", Message.MinMessageOverhead - 1)
      case KafkaConfig.LogCleanupPolicyProp =>
        assertPropertyInvalid(baseProps(), name, "unknown_policy", "0")
      case KafkaConfig.LogCleanerDedupeBufferSizeProp =>
        assertPropertyInvalid(baseProps(), name, "not_a_number", "1024")
      case KafkaConfig.LogIndexSizeMaxBytesProp =>
        assertPropertyInvalid(baseProps(), name, "not_a_number", "3")
      case KafkaConfig.OffsetsTopicCompressionCodecProp =>
        assertPropertyInvalid(baseProps(), name, "not_a_number", "-1")

      // Everything else is treated as a non-negative integer setting.
      case _ =>
        assertPropertyInvalid(baseProps(), name, "not_a_number", "-1")
    }
  }
}
/**
 * Builds a config from a hand-picked set of properties and checks the values it
 * exposes, including values that are read back through a different accessor than
 * the property that was set (e.g. the ZK session timeout showing up as the ZK
 * connection timeout, host.name/port showing up as the advertised host/port).
 */
@Test
def testSpecificProperties(): Unit = {
  def hoursToMillis(hours: Int): Long = hours * 60L * 1000L * 60

  val overrides = new Properties()
  overrides.put(KafkaConfig.ZkConnectProp, "127.0.0.1:2181")
  // Read back below through zkConnectionTimeoutMs.
  overrides.put(KafkaConfig.ZkSessionTimeoutMsProp, "1234")
  overrides.put(KafkaConfig.BrokerIdGenerationEnableProp, "false")
  overrides.put(KafkaConfig.MaxReservedBrokerIdProp, "1")
  overrides.put(KafkaConfig.BrokerIdProp, "1")
  overrides.put(KafkaConfig.HostNameProp, "127.0.0.1")
  overrides.put(KafkaConfig.PortProp, "1122")
  overrides.put(KafkaConfig.MaxConnectionsPerIpOverridesProp, "127.0.0.1:2, 127.0.0.2:3")
  overrides.put(KafkaConfig.LogDirProp, "/tmp1,/tmp2")
  overrides.put(KafkaConfig.LogRollTimeHoursProp, "12")
  overrides.put(KafkaConfig.LogRollTimeJitterHoursProp, "11")
  overrides.put(KafkaConfig.LogRetentionTimeHoursProp, "10")
  // Read back below through logFlushIntervalMs.
  overrides.put(KafkaConfig.LogFlushSchedulerIntervalMsProp, "123")
  overrides.put(KafkaConfig.OffsetsTopicCompressionCodecProp, SnappyCompressionCodec.codec.toString)

  val cfg = KafkaConfig.fromProps(overrides)

  assertEquals("127.0.0.1:2181", cfg.zkConnect)
  assertEquals(1234, cfg.zkConnectionTimeoutMs)
  assertEquals(false, cfg.brokerIdGenerationEnable)
  assertEquals(1, cfg.maxReservedBrokerId)
  assertEquals(1, cfg.brokerId)
  assertEquals("127.0.0.1", cfg.hostName)
  assertEquals(1122, cfg.advertisedPort)
  assertEquals("127.0.0.1", cfg.advertisedHostName)
  assertEquals(Map("127.0.0.1" -> 2, "127.0.0.2" -> 3), cfg.maxConnectionsPerIpOverrides)
  assertEquals(List("/tmp1", "/tmp2"), cfg.logDirs)
  assertEquals(hoursToMillis(12), cfg.logRollTimeMillis)
  assertEquals(hoursToMillis(11), cfg.logRollTimeJitterMillis)
  assertEquals(hoursToMillis(10), cfg.logRetentionTimeMillis)
  assertEquals(123L, cfg.logFlushIntervalMs)
  assertEquals(SnappyCompressionCodec, cfg.offsetsTopicCompressionCodec)
}
/**
 * Asserts that building a KafkaConfig fails for each of the given values of one property.
 *
 * @param validRequiredProps by-name expression yielding the base properties; it is
 *                           re-evaluated for every value that is probed
 * @param name               name of the property to overwrite
 * @param values             values to try, each converted with `toString`; every one
 *                           must make `KafkaConfig.fromProps` throw an exception
 */
private def assertPropertyInvalid(validRequiredProps: => Properties, name: String, values: Any*): Unit = {
  for (candidate <- values) {
    val propsUnderTest = validRequiredProps
    propsUnderTest.setProperty(name, candidate.toString)
    intercept[Exception] {
      KafkaConfig.fromProps(propsUnderTest)
    }
  }
}
}