MINOR: Followup KAFKA-18193 for null check and error message (#20650)
This PR is a follow-up to KAFKA-18193. It addresses the need for a null
check and an improved error message. Please refer to the previous
comments and the review of
https://github.com/apache/kafka/pull/19955#pullrequestreview-3310028108
for more context.
Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 999befa..68abb91 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -1523,7 +1523,8 @@
}, clientId + "-CloseThread");
}
- private boolean close(final Optional<Long> timeout, final org.apache.kafka.streams.CloseOptions.GroupMembershipOperation operation) {
+ // visible for testing
+ boolean close(final Optional<Long> timeout, final org.apache.kafka.streams.CloseOptions.GroupMembershipOperation operation) {
final long timeoutMs;
if (timeout.isPresent()) {
timeoutMs = timeout.get();
@@ -1635,8 +1636,9 @@
public synchronized boolean close(final org.apache.kafka.streams.CloseOptions options) throws IllegalArgumentException {
Objects.requireNonNull(options, "options cannot be null");
final CloseOptionsInternal optionsInternal = new CloseOptionsInternal(options);
- final String msgPrefix = prepareMillisCheckFailMsgPrefix(optionsInternal.timeout(), "timeout");
- final long timeoutMs = validateMillisecondDuration(optionsInternal.timeout().get(), msgPrefix);
+ final Duration timeout = optionsInternal.timeout().orElseGet(() -> Duration.ofMillis(Long.MAX_VALUE));
+ final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, "timeout");
+ final long timeoutMs = validateMillisecondDuration(timeout, msgPrefix);
if (timeoutMs < 0) {
throw new IllegalArgumentException("Timeout can't be negative.");
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 1ceebab..83eeb0b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -120,6 +120,7 @@
import static org.mockito.Mockito.anyLong;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.isA;
import static org.mockito.Mockito.mock;
@@ -1298,6 +1299,28 @@
}
@Test
+ @SuppressWarnings("unchecked")
+ public void shouldUseDefaultTimeoutForCloseWithNullTimeout() throws Exception {
+ prepareStreams();
+ prepareStreamThread(streamThreadOne, 1);
+ prepareStreamThread(streamThreadTwo, 2);
+ prepareTerminableThread(streamThreadOne);
+
+ final MockClientSupplier mockClientSupplier = spy(MockClientSupplier.class);
+ when(mockClientSupplier.getAdmin(any())).thenReturn(adminClient);
+
+ final CloseOptions closeOptions = CloseOptions.timeout(null)
+ .withGroupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
+ final KafkaStreams streams = spy(new KafkaStreamsWithTerminableThread(
+ getBuilderWithSource().build(), props, mockClientSupplier, time));
+
+ doReturn(false).when(streams).close(any(Optional.class), any());
+ streams.close(closeOptions);
+
+ verify(streams).close(eq(Optional.of(Long.MAX_VALUE)), eq(CloseOptions.GroupMembershipOperation.LEAVE_GROUP));
+ }
+
+ @Test
public void shouldNotBlockInCloseWithCloseOptionLeaveGroupTrueForZeroDuration() throws Exception {
prepareStreams();
prepareStreamThread(streamThreadOne, 1);