KAFKA-6680: Fix issues related to Dynamic Broker configs (#4731)
- Fix kafkaConfig initialization if there are no dynamic configs defined in ZK.
- Update DynamicListenerConfig.validateReconfiguration() to check new Listeners must be subset of listener map
Reviewers: Rajini Sivaram <rajinisivaram@googlemail.com>
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 3236af0..92fd5d7 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -137,6 +137,7 @@
private val dynamicConfigPasswordEncoder = maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderSecret)
private[server] def initialize(zkClient: KafkaZkClient): Unit = {
+ currentConfig = new KafkaConfig(kafkaConfig.props, false, None)
val adminZkClient = new AdminZkClient(zkClient)
updateDefaultConfig(adminZkClient.fetchEntityConfig(ConfigType.Broker, ConfigEntityName.Default))
val props = adminZkClient.fetchEntityConfig(ConfigType.Broker, kafkaConfig.brokerId.toString)
@@ -719,8 +720,8 @@
val oldListeners = listenersToMap(oldConfig.listeners)
if (!newAdvertisedListeners.keySet.subsetOf(newListeners.keySet))
throw new ConfigException(s"Advertised listeners '$newAdvertisedListeners' must be a subset of listeners '$newListeners'")
- if (newListeners.keySet != newConfig.listenerSecurityProtocolMap.keySet)
- throw new ConfigException(s"Listeners '$newListeners' and listener map '${newConfig.listenerSecurityProtocolMap}' don't match")
+ if (!newListeners.keySet.subsetOf(newConfig.listenerSecurityProtocolMap.keySet))
+ throw new ConfigException(s"Listeners '$newListeners' must be subset of listener map '${newConfig.listenerSecurityProtocolMap}'")
newListeners.keySet.intersect(oldListeners.keySet).foreach { listenerName =>
val prefix = listenerName.configPrefix
val newListenerProps = immutableListenerConfigs(newConfig, prefix)
diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 2e3e274..bca98d2 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -24,10 +24,12 @@
import org.apache.kafka.common.Reconfigurable
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.config.{ConfigException, SslConfigs}
+import org.easymock.EasyMock
import org.junit.Assert._
import org.junit.Test
import scala.collection.JavaConverters._
+import scala.collection.Set
class DynamicBrokerConfigTest {
@@ -248,4 +250,56 @@
newConfigWithNewSecret.dynamicConfig.updateBrokerConfig(0, persistedProps)
assertEquals("staticLoginModule required;", newConfigWithNewSecret.values.get(KafkaConfig.SaslJaasConfigProp).asInstanceOf[Password].value)
}
+
+ @Test
+ def testDynamicListenerConfig(): Unit = {
+ val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092)
+ val oldConfig = KafkaConfig.fromProps(props)
+ val kafkaServer = EasyMock.createMock(classOf[kafka.server.KafkaServer])
+ EasyMock.expect(kafkaServer.config).andReturn(oldConfig).anyTimes()
+ EasyMock.replay(kafkaServer)
+
+ props.put(KafkaConfig.ListenersProp, "PLAINTEXT://hostname:9092,SASL_PLAINTEXT://hostname:9093")
+ val newConfig = KafkaConfig(props)
+
+ val dynamicListenerConfig = new DynamicListenerConfig(kafkaServer)
+ dynamicListenerConfig.validateReconfiguration(newConfig)
+ }
+
+ @Test
+ def testDynamicConfigInitializationWithoutConfigsInZK(): Unit = {
+ val zkClient = EasyMock.createMock(classOf[kafka.zk.KafkaZkClient])
+ EasyMock.expect(zkClient.getEntityConfigs(EasyMock.anyString(), EasyMock.anyString())).andReturn(new java.util.Properties()).anyTimes()
+ EasyMock.replay(zkClient)
+
+ val oldConfig = KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092))
+ val dynamicBrokerConfig = new DynamicBrokerConfig(oldConfig)
+ dynamicBrokerConfig.initialize(zkClient)
+ dynamicBrokerConfig.addBrokerReconfigurable(new TestDynamicThreadPool)
+
+ val newprops = new Properties()
+ newprops.put(KafkaConfig.NumIoThreadsProp, "10")
+ newprops.put(KafkaConfig.BackgroundThreadsProp, "100")
+ dynamicBrokerConfig.updateBrokerConfig(0, newprops)
+ }
}
+
+class TestDynamicThreadPool() extends BrokerReconfigurable {
+
+ override def reconfigurableConfigs: Set[String] = {
+ DynamicThreadPool.ReconfigurableConfigs
+ }
+
+ override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
+ assertEquals(Defaults.NumIoThreads, oldConfig.numIoThreads)
+ assertEquals(Defaults.BackgroundThreads, oldConfig.backgroundThreads)
+
+ assertEquals(10, newConfig.numIoThreads)
+ assertEquals(100, newConfig.backgroundThreads)
+ }
+
+ override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
+ assertEquals(10, newConfig.numIoThreads)
+ assertEquals(100, newConfig.backgroundThreads)
+ }
+}
\ No newline at end of file