KAFKA-4158; Reset quota to default value if quota override is deleted

Author: Dong Lin <lindong28@gmail.com>

Reviewers: Joel Koshy <jjkoshy.w@gmail.com>, Jiangjie Qin <becket.qin@gmail.com>

Closes #1851 from lindong28/KAFKA-4158
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index c99ba97..5e90080 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -243,6 +243,14 @@
   }
 
   /**
+   * Reset quotas to the default value for the given clientId
+   * @param clientId client to override
+   */
+  def resetQuota(clientId: String) = {
+    updateQuota(clientId, defaultQuota)
+  }
+
+  /**
    * Overrides quotas per clientId
    * @param clientId client to override
    * @param quota custom quota to apply
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala
index ab1d782..d07fdd8 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -82,11 +82,15 @@
     if (clientConfig.containsKey(ClientConfigOverride.ProducerOverride)) {
       quotaManagers(ApiKeys.PRODUCE.id).updateQuota(clientId,
         new Quota(clientConfig.getProperty(ClientConfigOverride.ProducerOverride).toLong, true))
+    } else {
+      quotaManagers(ApiKeys.PRODUCE.id).resetQuota(clientId)
     }
 
     if (clientConfig.containsKey(ClientConfigOverride.ConsumerOverride)) {
       quotaManagers(ApiKeys.FETCH.id).updateQuota(clientId,
         new Quota(clientConfig.getProperty(ClientConfigOverride.ConsumerOverride).toLong, true))
+    } else {
+      quotaManagers(ApiKeys.FETCH.id).resetQuota(clientId)
     }
   }
 }
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index af979e44..a9df929 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -65,10 +65,9 @@
     props.put(ClientConfigOverride.ProducerOverride, "1000")
     props.put(ClientConfigOverride.ConsumerOverride, "2000")
     AdminUtils.changeClientIdConfig(zkUtils, clientId, props)
+    val quotaManagers: Map[Short, ClientQuotaManager] = servers.head.apis.quotaManagers
 
     TestUtils.retry(10000) {
-      val configHandler = this.servers.head.dynamicConfigHandlers(ConfigType.Client).asInstanceOf[ClientIdConfigHandler]
-      val quotaManagers: Map[Short, ClientQuotaManager] = servers.head.apis.quotaManagers
       val overrideProducerQuota = quotaManagers.get(ApiKeys.PRODUCE.id).get.quota(clientId)
       val overrideConsumerQuota = quotaManagers.get(ApiKeys.FETCH.id).get.quota(clientId)
 
@@ -77,6 +76,22 @@
         assertEquals(s"ClientId $clientId must have overridden consumer quota of 2000",
         Quota.upperBound(2000), overrideConsumerQuota)
     }
+
+    val defaultProducerQuota = servers.head.apis.config.producerQuotaBytesPerSecondDefault.doubleValue
+    val defaultConsumerQuota = servers.head.apis.config.consumerQuotaBytesPerSecondDefault.doubleValue
+    assertNotEquals("defaultProducerQuota should be different from 1000", 1000, defaultProducerQuota)
+    assertNotEquals("defaultConsumerQuota should be different from 2000", 2000, defaultConsumerQuota)
+    AdminUtils.changeClientIdConfig(zkUtils, clientId, new Properties())
+
+    TestUtils.retry(10000) {
+      val producerQuota = quotaManagers.get(ApiKeys.PRODUCE.id).get.quota(clientId)
+      val consumerQuota = quotaManagers.get(ApiKeys.FETCH.id).get.quota(clientId)
+
+      assertEquals(s"ClientId $clientId must have reset producer quota to " + defaultProducerQuota,
+        Quota.upperBound(defaultProducerQuota), producerQuota)
+      assertEquals(s"ClientId $clientId must have reset consumer quota to " + defaultConsumerQuota,
+        Quota.upperBound(defaultConsumerQuota), consumerQuota)
+    }
   }
 
   @Test