KAFKA-18810 fix flaky ClientIdQuotaTest.testQuotaOverrideDelete (#22329)
Ref: https://issues.apache.org/jira/browse/KAFKA-18810
The producer-throttle assertion now polls (via TestUtils.waitUntilTrue)
until the throttle metric is positive, instead of checking it once
immediately.
Reviewers: nileshkumar3 <nileshkumar3@gmail.com>, Chia-Ping Tsai
<chia7712@gmail.com>
diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
index 13eb169..806f957 100644
--- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
@@ -142,7 +142,6 @@
quotaTestClients.verifyConsumeThrottle(expectThrottle = true)
}
- @Flaky("KAFKA-18810")
@ParameterizedTest(name = TestInfoUtils.TestWithParameterizedGroupProtocolNames)
@MethodSource(Array("getTestGroupProtocolParametersAll"))
def testQuotaOverrideDelete(groupProtocol: String): Unit = {
@@ -288,10 +287,15 @@
}
private def verifyThrottleTimeMetric(quotaType: QuotaType, clientId: String, expectThrottle: Boolean): Unit = {
- val throttleMetricValue = metricValue(throttleMetric(quotaType, clientId))
if (expectThrottle) {
- assertTrue(throttleMetricValue > 0, s"Client with id=$clientId should have been throttled")
+ // Poll until at least one metric is recorded to give the broker thread time to flush the throttled value
+ // after the response is sent
+ TestUtils.waitUntilTrue(() => {
+ val metric = throttleMetric(quotaType, clientId)
+ metric != null && metricValue(metric) > 0
+ }, s"Client with id=$clientId should have been throttled")
} else {
+ val throttleMetricValue = metricValue(throttleMetric(quotaType, clientId))
assertTrue(throttleMetricValue.isNaN, s"Client with id=$clientId should not have been throttled")
}
}