MINOR: Suppress warning logs for UnsupportedVersionException in ClientTelemetryReporter (#20722)
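See the review comment at https://github.com/apache/kafka/pull/20661/files#r2433576874
Suppress the "Received unrecoverable error from broker, disabling telemetry"
warning log in `ClientTelemetryReporter` when the failure is an
`UnsupportedVersionException`. Other non-retriable errors still log the
warning, and the error handling itself is unchanged.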
Reviewers: Andrew Schofield <aschofield@confluent.io>
diff --git a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
index ae60aef..6c80365 100644
--- a/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
+++ b/clients/src/main/java/org/apache/kafka/common/telemetry/internals/ClientTelemetryReporter.java
@@ -21,6 +21,7 @@
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.GetTelemetrySubscriptionsRequestData;
import org.apache.kafka.common.message.GetTelemetrySubscriptionsResponseData;
import org.apache.kafka.common.message.PushTelemetryRequestData;
@@ -527,13 +528,13 @@
@Override
public void handleFailedGetTelemetrySubscriptionsRequest(KafkaException maybeFatalException) {
log.debug("The broker generated an error for the get telemetry network API request", maybeFatalException);
- handleFailedRequest(isRetryable(maybeFatalException));
+ handleFailedRequest(maybeFatalException);
}
@Override
public void handleFailedPushTelemetryRequest(KafkaException maybeFatalException) {
log.debug("The broker generated an error for the push telemetry network API request", maybeFatalException);
- handleFailedRequest(isRetryable(maybeFatalException));
+ handleFailedRequest(maybeFatalException);
}
@Override
@@ -834,7 +835,7 @@
}
}
- private void handleFailedRequest(boolean shouldWait) {
+ private void handleFailedRequest(KafkaException maybeFatalException) {
final long nowMs = time.milliseconds();
lock.writeLock().lock();
try {
@@ -852,10 +853,12 @@
again. We may disconnect from the broker and connect to a broker that supports client
telemetry.
*/
- if (shouldWait) {
+ if (isRetryable(maybeFatalException)) {
updateErrorResult(DEFAULT_PUSH_INTERVAL_MS, nowMs);
} else {
- log.warn("Received unrecoverable error from broker, disabling telemetry");
+ if (!(maybeFatalException instanceof UnsupportedVersionException)) {
+ log.warn("Received unrecoverable error from broker, disabling telemetry");
+ }
updateErrorResult(Integer.MAX_VALUE, nowMs);
}