blob: e68f9218078693d556ef02be6207b5ef6ee77fab [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import java.util.Properties
import kafka.log.LogConfig
import kafka.security.CredentialProvider
import org.apache.kafka.common.config.ConfigDef
import org.apache.kafka.common.config.ConfigDef.Importance._
import org.apache.kafka.common.config.ConfigDef.Range._
import org.apache.kafka.common.config.ConfigDef.Type._
import scala.collection.JavaConverters._
/**
* Class used to hold dynamic configs. These are configs which have no physical manifestation in the server.properties
* and can only be set dynamically.
*/
object DynamicConfig {
object Broker {
//Properties
val LeaderReplicationThrottledRateProp = "leader.replication.throttled.rate"
val FollowerReplicationThrottledRateProp = "follower.replication.throttled.rate"
//Defaults
val DefaultReplicationThrottledRate = ReplicationQuotaManagerConfig.QuotaBytesPerSecondDefault
//Documentation
val LeaderReplicationThrottledRateDoc = "A long representing the upper bound (bytes/sec) on replication traffic for leaders enumerated in the " +
s"property ${LogConfig.LeaderReplicationThrottledReplicasProp} (for each topic). This property can be only set dynamically. It is suggested that the " +
s"limit be kept above 1MB/s for accurate behaviour."
val FollowerReplicationThrottledRateDoc = "A long representing the upper bound (bytes/sec) on replication traffic for followers enumerated in the " +
s"property ${LogConfig.FollowerReplicationThrottledReplicasProp} (for each topic). This property can be only set dynamically. It is suggested that the " +
s"limit be kept above 1MB/s for accurate behaviour."
//Definitions
private val brokerConfigDef = new ConfigDef()
//round minimum value down, to make it easier for users.
.define(LeaderReplicationThrottledRateProp, LONG, DefaultReplicationThrottledRate, atLeast(0), MEDIUM, LeaderReplicationThrottledRateDoc)
.define(FollowerReplicationThrottledRateProp, LONG, DefaultReplicationThrottledRate, atLeast(0), MEDIUM, FollowerReplicationThrottledRateDoc)
def names = brokerConfigDef.names
def validate(props: Properties) = DynamicConfig.validate(brokerConfigDef, props)
}
object Client {
//Properties
val ProducerByteRateOverrideProp = "producer_byte_rate"
val ConsumerByteRateOverrideProp = "consumer_byte_rate"
//Defaults
val DefaultProducerOverride = ClientQuotaManagerConfig.QuotaBytesPerSecondDefault
val DefaultConsumerOverride = ClientQuotaManagerConfig.QuotaBytesPerSecondDefault
//Documentation
val ProducerOverrideDoc = "A rate representing the upper bound (bytes/sec) for producer traffic."
val ConsumerOverrideDoc = "A rate representing the upper bound (bytes/sec) for consumer traffic."
//Definitions
private val clientConfigs = new ConfigDef()
.define(ProducerByteRateOverrideProp, LONG, DefaultProducerOverride, MEDIUM, ProducerOverrideDoc)
.define(ConsumerByteRateOverrideProp, LONG, DefaultConsumerOverride, MEDIUM, ConsumerOverrideDoc)
def names = clientConfigs.names
def validate(props: Properties) = DynamicConfig.validate(clientConfigs, props)
}
object User {
//Definitions
private val userConfigs = CredentialProvider.userCredentialConfigs
.define(Client.ProducerByteRateOverrideProp, LONG, Client.DefaultProducerOverride, MEDIUM, Client.ProducerOverrideDoc)
.define(Client.ConsumerByteRateOverrideProp, LONG, Client.DefaultConsumerOverride, MEDIUM, Client.ConsumerOverrideDoc)
def names = userConfigs.names
def validate(props: Properties) = DynamicConfig.validate(userConfigs, props)
}
private def validate(configDef: ConfigDef, props: Properties) = {
//Validate Names
val names = configDef.names()
props.keys.asScala.foreach { name =>
require(names.contains(name), s"Unknown Dynamic Configuration '$name'.")
}
//ValidateValues
configDef.parse(props)
}
}