KAFKA-16029: Fix "Unable to find FetchSessionHandler for node X" bug (#15186)

Change `AbstractFetcher`/`Fetcher` to _not_ clear the `sessionHandlers` cache during `prepareCloseFetchSessionRequests()`.

During `close()`, `Fetcher` calls `maybeCloseFetchSessions()` which, in turn, calls `prepareCloseFetchSessionRequests()` and then calls `NetworkClient.poll()` to complete the requests. Since `prepareCloseFetchSessionRequests()` (erroneously) clears the `sessionHandlers` cache, when the response is processed, the sessions are missing, and the warning is logged.

Reviewers: Andrew Schofield <aschofield@confluent.io>, David Jacot <djacot@confluent.io>
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
index 1d0c9b0..73584ce 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
@@ -376,25 +376,21 @@
         final Cluster cluster = metadata.fetch();
         Map<Node, FetchSessionHandler.Builder> fetchable = new HashMap<>();
 
-        try {
-            sessionHandlers.forEach((fetchTargetNodeId, sessionHandler) -> {
-                // set the session handler to notify close. This will set the next metadata request to send close message.
-                sessionHandler.notifyClose();
+        sessionHandlers.forEach((fetchTargetNodeId, sessionHandler) -> {
+            // set the session handler to notify close. This will set the next metadata request to send close message.
+            sessionHandler.notifyClose();
 
-                // FetchTargetNode may not be available as it may have disconnected the connection. In such cases, we will
-                // skip sending the close request.
-                final Node fetchTarget = cluster.nodeById(fetchTargetNodeId);
+            // FetchTargetNode may not be available as it may have disconnected the connection. In such cases, we will
+            // skip sending the close request.
+            final Node fetchTarget = cluster.nodeById(fetchTargetNodeId);
 
-                if (fetchTarget == null || isUnavailable(fetchTarget)) {
-                    log.debug("Skip sending close session request to broker {} since it is not reachable", fetchTarget);
-                    return;
-                }
+            if (fetchTarget == null || isUnavailable(fetchTarget)) {
+                log.debug("Skip sending close session request to broker {} since it is not reachable", fetchTarget);
+                return;
+            }
 
-                fetchable.put(fetchTarget, sessionHandler.newBuilder());
-            });
-        } finally {
-            sessionHandlers.clear();
-        }
+            fetchable.put(fetchTarget, sessionHandler.newBuilder());
+        });
 
         return fetchable.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().build()));
     }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 51b2532..3c8a9c1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -150,10 +150,19 @@
      * be executed once the first time that any of the {@link #close()} methods are called. Subclasses can override
      * this method without the need for extra synchronization at the instance level.
      *
+     * <p/>
+     *
+     * <em>Note</em>: this method is <code>synchronized</code> to reinstitute the 3.5 behavior:
+     *
+     * <blockquote>
+     * Shared states (e.g. sessionHandlers) could be accessed by multiple threads (such as heartbeat thread), hence,
+     * it is necessary to acquire a lock on the fetcher instance before modifying the states.
+     * </blockquote>
+     *
      * @param timer Timer to enforce time limit
      */
     // Visible for testing
-    protected void closeInternal(Timer timer) {
+    protected synchronized void closeInternal(Timer timer) {
         // we do not need to re-enable wake-ups since we are closing already
         client.disableWakeups();
         maybeCloseFetchSessions(timer);