/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import java.util
import java.util.{Collections, Properties}
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.locks.ReentrantReadWriteLock
import kafka.cluster.EndPoint
import kafka.log.{LogCleaner, LogManager}
import kafka.network.{DataPlaneAcceptor, SocketServer}
import kafka.server.DynamicBrokerConfig._
import kafka.utils.{CoreUtils, Logging}
import kafka.utils.Implicits._
import kafka.zk.{AdminZkClient, KafkaZkClient}
import org.apache.kafka.common.Reconfigurable
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, SslConfigs}
import org.apache.kafka.common.metrics.{JmxReporter, Metrics, MetricsReporter}
import org.apache.kafka.common.config.types.Password
import org.apache.kafka.common.network.{ListenerName, ListenerReconfigurable}
import org.apache.kafka.common.security.authenticator.LoginManager
import org.apache.kafka.common.utils.{ConfigUtils, Utils}
import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.security.PasswordEncoder
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.config.{ConfigType, KafkaSecurityConfigs, ServerTopicConfigSynonyms, ZooKeeperInternals}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.ClientMetricsReceiverPlugin
import org.apache.kafka.server.telemetry.ClientTelemetry
import org.apache.kafka.storage.internals.log.{LogConfig, ProducerStateManagerConfig}
import scala.annotation.nowarn
import scala.collection._
import scala.jdk.CollectionConverters._
/**
* Dynamic broker configurations are stored in ZooKeeper and may be defined at two levels:
* <ul>
* <li>Per-broker configs persisted at <tt>/configs/brokers/{brokerId}</tt>: These can be described/altered
* using AdminClient with the resource name set to the broker id.</li>
* <li>Cluster-wide defaults persisted at <tt>/configs/brokers/&lt;default&gt;</tt>: These can be described/altered
* using AdminClient with an empty resource name.</li>
* </ul>
* The order of precedence for broker configs is:
* <ol>
* <li>DYNAMIC_BROKER_CONFIG: stored in ZK at /configs/brokers/{brokerId}</li>
* <li>DYNAMIC_DEFAULT_BROKER_CONFIG: stored in ZK at /configs/brokers/&lt;default&gt;</li>
* <li>STATIC_BROKER_CONFIG: properties that the broker is started up with, typically from the server.properties file</li>
* <li>DEFAULT_CONFIG: Default configs defined in KafkaConfig</li>
* </ol>
* Log configs use topic config overrides if defined and fall back to broker defaults using the order of precedence above.
* Topic config overrides may use a different config name from the default broker config.
* See [[org.apache.kafka.storage.internals.log.LogConfig#TopicConfigSynonyms]] for the mapping.
* <p>
* AdminClient returns all config synonyms in the order of precedence when configs are described with
* <code>includeSynonyms</code>. In addition to configs that may be defined with the same name at different levels,
* some configs have additional synonyms.
* </p>
* <ul>
* <li>Listener configs may be defined using the prefix <tt>listener.name.{listenerName}.{configName}</tt>. These may be
* configured as dynamic or static broker configs. Listener configs have higher precedence than the base configs
* that don't specify the listener name. Listeners without a listener config use the base config. Base configs
* may be defined only as STATIC_BROKER_CONFIG or DEFAULT_CONFIG and cannot be updated dynamically.</li>
* <li>Some configs may be defined using multiple properties. For example, <tt>log.roll.ms</tt> and
* <tt>log.roll.hours</tt> refer to the same config that may be defined in milliseconds or hours. The order of
* precedence of these synonyms is described in the docs of these configs in [[kafka.server.KafkaConfig]].</li>
* </ul>
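* <p>
* For illustration, a per-broker config can be altered and then described with its synonyms using the
* AdminClient API. A minimal sketch, where the broker id "0", the config name and its value are
* placeholders rather than a prescribed workflow:
* </p>
* <pre>
* val admin = Admin.create(adminProps)
* val broker0 = new ConfigResource(ConfigResource.Type.BROKER, "0")
* admin.incrementalAlterConfigs(java.util.Map.of(broker0, java.util.List.of(
*   new AlterConfigOp(new ConfigEntry("log.cleaner.threads", "2"), AlterConfigOp.OpType.SET)))).all.get
* admin.describeConfigs(java.util.List.of(broker0), new DescribeConfigsOptions().includeSynonyms(true)).all.get
* </pre>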
*
*/
object DynamicBrokerConfig {
private[server] val DynamicSecurityConfigs = SslConfigs.RECONFIGURABLE_CONFIGS.asScala
private[server] val DynamicProducerStateManagerConfig = Set(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_CONFIG, TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG)
val AllDynamicConfigs = DynamicSecurityConfigs ++
LogCleaner.ReconfigurableConfigs ++
DynamicLogConfig.ReconfigurableConfigs ++
DynamicThreadPool.ReconfigurableConfigs ++
Set(KafkaConfig.MetricReporterClassesProp) ++
DynamicListenerConfig.ReconfigurableConfigs ++
SocketServer.ReconfigurableConfigs ++
DynamicProducerStateManagerConfig ++
DynamicRemoteLogConfig.ReconfigurableConfigs
private val ClusterLevelListenerConfigs = Set(KafkaConfig.MaxConnectionsProp, KafkaConfig.MaxConnectionCreationRateProp, KafkaConfig.NumNetworkThreadsProp)
private val PerBrokerConfigs = (DynamicSecurityConfigs ++ DynamicListenerConfig.ReconfigurableConfigs).diff(
ClusterLevelListenerConfigs)
private val ListenerMechanismConfigs = Set(KafkaSecurityConfigs.SASL_JAAS_CONFIG,
KafkaSecurityConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS_CONFIG,
KafkaSecurityConfigs.SASL_LOGIN_CLASS_CONFIG,
KafkaSecurityConfigs.SASL_SERVER_CALLBACK_HANDLER_CLASS_CONFIG,
KafkaSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS_CONFIG)
private val ReloadableFileConfigs = Set(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG)
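// Matches listener-prefixed configs of the form `listener.name.{listenerName}.{configName}` and captures
// the base config name. For example, `listener.name.internal.ssl.keystore.location` yields the base
// config `ssl.keystore.location`.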
val ListenerConfigRegex = """listener\.name\.[^.]*\.(.*)""".r
private val DynamicPasswordConfigs = {
val passwordConfigs = KafkaConfig.configKeys.filter(_._2.`type` == ConfigDef.Type.PASSWORD).keySet
AllDynamicConfigs.intersect(passwordConfigs)
}
def isPasswordConfig(name: String): Boolean = DynamicBrokerConfig.DynamicPasswordConfigs.exists(name.endsWith)
def brokerConfigSynonyms(name: String, matchListenerOverride: Boolean): List[String] = {
name match {
case KafkaConfig.LogRollTimeMillisProp | KafkaConfig.LogRollTimeHoursProp =>
List(KafkaConfig.LogRollTimeMillisProp, KafkaConfig.LogRollTimeHoursProp)
case KafkaConfig.LogRollTimeJitterMillisProp | KafkaConfig.LogRollTimeJitterHoursProp =>
List(KafkaConfig.LogRollTimeJitterMillisProp, KafkaConfig.LogRollTimeJitterHoursProp)
case KafkaConfig.LogFlushIntervalMsProp => // LogFlushSchedulerIntervalMsProp is used as default
List(KafkaConfig.LogFlushIntervalMsProp, KafkaConfig.LogFlushSchedulerIntervalMsProp)
case KafkaConfig.LogRetentionTimeMillisProp | KafkaConfig.LogRetentionTimeMinutesProp | KafkaConfig.LogRetentionTimeHoursProp =>
List(KafkaConfig.LogRetentionTimeMillisProp, KafkaConfig.LogRetentionTimeMinutesProp, KafkaConfig.LogRetentionTimeHoursProp)
case ListenerConfigRegex(baseName) if matchListenerOverride =>
// `ListenerMechanismConfigs` are specified as listenerPrefix.mechanism.<configName>
// and other listener configs are specified as listenerPrefix.<configName>
// Add <configName> as a synonym in both cases.
val mechanismConfig = ListenerMechanismConfigs.find(baseName.endsWith)
List(name, mechanismConfig.getOrElse(baseName))
case _ => List(name)
}
}
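// Illustrative examples (not exhaustive):
//   brokerConfigSynonyms("log.roll.ms", matchListenerOverride = false)
//     => List("log.roll.ms", "log.roll.hours")
//   brokerConfigSynonyms("listener.name.internal.plain.sasl.jaas.config", matchListenerOverride = true)
//     => List("listener.name.internal.plain.sasl.jaas.config", "sasl.jaas.config")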
def validateConfigs(props: Properties, perBrokerConfig: Boolean): Unit = {
def checkInvalidProps(invalidPropNames: Set[String], errorMessage: String): Unit = {
if (invalidPropNames.nonEmpty)
throw new ConfigException(s"$errorMessage: $invalidPropNames")
}
checkInvalidProps(nonDynamicConfigs(props), "Cannot update these configs dynamically")
checkInvalidProps(securityConfigsWithoutListenerPrefix(props),
"These security configs can be dynamically updated only per-listener using the listener prefix")
validateConfigTypes(props)
if (!perBrokerConfig) {
checkInvalidProps(perBrokerConfigs(props),
"Cannot update these configs at default cluster level, broker id must be specified")
}
}
private def perBrokerConfigs(props: Properties): Set[String] = {
val configNames = props.asScala.keySet
def perBrokerListenerConfig(name: String): Boolean = {
name match {
case ListenerConfigRegex(baseName) => !ClusterLevelListenerConfigs.contains(baseName)
case _ => false
}
}
configNames.intersect(PerBrokerConfigs) ++ configNames.filter(perBrokerListenerConfig)
}
private def nonDynamicConfigs(props: Properties): Set[String] = {
props.asScala.keySet.intersect(DynamicConfig.Broker.nonDynamicProps)
}
private def securityConfigsWithoutListenerPrefix(props: Properties): Set[String] = {
DynamicSecurityConfigs.filter(props.containsKey)
}
private def validateConfigTypes(props: Properties): Unit = {
val baseProps = new Properties
props.asScala.foreach {
case (ListenerConfigRegex(baseName), v) => baseProps.put(baseName, v)
case (k, v) => baseProps.put(k, v)
}
DynamicConfig.Broker.validate(baseProps)
}
private[server] def addDynamicConfigs(configDef: ConfigDef): Unit = {
KafkaConfig.configKeys.forKeyValue { (configName, config) =>
if (AllDynamicConfigs.contains(configName)) {
configDef.define(config.name, config.`type`, config.defaultValue, config.validator,
config.importance, config.documentation, config.group, config.orderInGroup, config.width,
config.displayName, config.dependents, config.recommender)
}
}
}
private[server] def dynamicConfigUpdateModes: util.Map[String, String] = {
AllDynamicConfigs.map { name =>
val mode = if (PerBrokerConfigs.contains(name)) "per-broker" else "cluster-wide"
name -> mode
}.toMap.asJava
}
private[server] def resolveVariableConfigs(propsOriginal: Properties): Properties = {
val props = new Properties
val config = new AbstractConfig(new ConfigDef(), propsOriginal, Utils.castToStringObjectMap(propsOriginal), false)
config.originals.forEach { (key, value) =>
if (!key.startsWith(AbstractConfig.CONFIG_PROVIDERS_CONFIG)) {
props.put(key, value)
}
}
props
}
}
class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging {
private[server] val staticBrokerConfigs = ConfigDef.convertToStringMapWithPasswordValues(kafkaConfig.originalsFromThisConfig).asScala
private[server] val staticDefaultConfigs = ConfigDef.convertToStringMapWithPasswordValues(KafkaConfig.defaultValues.asJava).asScala
private val dynamicBrokerConfigs = mutable.Map[String, String]()
private val dynamicDefaultConfigs = mutable.Map[String, String]()
// Use CopyOnWriteArrayList to prevent a ConcurrentModificationException when an item is added by one thread
// to these collections while another thread is iterating over them.
private[server] val reconfigurables = new CopyOnWriteArrayList[Reconfigurable]()
private val brokerReconfigurables = new CopyOnWriteArrayList[BrokerReconfigurable]()
private val lock = new ReentrantReadWriteLock
private var metricsReceiverPluginOpt: Option[ClientMetricsReceiverPlugin] = _
private var currentConfig: KafkaConfig = _
private val dynamicConfigPasswordEncoder = if (kafkaConfig.processRoles.isEmpty) {
maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderSecret)
} else {
Some(PasswordEncoder.NOOP)
}
private[server] def initialize(zkClientOpt: Option[KafkaZkClient], clientMetricsReceiverPluginOpt: Option[ClientMetricsReceiverPlugin]): Unit = {
currentConfig = new KafkaConfig(kafkaConfig.props, false, None)
metricsReceiverPluginOpt = clientMetricsReceiverPluginOpt
zkClientOpt.foreach { zkClient =>
val adminZkClient = new AdminZkClient(zkClient)
updateDefaultConfig(adminZkClient.fetchEntityConfig(ConfigType.BROKER, ZooKeeperInternals.DEFAULT_STRING), false)
val props = adminZkClient.fetchEntityConfig(ConfigType.BROKER, kafkaConfig.brokerId.toString)
val brokerConfig = maybeReEncodePasswords(props, adminZkClient)
updateBrokerConfig(kafkaConfig.brokerId, brokerConfig)
}
}
/**
* Clear all cached values. This is used to clear state on broker shutdown to avoid
* exceptions in tests when the broker is restarted. These fields are re-initialized when
* the broker starts up.
*/
private[server] def clear(): Unit = {
dynamicBrokerConfigs.clear()
dynamicDefaultConfigs.clear()
reconfigurables.clear()
brokerReconfigurables.clear()
}
/**
* Add reconfigurables to be notified when a dynamic broker config is updated.
*
* `Reconfigurable` is the public API used by configurable plugins like metrics reporter
* and quota callbacks. These are reconfigured before `KafkaConfig` is updated so that
* the update can be aborted if `reconfigure()` fails with an exception.
*
* `BrokerReconfigurable` is used for internal reconfigurable classes. These are
* reconfigured after `KafkaConfig` is updated so that they can access `KafkaConfig`
* directly. They are provided with both the old and the new configs.
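*
* A minimal `BrokerReconfigurable` sketch (illustrative only; the config name `foo.bar` and the class
* are assumptions, not part of the broker):
* <pre>
* class DynamicFoo extends BrokerReconfigurable {
*   override def reconfigurableConfigs: Set[String] = Set("foo.bar")
*   override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
*     // throw a ConfigException here to reject the update
*   }
*   override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
*     // apply the change; KafkaConfig has already been updated at this point
*   }
* }
* </pre>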
*/
def addReconfigurables(kafkaServer: KafkaBroker): Unit = {
kafkaServer.authorizer match {
case Some(authz: Reconfigurable) => addReconfigurable(authz)
case _ =>
}
addReconfigurable(kafkaServer.kafkaYammerMetrics)
addReconfigurable(new DynamicMetricsReporters(kafkaConfig.brokerId, kafkaServer.config, kafkaServer.metrics, kafkaServer.clusterId))
addReconfigurable(new DynamicClientQuotaCallback(kafkaServer.quotaManagers, kafkaServer.config))
addBrokerReconfigurable(new BrokerDynamicThreadPool(kafkaServer))
addBrokerReconfigurable(new DynamicLogConfig(kafkaServer.logManager, kafkaServer))
addBrokerReconfigurable(new DynamicListenerConfig(kafkaServer))
addBrokerReconfigurable(kafkaServer.socketServer)
addBrokerReconfigurable(new DynamicProducerStateManagerConfig(kafkaServer.logManager.producerStateManagerConfig))
addBrokerReconfigurable(new DynamicRemoteLogConfig(kafkaServer))
}
/**
* Add reconfigurables to be notified when a dynamic controller config is updated.
*/
def addReconfigurables(controller: ControllerServer): Unit = {
controller.authorizer match {
case Some(authz: Reconfigurable) => addReconfigurable(authz)
case _ =>
}
if (!kafkaConfig.processRoles.contains(ProcessRole.BrokerRole)) {
// only add these if the controller isn't also running the broker role
// because these would already be added via the broker in that case
addReconfigurable(controller.kafkaYammerMetrics)
addReconfigurable(new DynamicMetricsReporters(kafkaConfig.nodeId, controller.config, controller.metrics, controller.clusterId))
}
addReconfigurable(new DynamicClientQuotaCallback(controller.quotaManagers, controller.config))
addBrokerReconfigurable(new ControllerDynamicThreadPool(controller))
// TODO: addBrokerReconfigurable(new DynamicListenerConfig(controller))
addBrokerReconfigurable(controller.socketServer)
}
def addReconfigurable(reconfigurable: Reconfigurable): Unit = CoreUtils.inWriteLock(lock) {
verifyReconfigurableConfigs(reconfigurable.reconfigurableConfigs.asScala)
reconfigurables.add(reconfigurable)
}
def addBrokerReconfigurable(reconfigurable: BrokerReconfigurable): Unit = CoreUtils.inWriteLock(lock) {
verifyReconfigurableConfigs(reconfigurable.reconfigurableConfigs)
brokerReconfigurables.add(reconfigurable)
}
def removeReconfigurable(reconfigurable: Reconfigurable): Unit = CoreUtils.inWriteLock(lock) {
reconfigurables.remove(reconfigurable)
}
private def verifyReconfigurableConfigs(configNames: Set[String]): Unit = CoreUtils.inWriteLock(lock) {
val nonDynamic = configNames.filter(DynamicConfig.Broker.nonDynamicProps.contains)
require(nonDynamic.isEmpty, s"Reconfigurable contains non-dynamic configs $nonDynamic")
}
// Visible for testing
private[server] def currentKafkaConfig: KafkaConfig = CoreUtils.inReadLock(lock) {
currentConfig
}
private[server] def currentDynamicBrokerConfigs: Map[String, String] = CoreUtils.inReadLock(lock) {
dynamicBrokerConfigs.clone()
}
private[server] def currentDynamicDefaultConfigs: Map[String, String] = CoreUtils.inReadLock(lock) {
dynamicDefaultConfigs.clone()
}
private[server] def clientMetricsReceiverPlugin: Option[ClientMetricsReceiverPlugin] = CoreUtils.inReadLock(lock) {
metricsReceiverPluginOpt
}
private[server] def updateBrokerConfig(brokerId: Int, persistentProps: Properties, doLog: Boolean = true): Unit = CoreUtils.inWriteLock(lock) {
try {
val props = fromPersistentProps(persistentProps, perBrokerConfig = true)
dynamicBrokerConfigs.clear()
dynamicBrokerConfigs ++= props.asScala
updateCurrentConfig(doLog)
} catch {
case e: Exception => error(s"Per-broker configs of $brokerId could not be applied: ${persistentProps.keys()}", e)
}
}
private[server] def updateDefaultConfig(persistentProps: Properties, doLog: Boolean = true): Unit = CoreUtils.inWriteLock(lock) {
try {
val props = fromPersistentProps(persistentProps, perBrokerConfig = false)
dynamicDefaultConfigs.clear()
dynamicDefaultConfigs ++= props.asScala
updateCurrentConfig(doLog)
} catch {
case e: Exception => error(s"Cluster default configs could not be applied: ${persistentProps.keys()}", e)
}
}
/**
* All config updates through ZooKeeper are triggered through actual changes in values stored in ZooKeeper.
* For some configs like SSL keystores and truststores, we also want to reload the store if it was modified
* in-place, even though the values of the file path and password haven't changed. That scenario alone
* is handled here, when a config update request from the admin client is processed by ZkAdminManager. If any
* of the SSL configs have changed, the update is not performed here, but is handled later when the ZK
* changes are processed. At the moment, only listener configs are considered for reloading.
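* <p>
* Example (illustrative): re-submitting the current value of
* <tt>listener.name.internal.ssl.keystore.location</tt> after replacing the keystore file on disk
* reloads the keystore even though the configured path and password are unchanged.
* </p>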
*/
private[server] def reloadUpdatedFilesWithoutConfigChange(newProps: Properties): Unit = CoreUtils.inWriteLock(lock) {
reconfigurables.asScala
.filter(reconfigurable => ReloadableFileConfigs.exists(reconfigurable.reconfigurableConfigs.contains))
.foreach {
case reconfigurable: ListenerReconfigurable =>
val kafkaProps = validatedKafkaProps(newProps, perBrokerConfig = true)
val newConfig = new KafkaConfig(kafkaProps.asJava, false, None)
processListenerReconfigurable(reconfigurable, newConfig, Collections.emptyMap(), validateOnly = false, reloadOnly = true)
case reconfigurable =>
trace(s"Files will not be reloaded without config change for $reconfigurable")
}
}
private def maybeCreatePasswordEncoder(secret: Option[Password]): Option[PasswordEncoder] = {
secret.map { secret =>
PasswordEncoder.encrypting(secret,
kafkaConfig.passwordEncoderKeyFactoryAlgorithm,
kafkaConfig.passwordEncoderCipherAlgorithm,
kafkaConfig.passwordEncoderKeyLength,
kafkaConfig.passwordEncoderIterations)
}
}
private def passwordEncoder: PasswordEncoder = {
dynamicConfigPasswordEncoder.getOrElse(throw new ConfigException("Password encoder secret not configured"))
}
private[server] def toPersistentProps(configProps: Properties, perBrokerConfig: Boolean): Properties = {
val props = configProps.clone().asInstanceOf[Properties]
def encodePassword(configName: String, value: String): Unit = {
if (value != null) {
if (!perBrokerConfig)
throw new ConfigException("Password config can be defined only at broker level")
props.setProperty(configName, passwordEncoder.encode(new Password(value)))
}
}
configProps.asScala.forKeyValue { (name, value) =>
if (isPasswordConfig(name))
encodePassword(name, value)
}
props
}
private[server] def fromPersistentProps(persistentProps: Properties,
perBrokerConfig: Boolean): Properties = {
val props = persistentProps.clone().asInstanceOf[Properties]
// Remove all invalid configs from `props`
removeInvalidConfigs(props, perBrokerConfig)
def removeInvalidProps(invalidPropNames: Set[String], errorMessage: String): Unit = {
if (invalidPropNames.nonEmpty) {
invalidPropNames.foreach(props.remove)
error(s"$errorMessage: $invalidPropNames")
}
}
removeInvalidProps(nonDynamicConfigs(props), "Non-dynamic configs configured in ZooKeeper will be ignored")
removeInvalidProps(securityConfigsWithoutListenerPrefix(props),
"Security configs can be dynamically updated only using listener prefix, base configs will be ignored")
if (!perBrokerConfig)
removeInvalidProps(perBrokerConfigs(props), "Per-broker configs defined at default cluster level will be ignored")
def decodePassword(configName: String, value: String): Unit = {
if (value != null) {
try {
props.setProperty(configName, passwordEncoder.decode(value).value)
} catch {
case e: Exception =>
error(s"Dynamic password config $configName could not be decoded, ignoring.", e)
props.remove(configName)
}
}
}
props.asScala.forKeyValue { (name, value) =>
if (isPasswordConfig(name))
decodePassword(name, value)
}
props
}
// If the secret has changed, password.encoder.old.secret contains the old secret that was used
// to encode the configs in ZK. Decode passwords using the old secret and update ZK with values
// encoded using the current secret. Ignore any errors during decoding since the old secret may
// not have been removed during broker restart.
private def maybeReEncodePasswords(persistentProps: Properties, adminZkClient: AdminZkClient): Properties = {
val props = persistentProps.clone().asInstanceOf[Properties]
if (props.asScala.keySet.exists(isPasswordConfig)) {
maybeCreatePasswordEncoder(kafkaConfig.passwordEncoderOldSecret).foreach { passwordDecoder =>
persistentProps.asScala.forKeyValue { (configName, value) =>
if (isPasswordConfig(configName) && value != null) {
val decoded = try {
Some(passwordDecoder.decode(value).value)
} catch {
case _: Exception =>
debug(s"Dynamic password config $configName could not be decoded using old secret, new secret will be used.")
None
}
decoded.foreach(value => props.put(configName, passwordEncoder.encode(new Password(value))))
}
}
adminZkClient.changeBrokerConfig(Some(kafkaConfig.brokerId), props)
}
}
props
}
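// Illustrative server.properties snippet for rotating the password encoder secret (values are
// placeholders); on startup, passwords encoded in ZK with the old secret are re-encoded with the
// current one by `maybeReEncodePasswords`:
//   password.encoder.secret=<current-secret>
//   password.encoder.old.secret=<previous-secret>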
/**
* Validate the provided configs `propsOverride` and return the full Kafka configs with
* the configured defaults and these overrides.
*
* Note: The caller must acquire the read or write lock before invoking this method.
*/
private def validatedKafkaProps(propsOverride: Properties, perBrokerConfig: Boolean): Map[String, String] = {
val propsResolved = DynamicBrokerConfig.resolveVariableConfigs(propsOverride)
validateConfigs(propsResolved, perBrokerConfig)
val newProps = mutable.Map[String, String]()
newProps ++= staticBrokerConfigs
if (perBrokerConfig) {
overrideProps(newProps, dynamicDefaultConfigs)
overrideProps(newProps, propsResolved.asScala)
} else {
overrideProps(newProps, propsResolved.asScala)
overrideProps(newProps, dynamicBrokerConfigs)
}
newProps
}
private[server] def validate(props: Properties, perBrokerConfig: Boolean): Unit = CoreUtils.inReadLock(lock) {
val newProps = validatedKafkaProps(props, perBrokerConfig)
processReconfiguration(newProps, validateOnly = true)
}
private def removeInvalidConfigs(props: Properties, perBrokerConfig: Boolean): Unit = {
try {
validateConfigTypes(props)
} catch {
case e: Exception =>
val invalidProps = props.asScala.filter { case (k, v) =>
val props1 = new Properties
props1.put(k, v)
try {
validateConfigTypes(props1)
false
} catch {
case _: Exception => true
}
}
invalidProps.keys.foreach(props.remove)
val configSource = if (perBrokerConfig) "broker" else "default cluster"
error(s"Dynamic $configSource config contains invalid values in: ${invalidProps.keys}, these configs will be ignored", e)
}
}
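// Invokes `reconfigure` on the given `Reconfigurable` only if the value of at least one of its
// reconfigurable configs differs between the old config and the new config map.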
private[server] def maybeReconfigure(reconfigurable: Reconfigurable, oldConfig: KafkaConfig, newConfig: util.Map[String, _]): Unit = {
if (reconfigurable.reconfigurableConfigs.asScala.exists(key => oldConfig.originals.get(key) != newConfig.get(key)))
reconfigurable.reconfigure(newConfig)
}
/**
* Returns the difference between the new props and the current props: a map of the changed
* configs, together with the set of deleted keys.
*/
private def updatedConfigs(newProps: java.util.Map[String, _],
currentProps: java.util.Map[String, _]): (mutable.Map[String, _], Set[String]) = {
val changeMap = newProps.asScala.filter {
case (k, v) => v != currentProps.get(k)
}
val deletedKeySet = currentProps.asScala.filter {
case (k, _) => !newProps.containsKey(k)
}.keySet
(changeMap, deletedKeySet)
}
/**
* Updates values in `props` with the new values from `propsOverride`. Synonyms of updated configs
* are removed from `props` to ensure that the config with the higher precedence is applied. For example,
* if `log.roll.ms` was defined in server.properties and `log.roll.hours` is configured dynamically,
* `log.roll.hours` from the dynamic configuration will be used and `log.roll.ms` will be removed from
* `props` (even though `log.roll.hours` is secondary to `log.roll.ms`).
*/
private def overrideProps(props: mutable.Map[String, String], propsOverride: mutable.Map[String, String]): Unit = {
propsOverride.forKeyValue { (k, v) =>
// Remove synonyms of `k` to ensure the right precedence is applied. But disable `matchListenerOverride`
// so that base configs corresponding to listener configs are not removed. Base configs should not be removed
// since they may be used by other listeners. It is ok to retain them in `props` since base configs cannot be
// dynamically updated and listener-specific configs have the higher precedence.
brokerConfigSynonyms(k, matchListenerOverride = false).foreach(props.remove)
props.put(k, v)
}
}
private def updateCurrentConfig(doLog: Boolean): Unit = {
val newProps = mutable.Map[String, String]()
newProps ++= staticBrokerConfigs
overrideProps(newProps, dynamicDefaultConfigs)
overrideProps(newProps, dynamicBrokerConfigs)
val oldConfig = currentConfig
val (newConfig, brokerReconfigurablesToUpdate) = processReconfiguration(newProps, validateOnly = false, doLog)
if (newConfig ne currentConfig) {
currentConfig = newConfig
kafkaConfig.updateCurrentConfig(newConfig)
// Process BrokerReconfigurable updates after current config is updated
brokerReconfigurablesToUpdate.foreach(_.reconfigure(oldConfig, newConfig))
}
}
private def processReconfiguration(newProps: Map[String, String], validateOnly: Boolean, doLog: Boolean = false): (KafkaConfig, List[BrokerReconfigurable]) = {
val newConfig = new KafkaConfig(newProps.asJava, doLog, None)
val (changeMap, deletedKeySet) = updatedConfigs(newConfig.originalsFromThisConfig, currentConfig.originals)
if (changeMap.nonEmpty || deletedKeySet.nonEmpty) {
try {
val customConfigs = new util.HashMap[String, Object](newConfig.originalsFromThisConfig) // non-Kafka configs
newConfig.valuesFromThisConfig.keySet.forEach(k => customConfigs.remove(k))
reconfigurables.forEach {
case listenerReconfigurable: ListenerReconfigurable =>
processListenerReconfigurable(listenerReconfigurable, newConfig, customConfigs, validateOnly, reloadOnly = false)
case reconfigurable =>
if (needsReconfiguration(reconfigurable.reconfigurableConfigs, changeMap.keySet, deletedKeySet))
processReconfigurable(reconfigurable, changeMap.keySet, newConfig.valuesFromThisConfig, customConfigs, validateOnly)
}
// BrokerReconfigurable updates are processed after config is updated. Only do the validation here.
val brokerReconfigurablesToUpdate = mutable.Buffer[BrokerReconfigurable]()
brokerReconfigurables.forEach { reconfigurable =>
if (needsReconfiguration(reconfigurable.reconfigurableConfigs.asJava, changeMap.keySet, deletedKeySet)) {
reconfigurable.validateReconfiguration(newConfig)
if (!validateOnly)
brokerReconfigurablesToUpdate += reconfigurable
}
}
(newConfig, brokerReconfigurablesToUpdate.toList)
} catch {
case e: Exception =>
if (!validateOnly)
error(s"Failed to update broker configuration with configs : " +
s"${ConfigUtils.configMapToRedactedString(newConfig.originalsFromThisConfig, KafkaConfig.configDef)}", e)
throw new ConfigException("Invalid dynamic configuration", e)
}
}
else
(currentConfig, List.empty)
}
private def needsReconfiguration(reconfigurableConfigs: util.Set[String], updatedKeys: Set[String], deletedKeys: Set[String]): Boolean = {
reconfigurableConfigs.asScala.intersect(updatedKeys).nonEmpty ||
reconfigurableConfigs.asScala.intersect(deletedKeys).nonEmpty
}
private def processListenerReconfigurable(listenerReconfigurable: ListenerReconfigurable,
newConfig: KafkaConfig,
customConfigs: util.Map[String, Object],
validateOnly: Boolean,
reloadOnly: Boolean): Unit = {
val listenerName = listenerReconfigurable.listenerName
val oldValues = currentConfig.valuesWithPrefixOverride(listenerName.configPrefix)
val newValues = newConfig.valuesFromThisConfigWithPrefixOverride(listenerName.configPrefix)
val (changeMap, deletedKeys) = updatedConfigs(newValues, oldValues)
val updatedKeys = changeMap.keySet
val configsChanged = needsReconfiguration(listenerReconfigurable.reconfigurableConfigs, updatedKeys, deletedKeys)
// If `reloadOnly`, reconfigure only if the configs haven't changed (forcing a file reload); otherwise reconfigure only if they have changed
if (reloadOnly != configsChanged)
processReconfigurable(listenerReconfigurable, updatedKeys, newValues, customConfigs, validateOnly)
}
private def processReconfigurable(reconfigurable: Reconfigurable,
updatedConfigNames: Set[String],
allNewConfigs: util.Map[String, _],
newCustomConfigs: util.Map[String, Object],
validateOnly: Boolean): Unit = {
val newConfigs = new util.HashMap[String, Object]
allNewConfigs.forEach((k, v) => newConfigs.put(k, v.asInstanceOf[AnyRef]))
newConfigs.putAll(newCustomConfigs)
try {
reconfigurable.validateReconfiguration(newConfigs)
} catch {
case e: ConfigException => throw e
case _: Exception =>
throw new ConfigException(s"Validation of dynamic config update of $updatedConfigNames failed with class ${reconfigurable.getClass}")
}
if (!validateOnly) {
info(s"Reconfiguring $reconfigurable, updated configs: $updatedConfigNames " +
s"custom configs: ${ConfigUtils.configMapToRedactedString(newCustomConfigs, KafkaConfig.configDef)}")
reconfigurable.reconfigure(newConfigs)
}
}
}
trait BrokerReconfigurable {
def reconfigurableConfigs: Set[String]
def validateReconfiguration(newConfig: KafkaConfig): Unit
def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit
}
object DynamicLogConfig {
// Exclude message.format.version for now since we need to check that the version
// is supported on all brokers in the cluster.
@nowarn("cat=deprecation")
val ExcludedConfigs = Set(KafkaConfig.LogMessageFormatVersionProp)
val ReconfigurableConfigs = ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.values.asScala.toSet -- ExcludedConfigs
val KafkaConfigToLogConfigName = ServerTopicConfigSynonyms.TOPIC_CONFIG_SYNONYMS.asScala.map { case (k, v) => (v, k) }
}
class DynamicLogConfig(logManager: LogManager, server: KafkaBroker) extends BrokerReconfigurable with Logging {
override def reconfigurableConfigs: Set[String] = {
DynamicLogConfig.ReconfigurableConfigs
}
override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
// For updates of topic config overrides, only config names and types are validated, and
// names and types have already been validated at this point. For consistency with topic
// config validation, no additional validation is performed here.
def validateLogLocalRetentionMs(): Unit = {
val logRetentionMs = newConfig.logRetentionTimeMillis
val logLocalRetentionMs: java.lang.Long = newConfig.logLocalRetentionMs
if (logRetentionMs != -1L && logLocalRetentionMs != -2L) {
if (logLocalRetentionMs == -1L) {
throw new ConfigException(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, logLocalRetentionMs,
s"Value must not be -1 as ${KafkaConfig.LogRetentionTimeMillisProp} value is set as $logRetentionMs.")
}
if (logLocalRetentionMs > logRetentionMs) {
throw new ConfigException(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_MS_PROP, logLocalRetentionMs,
s"Value must not be more than ${KafkaConfig.LogRetentionTimeMillisProp} property value: $logRetentionMs")
}
}
}
def validateLogLocalRetentionBytes(): Unit = {
val logRetentionBytes = newConfig.logRetentionBytes
val logLocalRetentionBytes: java.lang.Long = newConfig.logLocalRetentionBytes
if (logRetentionBytes > -1 && logLocalRetentionBytes != -2) {
if (logLocalRetentionBytes == -1) {
throw new ConfigException(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP, logLocalRetentionBytes,
s"Value must not be -1 as ${KafkaConfig.LogRetentionBytesProp} value is set as $logRetentionBytes.")
}
if (logLocalRetentionBytes > logRetentionBytes) {
throw new ConfigException(RemoteLogManagerConfig.LOG_LOCAL_RETENTION_BYTES_PROP, logLocalRetentionBytes,
s"Value must not be more than ${KafkaConfig.LogRetentionBytesProp} property value: $logRetentionBytes")
}
}
}
validateLogLocalRetentionMs()
validateLogLocalRetentionBytes()
}
private def updateLogsConfig(newBrokerDefaults: Map[String, Object]): Unit = {
logManager.brokerConfigUpdated()
logManager.allLogs.foreach { log =>
val props = mutable.Map.empty[Any, Any]
props ++= newBrokerDefaults
props ++= log.config.originals.asScala.filter { case (k, _) =>
log.config.overriddenConfigs.contains(k)
}
val logConfig = new LogConfig(props.asJava, log.config.overriddenConfigs)
log.updateConfig(logConfig)
}
}
override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
val originalLogConfig = logManager.currentDefaultConfig
val originalUncleanLeaderElectionEnable = originalLogConfig.uncleanLeaderElectionEnable
val newBrokerDefaults = new util.HashMap[String, Object](originalLogConfig.originals)
newConfig.valuesFromThisConfig.forEach { (k, v) =>
if (DynamicLogConfig.ReconfigurableConfigs.contains(k)) {
DynamicLogConfig.KafkaConfigToLogConfigName.get(k).foreach { configName =>
if (v == null)
newBrokerDefaults.remove(configName)
else
newBrokerDefaults.put(configName, v.asInstanceOf[AnyRef])
}
}
}
logManager.reconfigureDefaultLogConfig(new LogConfig(newBrokerDefaults))
updateLogsConfig(newBrokerDefaults.asScala)
if (logManager.currentDefaultConfig.uncleanLeaderElectionEnable && !originalUncleanLeaderElectionEnable) {
server match {
case kafkaServer: KafkaServer => kafkaServer.kafkaController.enableDefaultUncleanLeaderElection()
case _ =>
}
}
}
}
object DynamicThreadPool {
val ReconfigurableConfigs = Set(
KafkaConfig.NumIoThreadsProp,
KafkaConfig.NumReplicaFetchersProp,
KafkaConfig.NumRecoveryThreadsPerDataDirProp,
KafkaConfig.BackgroundThreadsProp)
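// Thread pool sizes may only be changed dynamically within a bounded range to avoid destabilizing
// the broker: the new value must be positive, at least half and at most double the current value.
// For example, with num.io.threads=8, dynamic updates to values in [4, 16] pass validation.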
def validateReconfiguration(currentConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
newConfig.values.forEach { (k, v) =>
if (ReconfigurableConfigs.contains(k)) {
val newValue = v.asInstanceOf[Int]
val oldValue = getValue(currentConfig, k)
if (newValue != oldValue) {
val errorMsg = s"Dynamic thread count update validation failed for $k=$v"
if (newValue <= 0)
throw new ConfigException(s"$errorMsg, value should be at least 1")
if (newValue < oldValue / 2)
throw new ConfigException(s"$errorMsg, value should be at least half the current value $oldValue")
if (newValue > oldValue * 2)
throw new ConfigException(s"$errorMsg, value should not be greater than double the current value $oldValue")
}
}
}
}
def getValue(config: KafkaConfig, name: String): Int = {
name match {
case KafkaConfig.NumIoThreadsProp => config.numIoThreads
case KafkaConfig.NumReplicaFetchersProp => config.numReplicaFetchers
case KafkaConfig.NumRecoveryThreadsPerDataDirProp => config.numRecoveryThreadsPerDataDir
case KafkaConfig.BackgroundThreadsProp => config.backgroundThreads
case n => throw new IllegalStateException(s"Unexpected config $n")
}
}
}
class ControllerDynamicThreadPool(controller: ControllerServer) extends BrokerReconfigurable {
override def reconfigurableConfigs: Set[String] = {
Set(KafkaConfig.NumIoThreadsProp)
}
override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
DynamicThreadPool.validateReconfiguration(controller.config, newConfig) // common validation
}
override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
if (newConfig.numIoThreads != oldConfig.numIoThreads)
controller.controllerApisHandlerPool.resizeThreadPool(newConfig.numIoThreads)
}
}
class BrokerDynamicThreadPool(server: KafkaBroker) extends BrokerReconfigurable {
override def reconfigurableConfigs: Set[String] = {
DynamicThreadPool.ReconfigurableConfigs
}
override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
DynamicThreadPool.validateReconfiguration(server.config, newConfig)
}
override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
if (newConfig.numIoThreads != oldConfig.numIoThreads)
server.dataPlaneRequestHandlerPool.resizeThreadPool(newConfig.numIoThreads)
if (newConfig.numReplicaFetchers != oldConfig.numReplicaFetchers)
server.replicaManager.resizeFetcherThreadPool(newConfig.numReplicaFetchers)
if (newConfig.numRecoveryThreadsPerDataDir != oldConfig.numRecoveryThreadsPerDataDir)
server.logManager.resizeRecoveryThreadPool(newConfig.numRecoveryThreadsPerDataDir)
if (newConfig.backgroundThreads != oldConfig.backgroundThreads)
server.kafkaScheduler.resizeThreadPool(newConfig.backgroundThreads)
}
}
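/**
* Dynamically creates and removes metrics reporters when <tt>metric.reporters</tt> is updated.
* An illustrative cluster-wide update via kafka-configs.sh (the reporter class is a placeholder and
* must be on the broker classpath with a default constructor):
* <pre>
* bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-default \
*   --alter --add-config metric.reporters=com.example.MyReporter
* </pre>
*/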
class DynamicMetricsReporters(brokerId: Int, config: KafkaConfig, metrics: Metrics, clusterId: String) extends Reconfigurable {
private val reporterState = new DynamicMetricReporterState(brokerId, config, metrics, clusterId)
private[server] val currentReporters = reporterState.currentReporters
private val dynamicConfig = reporterState.dynamicConfig
private def metricsReporterClasses(configs: util.Map[String, _]): mutable.Buffer[String] =
reporterState.metricsReporterClasses(configs)
private def createReporters(reporterClasses: util.List[String], updatedConfigs: util.Map[String, _]): Unit =
reporterState.createReporters(reporterClasses, updatedConfigs)
private def removeReporter(className: String): Unit = reporterState.removeReporter(className)
override def configure(configs: util.Map[String, _]): Unit = {}
override def reconfigurableConfigs(): util.Set[String] = {
val configs = new util.HashSet[String]()
configs.add(KafkaConfig.MetricReporterClassesProp)
currentReporters.values.foreach {
case reporter: Reconfigurable => configs.addAll(reporter.reconfigurableConfigs)
case _ =>
}
configs
}
override def validateReconfiguration(configs: util.Map[String, _]): Unit = {
val updatedMetricsReporters = metricsReporterClasses(configs)
// Ensure all the reporter classes can be loaded and have a default constructor
updatedMetricsReporters.foreach { className =>
val clazz = Utils.loadClass(className, classOf[MetricsReporter])
clazz.getConstructor()
}
// Validate the new configuration using every reconfigurable reporter instance that is not being deleted
currentReporters.values.foreach {
case reporter: Reconfigurable =>
if (updatedMetricsReporters.contains(reporter.getClass.getName))
reporter.validateReconfiguration(configs)
case _ =>
}
}
override def reconfigure(configs: util.Map[String, _]): Unit = {
val updatedMetricsReporters = metricsReporterClasses(configs)
val deleted = currentReporters.keySet.toSet -- updatedMetricsReporters
deleted.foreach(removeReporter)
currentReporters.values.foreach {
case reporter: Reconfigurable => dynamicConfig.maybeReconfigure(reporter, dynamicConfig.currentKafkaConfig, configs)
case _ =>
}
val added = updatedMetricsReporters.filterNot(currentReporters.keySet)
createReporters(added.asJava, configs)
}
}
class DynamicMetricReporterState(brokerId: Int, config: KafkaConfig, metrics: Metrics, clusterId: String) {
private[server] val dynamicConfig = config.dynamicConfig
private val propsOverride = Map[String, AnyRef](KafkaConfig.BrokerIdProp -> brokerId.toString)
private[server] val currentReporters = mutable.Map[String, MetricsReporter]()
createReporters(config, clusterId, metricsReporterClasses(dynamicConfig.currentKafkaConfig.values()).asJava,
Collections.emptyMap[String, Object])
private[server] def createReporters(reporterClasses: util.List[String],
updatedConfigs: util.Map[String, _]): Unit = {
createReporters(config, clusterId, reporterClasses, updatedConfigs)
}
private def createReporters(config: KafkaConfig,
clusterId: String,
reporterClasses: util.List[String],
updatedConfigs: util.Map[String, _]): Unit = {
val props = new util.HashMap[String, AnyRef]
updatedConfigs.forEach((k, v) => props.put(k, v.asInstanceOf[AnyRef]))
propsOverride.forKeyValue((k, v) => props.put(k, v))
val reporters = dynamicConfig.currentKafkaConfig.getConfiguredInstances(reporterClasses, classOf[MetricsReporter], props)
// Call notifyMetricsReporters first to satisfy the contract for MetricsReporter.contextChange,
// which provides that MetricsReporter.contextChange must be called before the first call to MetricsReporter.init.
// The first call to MetricsReporter.init is done when we call metrics.addReporter below.
KafkaBroker.notifyMetricsReporters(clusterId, config, reporters.asScala)
reporters.forEach { reporter =>
metrics.addReporter(reporter)
currentReporters += reporter.getClass.getName -> reporter
val clientTelemetryReceiver = reporter match {
case telemetry: ClientTelemetry => telemetry.clientReceiver()
case _ => null
}
if (clientTelemetryReceiver != null) {
dynamicConfig.clientMetricsReceiverPlugin match {
case Some(receiverPlugin) =>
receiverPlugin.add(clientTelemetryReceiver)
case None =>
// Do nothing
}
}
}
KafkaBroker.notifyClusterListeners(clusterId, reporters.asScala)
}
private[server] def removeReporter(className: String): Unit = {
currentReporters.remove(className).foreach(metrics.removeReporter)
}
@nowarn("cat=deprecation")
private[server] def metricsReporterClasses(configs: util.Map[String, _]): mutable.Buffer[String] = {
val reporters = mutable.Buffer[String]()
reporters ++= configs.get(KafkaConfig.MetricReporterClassesProp).asInstanceOf[util.List[String]].asScala
if (configs.get(KafkaConfig.AutoIncludeJmxReporterProp).asInstanceOf[Boolean] &&
!reporters.contains(classOf[JmxReporter].getName)) {
reporters += classOf[JmxReporter].getName
}
reporters
}
}
object DynamicListenerConfig {
/**
* The set of configurations which the DynamicListenerConfig object listens for. Many of
* these are also monitored by other objects such as ChannelBuilders and SocketServers.
*/
val ReconfigurableConfigs = Set(
// Listener configs
KafkaConfig.AdvertisedListenersProp,
KafkaConfig.ListenersProp,
KafkaConfig.ListenerSecurityProtocolMapProp,
// SSL configs
KafkaSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG,
KafkaSecurityConfigs.SSL_PROTOCOL_CONFIG,
KafkaSecurityConfigs.SSL_PROVIDER_CONFIG,
KafkaSecurityConfigs.SSL_CIPHER_SUITES_CONFIG,
KafkaSecurityConfigs.SSL_ENABLED_PROTOCOLS_CONFIG,
KafkaSecurityConfigs.SSL_KEYSTORE_TYPE_CONFIG,
KafkaSecurityConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
KafkaSecurityConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
KafkaSecurityConfigs.SSL_KEY_PASSWORD_CONFIG,
KafkaSecurityConfigs.SSL_TRUSTSTORE_TYPE_CONFIG,
KafkaSecurityConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG,
KafkaSecurityConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG,
KafkaSecurityConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG,
KafkaSecurityConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG,
KafkaSecurityConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG,
KafkaSecurityConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG,
KafkaSecurityConfigs.SSL_CLIENT_AUTH_CONFIG,
KafkaSecurityConfigs.SSL_ENGINE_FACTORY_CLASS_CONFIG,
// SASL configs
KafkaSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG,
KafkaSecurityConfigs.SASL_JAAS_CONFIG,
KafkaSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG,
KafkaSecurityConfigs.SASL_KERBEROS_SERVICE_NAME_CONFIG,
KafkaSecurityConfigs.SASL_KERBEROS_KINIT_CMD_CONFIG,
KafkaSecurityConfigs.SASL_KERBEROS_TICKET_RENEW_WINDOW_FACTOR_CONFIG,
KafkaSecurityConfigs.SASL_KERBEROS_TICKET_RENEW_JITTER_CONFIG,
KafkaSecurityConfigs.SASL_KERBEROS_MIN_TIME_BEFORE_RELOGIN_CONFIG,
KafkaSecurityConfigs.SASL_KERBEROS_PRINCIPAL_TO_LOCAL_RULES_CONFIG,
KafkaSecurityConfigs.SASL_LOGIN_REFRESH_WINDOW_FACTOR_CONFIG,
KafkaSecurityConfigs.SASL_LOGIN_REFRESH_WINDOW_JITTER_CONFIG,
KafkaSecurityConfigs.SASL_LOGIN_REFRESH_MIN_PERIOD_SECONDS_CONFIG,
KafkaSecurityConfigs.SASL_LOGIN_REFRESH_BUFFER_SECONDS_CONFIG,
// Connection limit configs
KafkaConfig.MaxConnectionsProp,
KafkaConfig.MaxConnectionCreationRateProp,
// Network threads
KafkaConfig.NumNetworkThreadsProp
)
}
class DynamicClientQuotaCallback(
quotaManagers: QuotaFactory.QuotaManagers,
serverConfig: KafkaConfig
) extends Reconfigurable {
override def configure(configs: util.Map[String, _]): Unit = {}
override def reconfigurableConfigs(): util.Set[String] = {
val configs = new util.HashSet[String]()
quotaManagers.clientQuotaCallback.foreach {
case callback: Reconfigurable => configs.addAll(callback.reconfigurableConfigs)
case _ =>
}
configs
}
override def validateReconfiguration(configs: util.Map[String, _]): Unit = {
quotaManagers.clientQuotaCallback.foreach {
case callback: Reconfigurable => callback.validateReconfiguration(configs)
case _ =>
}
}
override def reconfigure(configs: util.Map[String, _]): Unit = {
quotaManagers.clientQuotaCallback.foreach {
case callback: Reconfigurable =>
serverConfig.dynamicConfig.maybeReconfigure(callback, serverConfig.dynamicConfig.currentKafkaConfig, configs)
case _ =>
}
}
}
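/**
* Handles dynamic updates to listener configs, including adding and removing listeners. A sketch of a
* per-broker update adding a listener via kafka-configs.sh (host, port and listener name are
* placeholders; the bracket syntax escapes commas in list values, and the full update typically also
* requires <tt>listener.security.protocol.map</tt> to cover the new listener):
* <pre>
* bin/kafka-configs.sh --bootstrap-server localhost:9092 --entity-type brokers --entity-name 0 \
*   --alter --add-config "listeners=[PLAINTEXT://host:9092,INTERNAL://host:9093]"
* </pre>
* Note that advertised listener registrations cannot currently be altered dynamically in KRaft mode
* (see `verifyListenerRegistrationAlterationSupported`).
*/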
class DynamicListenerConfig(server: KafkaBroker) extends BrokerReconfigurable with Logging {
override def reconfigurableConfigs: Set[String] = {
DynamicListenerConfig.ReconfigurableConfigs
}
private def listenerRegistrationsAltered(
oldAdvertisedListeners: Map[ListenerName, EndPoint],
newAdvertisedListeners: Map[ListenerName, EndPoint]
): Boolean = {
if (oldAdvertisedListeners.size != newAdvertisedListeners.size) return true
oldAdvertisedListeners.forKeyValue {
case (oldListenerName, oldEndpoint) =>
newAdvertisedListeners.get(oldListenerName) match {
case None => return true
case Some(newEndpoint) => if (!newEndpoint.equals(oldEndpoint)) {
return true
}
}
}
false
}
private def verifyListenerRegistrationAlterationSupported(): Unit = {
if (!server.config.requiresZookeeper) {
throw new ConfigException("Advertised listeners cannot be altered when using a " +
"Raft-based metadata quorum.")
}
}
def validateReconfiguration(newConfig: KafkaConfig): Unit = {
val oldConfig = server.config
val newListeners = listenersToMap(newConfig.listeners)
val newAdvertisedListeners = listenersToMap(newConfig.effectiveAdvertisedListeners)
val oldListeners = listenersToMap(oldConfig.listeners)
if (!newAdvertisedListeners.keySet.subsetOf(newListeners.keySet))
throw new ConfigException(s"Advertised listeners '$newAdvertisedListeners' must be a subset of listeners '$newListeners'")
if (!newListeners.keySet.subsetOf(newConfig.effectiveListenerSecurityProtocolMap.keySet))
throw new ConfigException(s"Listeners '$newListeners' must be subset of listener map '${newConfig.effectiveListenerSecurityProtocolMap}'")
newListeners.keySet.intersect(oldListeners.keySet).foreach { listenerName =>
def immutableListenerConfigs(kafkaConfig: KafkaConfig, prefix: String): Map[String, AnyRef] = {
kafkaConfig.originalsWithPrefix(prefix, true).asScala.filter { case (key, _) =>
// skip the reconfigurable configs
!DynamicSecurityConfigs.contains(key) && !SocketServer.ListenerReconfigurableConfigs.contains(key) && !DataPlaneAcceptor.ListenerReconfigurableConfigs.contains(key)
}
}
if (immutableListenerConfigs(newConfig, listenerName.configPrefix) != immutableListenerConfigs(oldConfig, listenerName.configPrefix))
throw new ConfigException(s"Configs cannot be updated dynamically for existing listener $listenerName, " +
"restart broker or create a new listener for update")
if (oldConfig.effectiveListenerSecurityProtocolMap(listenerName) != newConfig.effectiveListenerSecurityProtocolMap(listenerName))
throw new ConfigException(s"Security protocol cannot be updated for existing listener $listenerName")
}
if (!newAdvertisedListeners.contains(newConfig.interBrokerListenerName))
throw new ConfigException(s"Advertised listener must be specified for inter-broker listener ${newConfig.interBrokerListenerName}")
// Currently, we do not support adding or removing listeners when in KRaft mode.
// However, we support changing other listener configurations (max connections, etc.)
if (listenerRegistrationsAltered(listenersToMap(oldConfig.effectiveAdvertisedListeners),
listenersToMap(newConfig.effectiveAdvertisedListeners))) {
verifyListenerRegistrationAlterationSupported()
}
}
def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
val newListeners = newConfig.listeners
val newListenerMap = listenersToMap(newListeners)
val oldListeners = oldConfig.listeners
val oldListenerMap = listenersToMap(oldListeners)
val listenersRemoved = oldListeners.filterNot(e => newListenerMap.contains(e.listenerName))
val listenersAdded = newListeners.filterNot(e => oldListenerMap.contains(e.listenerName))
if (listenersRemoved.nonEmpty || listenersAdded.nonEmpty) {
LoginManager.closeAll() // Clear SASL login cache to force re-login
if (listenersRemoved.nonEmpty) server.socketServer.removeListeners(listenersRemoved)
if (listenersAdded.nonEmpty) server.socketServer.addListeners(listenersAdded)
}
if (listenerRegistrationsAltered(listenersToMap(oldConfig.effectiveAdvertisedListeners),
listenersToMap(newConfig.effectiveAdvertisedListeners))) {
verifyListenerRegistrationAlterationSupported()
server match {
case kafkaServer: KafkaServer => kafkaServer.kafkaController.updateBrokerInfo(kafkaServer.createBrokerInfo)
case _ => throw new RuntimeException("Unable to handle non-kafkaServer")
}
}
}
private def listenersToMap(listeners: Seq[EndPoint]): Map[ListenerName, EndPoint] =
listeners.map(e => (e.listenerName, e)).toMap
}
class DynamicProducerStateManagerConfig(val producerStateManagerConfig: ProducerStateManagerConfig) extends BrokerReconfigurable with Logging {
def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
if (producerStateManagerConfig.producerIdExpirationMs != newConfig.producerIdExpirationMs) {
info(s"Reconfigure ${TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_CONFIG} from ${producerStateManagerConfig.producerIdExpirationMs} to ${newConfig.producerIdExpirationMs}")
producerStateManagerConfig.setProducerIdExpirationMs(newConfig.producerIdExpirationMs)
}
if (producerStateManagerConfig.transactionVerificationEnabled != newConfig.transactionPartitionVerificationEnable) {
info(s"Reconfigure ${TransactionLogConfigs.TRANSACTION_PARTITION_VERIFICATION_ENABLE_CONFIG} from ${producerStateManagerConfig.transactionVerificationEnabled} to ${newConfig.transactionPartitionVerificationEnable}")
producerStateManagerConfig.setTransactionVerificationEnabled(newConfig.transactionPartitionVerificationEnable)
}
}
def validateReconfiguration(newConfig: KafkaConfig): Unit = {
if (newConfig.producerIdExpirationMs < 0)
throw new ConfigException(s"${TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_CONFIG} cannot be less than 0, current value is ${producerStateManagerConfig.producerIdExpirationMs}, and new value is ${newConfig.producerIdExpirationMs}")
}
override def reconfigurableConfigs: Set[String] = DynamicProducerStateManagerConfig
}
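/**
* Handles dynamic updates to remote log manager configs. Currently only
* <tt>remote.log.index.file.cache.total.size.bytes</tt> is reconfigurable; when its value changes,
* the remote log index cache is resized in place.
*/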
class DynamicRemoteLogConfig(server: KafkaBroker) extends BrokerReconfigurable with Logging {
override def reconfigurableConfigs: Set[String] = {
DynamicRemoteLogConfig.ReconfigurableConfigs
}
override def validateReconfiguration(newConfig: KafkaConfig): Unit = {
newConfig.values.forEach { (k, v) =>
if (reconfigurableConfigs.contains(k)) {
if (k.equals(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)) {
val newValue = v.asInstanceOf[Long]
val oldValue = getValue(server.config, k)
if (newValue != oldValue && newValue <= 0) {
val errorMsg = s"Dynamic remote log manager config update validation failed for $k=$v"
throw new ConfigException(s"$errorMsg, value should be at least 1")
}
}
}
}
}
override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
val oldValue = oldConfig.getLong(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)
val newValue = newConfig.getLong(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)
if (oldValue != newValue) {
val remoteLogManager = server.remoteLogManagerOpt
if (remoteLogManager.nonEmpty) {
remoteLogManager.get.resizeCacheSize(newValue)
info(s"Dynamic remote log manager config: ${RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP} updated, " +
s"old value: $oldValue, new value: $newValue")
}
}
}
private def getValue(config: KafkaConfig, name: String): Long = {
name match {
case RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP =>
config.getLong(RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP)
case n => throw new IllegalStateException(s"Unexpected dynamic remote log manager config $n")
}
}
}
object DynamicRemoteLogConfig {
val ReconfigurableConfigs = Set(
RemoteLogManagerConfig.REMOTE_LOG_INDEX_FILE_CACHE_TOTAL_SIZE_BYTES_PROP
)
}