KAFKA-4158; Reset quota to default value if quota override is deleted
Author: Dong Lin <lindong28@gmail.com>
Reviewers: Joel Koshy <jjkoshy.w@gmail.com>, Jiangjie Qin <becket.qin@gmail.com>
Closes #1851 from lindong28/KAFKA-4158
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index c99ba97..5e90080 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -243,6 +243,14 @@
}
/**
+ * Reset quotas to the default value for the given clientId
+ * @param clientId client to override
+ */
+ def resetQuota(clientId: String) = {
+ updateQuota(clientId, defaultQuota)
+ }
+
+ /**
* Overrides quotas per clientId
* @param clientId client to override
* @param quota custom quota to apply
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala
index ab1d782..d07fdd8 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -82,11 +82,15 @@
if (clientConfig.containsKey(ClientConfigOverride.ProducerOverride)) {
quotaManagers(ApiKeys.PRODUCE.id).updateQuota(clientId,
new Quota(clientConfig.getProperty(ClientConfigOverride.ProducerOverride).toLong, true))
+ } else {
+ quotaManagers(ApiKeys.PRODUCE.id).resetQuota(clientId)
}
if (clientConfig.containsKey(ClientConfigOverride.ConsumerOverride)) {
quotaManagers(ApiKeys.FETCH.id).updateQuota(clientId,
new Quota(clientConfig.getProperty(ClientConfigOverride.ConsumerOverride).toLong, true))
+ } else {
+ quotaManagers(ApiKeys.FETCH.id).resetQuota(clientId)
}
}
}
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index af979e44..a9df929 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -65,10 +65,9 @@
props.put(ClientConfigOverride.ProducerOverride, "1000")
props.put(ClientConfigOverride.ConsumerOverride, "2000")
AdminUtils.changeClientIdConfig(zkUtils, clientId, props)
+ val quotaManagers: Map[Short, ClientQuotaManager] = servers.head.apis.quotaManagers
TestUtils.retry(10000) {
- val configHandler = this.servers.head.dynamicConfigHandlers(ConfigType.Client).asInstanceOf[ClientIdConfigHandler]
- val quotaManagers: Map[Short, ClientQuotaManager] = servers.head.apis.quotaManagers
val overrideProducerQuota = quotaManagers.get(ApiKeys.PRODUCE.id).get.quota(clientId)
val overrideConsumerQuota = quotaManagers.get(ApiKeys.FETCH.id).get.quota(clientId)
@@ -77,6 +76,22 @@
assertEquals(s"ClientId $clientId must have overridden consumer quota of 2000",
Quota.upperBound(2000), overrideConsumerQuota)
}
+
+ val defaultProducerQuota = servers.head.apis.config.producerQuotaBytesPerSecondDefault.doubleValue
+ val defaultConsumerQuota = servers.head.apis.config.consumerQuotaBytesPerSecondDefault.doubleValue
+ assertNotEquals("defaultProducerQuota should be different from 1000", 1000, defaultProducerQuota)
+ assertNotEquals("defaultConsumerQuota should be different from 2000", 2000, defaultConsumerQuota)
+ AdminUtils.changeClientIdConfig(zkUtils, clientId, new Properties())
+
+ TestUtils.retry(10000) {
+ val producerQuota = quotaManagers.get(ApiKeys.PRODUCE.id).get.quota(clientId)
+ val consumerQuota = quotaManagers.get(ApiKeys.FETCH.id).get.quota(clientId)
+
+ assertEquals(s"ClientId $clientId must have reset producer quota to " + defaultProducerQuota,
+ Quota.upperBound(defaultProducerQuota), producerQuota)
+ assertEquals(s"ClientId $clientId must have reset consumer quota to " + defaultConsumerQuota,
+ Quota.upperBound(defaultConsumerQuota), consumerQuota)
+ }
}
@Test