MINOR: Followup KAFKA-18193 for null check and error message (#20650)

This PR is a follow-up to KAFKA-18193. It addresses the need for a null
check and an improved error message. Please refer to the previous
comments and the review of
https://github.com/apache/kafka/pull/19955#pullrequestreview-3310028108
for more context.

Reviewers: Chia-Ping Tsai <chia7712@gmail.com>
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 999befa..68abb91 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -1523,7 +1523,8 @@
         }, clientId + "-CloseThread");
     }
 
-    private boolean close(final Optional<Long> timeout, final org.apache.kafka.streams.CloseOptions.GroupMembershipOperation operation) {
+    // visible for testing
+    boolean close(final Optional<Long> timeout, final org.apache.kafka.streams.CloseOptions.GroupMembershipOperation operation) {
         final long timeoutMs;
         if (timeout.isPresent()) {
             timeoutMs = timeout.get();
@@ -1635,8 +1636,9 @@
     public synchronized boolean close(final org.apache.kafka.streams.CloseOptions options) throws IllegalArgumentException {
         Objects.requireNonNull(options, "options cannot be null");
         final CloseOptionsInternal optionsInternal = new CloseOptionsInternal(options);
-        final String msgPrefix = prepareMillisCheckFailMsgPrefix(optionsInternal.timeout(), "timeout");
-        final long timeoutMs = validateMillisecondDuration(optionsInternal.timeout().get(), msgPrefix);
+        final Duration timeout = optionsInternal.timeout().orElseGet(() -> Duration.ofMillis(Long.MAX_VALUE));
+        final String msgPrefix = prepareMillisCheckFailMsgPrefix(timeout, "timeout");
+        final long timeoutMs = validateMillisecondDuration(timeout, msgPrefix);
         if (timeoutMs < 0) {
             throw new IllegalArgumentException("Timeout can't be negative.");
         }
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 1ceebab..83eeb0b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -120,6 +120,7 @@
 import static org.mockito.Mockito.anyLong;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.eq;
 import static org.mockito.Mockito.isA;
 import static org.mockito.Mockito.mock;
@@ -1298,6 +1299,28 @@
     }
 
     @Test
+    @SuppressWarnings("unchecked")
+    public void shouldUseDefaultTimeoutForCloseWithNullTimeout() throws Exception {
+        prepareStreams();
+        prepareStreamThread(streamThreadOne, 1);
+        prepareStreamThread(streamThreadTwo, 2);
+        prepareTerminableThread(streamThreadOne);
+
+        final MockClientSupplier mockClientSupplier = spy(MockClientSupplier.class);
+        when(mockClientSupplier.getAdmin(any())).thenReturn(adminClient);
+
+        final CloseOptions closeOptions = CloseOptions.timeout(null)
+                .withGroupMembershipOperation(CloseOptions.GroupMembershipOperation.LEAVE_GROUP);
+        final KafkaStreams streams = spy(new KafkaStreamsWithTerminableThread(
+                getBuilderWithSource().build(), props, mockClientSupplier, time));
+
+        doReturn(false).when(streams).close(any(Optional.class), any());
+        streams.close(closeOptions);
+
+        verify(streams).close(eq(Optional.of(Long.MAX_VALUE)), eq(CloseOptions.GroupMembershipOperation.LEAVE_GROUP));
+    }
+
+    @Test
     public void shouldNotBlockInCloseWithCloseOptionLeaveGroupTrueForZeroDuration() throws Exception {
         prepareStreams();
         prepareStreamThread(streamThreadOne, 1);