KAFKA-19585: Avoid noisy NPE logs when closing consumer after constructor failures (#20491)
If there's a failure in the Kafka consumer constructor, we attempt to
close the consumer:
https://github.com/lianetm/kafka/blob/2329def2ff9ca4f7b9426af159b6fa19a839dc4d/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L540
In that case, some components may not have been created yet, so we need
null checks to avoid noisy NPE logs.
These noisy logs were reported with the console share consumer in a
similar scenario, so this task is to review the async consumer and apply
a similar fix if needed.
The fix is to check whether the handlers/invokers are null before trying
to close them, similar to what was done in
https://github.com/apache/kafka/pull/20290
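
As a rough sketch of the idea (the class and field names below are
illustrative, not the actual AsyncKafkaConsumer members), each close step
simply returns when the component it needs was never constructed, instead
of letting cleanup throw and log a NullPointerException:

```java
// Minimal sketch of the null-guard pattern used on the close path.
public class PartialCloseExample {

    // May be null if the constructor failed before this component was created.
    private final AutoCloseable applicationEventHandler;

    PartialCloseExample(AutoCloseable applicationEventHandler) {
        this.applicationEventHandler = applicationEventHandler;
    }

    void close() {
        // Skip the step entirely rather than throwing (and logging) an NPE.
        if (applicationEventHandler == null) {
            return;
        }
        try {
            applicationEventHandler.close();
        } catch (Exception e) {
            // Best-effort cleanup: log or swallow, never rethrow from close.
        }
    }
}
```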
Reviewers: TengYao Chi <frankvicky@apache.org>, Lianet Magrans
<lmagrans@confluent.io>
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
index 93ed987..5c72c2b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java
@@ -1517,7 +1517,7 @@
}
private void autoCommitOnClose(final Timer timer) {
- if (groupMetadata.get().isEmpty())
+ if (groupMetadata.get().isEmpty() || applicationEventHandler == null)
return;
if (autoCommitEnabled)
@@ -1527,7 +1527,7 @@
}
private void runRebalanceCallbacksOnClose() {
- if (groupMetadata.get().isEmpty())
+ if (groupMetadata.get().isEmpty() || rebalanceListenerInvoker == null)
return;
int memberEpoch = groupMetadata.get().get().generationId();
@@ -1553,7 +1553,7 @@
}
private void leaveGroupOnClose(final Timer timer, final CloseOptions.GroupMembershipOperation membershipOperation) {
- if (groupMetadata.get().isEmpty())
+ if (groupMetadata.get().isEmpty() || applicationEventHandler == null)
return;
log.debug("Leaving the consumer group during consumer close");
@@ -1569,7 +1569,7 @@
}
private void stopFindCoordinatorOnClose() {
- if (groupMetadata.get().isEmpty())
+ if (groupMetadata.get().isEmpty() || applicationEventHandler == null)
return;
log.debug("Stop finding coordinator during consumer close");
applicationEventHandler.add(new StopFindCoordinatorOnCloseEvent());
@@ -1634,7 +1634,7 @@
}
private void awaitPendingAsyncCommitsAndExecuteCommitCallbacks(Timer timer, boolean enableWakeup) {
- if (lastPendingAsyncCommit == null) {
+ if (lastPendingAsyncCommit == null || offsetCommitCallbackInvoker == null) {
return;
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
index 6a5cb87..dcf604d 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java
@@ -83,6 +83,7 @@
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.LogCaptureAppender;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
@@ -2029,6 +2030,28 @@
assertEquals(10, (double) metrics.metric(metrics.metricName("background-event-queue-time-max", CONSUMER_METRIC_GROUP)).metricValue());
}
+ @Test
+ public void testFailConstructor() {
+ final Properties props = requiredConsumerConfig();
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
+ props.put(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG, "an.invalid.class");
+ final ConsumerConfig config = new ConsumerConfig(props);
+
+ try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) {
+ KafkaException ce = assertThrows(
+ KafkaException.class,
+ () -> newConsumer(config));
+ assertTrue(ce.getMessage().contains("Failed to construct kafka consumer"), "Unexpected exception message: " + ce.getMessage());
+ assertTrue(ce.getCause().getMessage().contains("Class an.invalid.class cannot be found"), "Unexpected cause: " + ce.getCause());
+
+ boolean npeLogged = appender.getEvents().stream()
+ .flatMap(event -> event.getThrowableInfo().stream())
+ .anyMatch(str -> str.contains("NullPointerException"));
+
+ assertFalse(npeLogged, "Unexpected NullPointerException during consumer construction");
+ }
+ }
+
private Map<TopicPartition, OffsetAndMetadata> mockTopicPartitionOffset() {
final TopicPartition t0 = new TopicPartition("t0", 2);
final TopicPartition t1 = new TopicPartition("t0", 3);