blob: a9df9294f0aed3f528975c300fb8cdee4418a357 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
*    http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
package kafka.server
import java.util.Properties
import org.apache.kafka.common.protocol.ApiKeys
import org.junit.Assert._
import org.apache.kafka.common.metrics.Quota
import org.easymock.{Capture, EasyMock}
import org.junit.Test
import kafka.integration.KafkaServerTestHarness
import kafka.utils._
import kafka.common._
import kafka.log.LogConfig
import kafka.admin.{AdminOperationException, AdminUtils}
import scala.collection.Map
class DynamicConfigChangeTest extends KafkaServerTestHarness {
// Builds the list of broker configs used by this test: exactly one KafkaConfig,
// derived from the properties returned by TestUtils.createBrokerConfig(0, zkConnect).
// NOTE(review): the first argument (0) is presumably the broker id — confirm against TestUtils.
def generateConfigs() = {
  val brokerProps = TestUtils.createBrokerConfig(0, zkConnect)
  val brokerConfig = KafkaConfig.fromProps(brokerProps)
  List(brokerConfig)
}
// Test: a topic-level config change applied through AdminUtils.changeTopicConfig
// eventually shows up in the config of the corresponding log on the first server.
// NOTE(review): the closing braces of this method and of its retry blocks are not
// visible in this view of the file.
def testConfigChange() {
// NOTE(review): only the message of this assertTrue is visible here; its condition
// argument is missing from this view.
assertTrue("Should contain a ConfigHandler for topics",
// Initial and updated values for the flush-messages setting, typed as java.lang.Long.
val oldVal: java.lang.Long = 100000L
val newVal: java.lang.Long = 200000L
val tp = TopicAndPartition("test", 0)
// Create the topic with FlushMessagesProp set to the initial value.
val logProps = new Properties()
logProps.put(LogConfig.FlushMessagesProp, oldVal.toString)
AdminUtils.createTopic(zkUtils, tp.topic, 1, 1, logProps)
// Retried check that the log for tp reports the initial value as its flushInterval.
// NOTE(review): 10000 is presumably a maximum wait in milliseconds — confirm against TestUtils.retry.
TestUtils.retry(10000) {
val logOpt = this.servers.head.logManager.getLog(tp)
assertEquals(oldVal, logOpt.get.config.flushInterval)
// Overwrite the same property with the new value and push the change for the topic.
logProps.put(LogConfig.FlushMessagesProp, newVal.toString)
AdminUtils.changeTopicConfig(zkUtils, tp.topic, logProps)
// Retried check that the log's flushInterval now equals the new value.
TestUtils.retry(10000) {
assertEquals(newVal, this.servers.head.logManager.getLog(tp).get.config.flushInterval)
// Test: per-client-id producer/consumer quota overrides applied through
// AdminUtils.changeClientIdConfig are compared against the values read from the
// first server's quota managers, and an empty Properties update is expected to
// bring the quotas back to the configured defaults.
// NOTE(review): several statements in this method are truncated in this view
// (the assertTrue condition and every quotaManagers.get(...) call), and the
// closing braces are not visible.
def testClientQuotaConfigChange() {
// NOTE(review): the message mentions "topics" although this test deals with
// client-id configs; the condition argument is not visible here, so it cannot be
// confirmed which handler is actually being checked.
assertTrue("Should contain a ConfigHandler for topics",
val clientId = "testClient"
// Override both the producer and the consumer quota for this client id.
val props = new Properties()
props.put(ClientConfigOverride.ProducerOverride, "1000")
props.put(ClientConfigOverride.ConsumerOverride, "2000")
AdminUtils.changeClientIdConfig(zkUtils, clientId, props)
// Quota managers of the first server, keyed by a Short.
// NOTE(review): the key is presumably an API key id (ApiKeys is imported) — confirm.
val quotaManagers: Map[Short, ClientQuotaManager] = servers.head.apis.quotaManagers
// Retried check that the overridden quotas equal upper bounds of 1000 and 2000.
TestUtils.retry(10000) {
// NOTE(review): the arguments of these two lookups are cut off in this view.
val overrideProducerQuota = quotaManagers.get(
val overrideConsumerQuota = quotaManagers.get(
assertEquals(s"ClientId $clientId must have overridden producer quota of 1000",
Quota.upperBound(1000), overrideProducerQuota)
assertEquals(s"ClientId $clientId must have overridden consumer quota of 2000",
Quota.upperBound(2000), overrideConsumerQuota)
// Defaults taken from the server config, as Double values.
val defaultProducerQuota = servers.head.apis.config.producerQuotaBytesPerSecondDefault.doubleValue
val defaultConsumerQuota = servers.head.apis.config.consumerQuotaBytesPerSecondDefault.doubleValue
// Guard: the reset check below is only meaningful if the defaults differ from the overrides.
// NOTE(review): the expected values are Int literals while the actual values are
// Doubles; if these calls resolve to the (String, Object, Object) overload of
// assertNotEquals, a boxed Integer never equals a boxed Double and the checks
// cannot fail — confirm and consider using Double literals with a delta.
assertNotEquals("defaultProducerQuota should be different from 1000", 1000, defaultProducerQuota)
assertNotEquals("defaultConsumerQuota should be different from 2000", 2000, defaultConsumerQuota)
// Apply an empty Properties for the client id; the assertions below expect the defaults afterwards.
AdminUtils.changeClientIdConfig(zkUtils, clientId, new Properties())
// Retried check that both quotas equal upper bounds built from the default values.
TestUtils.retry(10000) {
// NOTE(review): the arguments of these two lookups are cut off in this view.
val producerQuota = quotaManagers.get(
val consumerQuota = quotaManagers.get(
assertEquals(s"ClientId $clientId must have reset producer quota to " + defaultProducerQuota,
Quota.upperBound(defaultProducerQuota), producerQuota)
assertEquals(s"ClientId $clientId must have reset consumer quota to " + defaultConsumerQuota,
Quota.upperBound(defaultConsumerQuota), consumerQuota)
// Test: changing the config of a topic name obtained from TestUtils.tempTopic
// (never created in this method) must raise AdminOperationException.
// NOTE(review): the closing braces of the catch block and of the method are not
// visible in this view.
def testConfigChangeOnNonExistingTopic() {
val topic = TestUtils.tempTopic
try {
val logProps = new Properties()
// The value is ascribed to java.lang.Integer so it is stored in Properties as a boxed object.
logProps.put(LogConfig.FlushMessagesProp, 10000: java.lang.Integer)
AdminUtils.changeTopicConfig(zkUtils, topic, logProps)
// Reaching this line means no exception was thrown, which fails the test.
fail("Should fail with AdminOperationException for topic doesn't exist")
} catch {
// Only AdminOperationException is accepted; any other exception propagates.
case e: AdminOperationException => // expected
def testProcessNotification {
val props = new Properties()
props.put("a.b", "10")
// Create a mock ConfigHandler to record config changes it is asked to process
val entityArgument = EasyMock.newCapture[String]
val propertiesArgument = EasyMock.newCapture[Properties]
val handler = EasyMock.createNiceMock(classOf[ConfigHandler])
EasyMock.and(EasyMock.capture(entityArgument), EasyMock.isA(classOf[String])),
EasyMock.and(EasyMock.capture(propertiesArgument), EasyMock.isA(classOf[Properties])))
val configManager = new DynamicConfigManager(zkUtils, Map(ConfigType.Topic -> handler))
// Notifications created using the old TopicConfigManager are ignored.
configManager.ConfigChangedNotificationHandler.processNotification("not json")
// Incorrect Map. No version
try {
val jsonMap = Map("v" -> 1, "x" -> 2)
fail("Should have thrown an Exception while parsing incorrect notification " + jsonMap)
catch {
case t: Throwable =>
// Version is provided. EntityType is incorrect
try {
val jsonMap = Map("version" -> 1, "entity_type" -> "garbage", "entity_name" -> "x")
fail("Should have thrown an Exception while parsing incorrect notification " + jsonMap)
catch {
case t: Throwable =>
// EntityName isn't provided
try {
val jsonMap = Map("version" -> 1, "entity_type" -> ConfigType.Topic)
fail("Should have thrown an Exception while parsing incorrect notification " + jsonMap)
catch {
case t: Throwable =>
// Everything is provided
val jsonMap = Map("version" -> 1, "entity_type" -> ConfigType.Topic, "entity_name" -> "x")
// Verify that processConfigChanges was only called once