KAFKA-18810 fix flaky ClientIdQuotaTest.testQuotaOverrideDelete (#22329)

Ref : https://issues.apache.org/jira/browse/KAFKA-18810 Marked the
producer-throttle assertion as "wait until it's true" instead of
checking immediately

Reviewers: nileshkumar3 <nileshkumar3@gmail.com>, Chia-Ping Tsai
 <chia7712@gmail.com>
diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
index 13eb169..806f957 100644
--- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
@@ -142,7 +142,6 @@
     quotaTestClients.verifyConsumeThrottle(expectThrottle = true)
   }
 
-  @Flaky("KAFKA-18810")
   @ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
   @MethodSource(Array("getTestGroupProtocolParametersAll"))
   def testQuotaOverrideDelete(groupProtocol: String): Unit = {
@@ -288,10 +287,15 @@
   }
 
   private def verifyThrottleTimeMetric(quotaType: QuotaType, clientId: String, expectThrottle: Boolean): Unit = {
-    val throttleMetricValue = metricValue(throttleMetric(quotaType, clientId))
     if (expectThrottle) {
-      assertTrue(throttleMetricValue > 0, s"Client with id=$clientId should have been throttled")
+      // Poll until at least one metric is recorded to give the broker thread time to flush the throttled value
+      // after the response is sent
+      TestUtils.waitUntilTrue(() => {
+        val metric = throttleMetric(quotaType, clientId)
+        metric != null && metricValue(metric) > 0
+      }, s"Client with id=$clientId should have been throttled")
     } else {
+      val throttleMetricValue = metricValue(throttleMetric(quotaType, clientId))
       assertTrue(throttleMetricValue.isNaN, s"Client with id=$clientId should not have been throttled")
     }
   }