KAFKA-16029: Fix "Unable to find FetchSessionHandler for node X" bug (#15186)
Change `AbstractFetch`/`Fetcher` to _not_ clear the `sessionHandlers` cache during `prepareCloseFetchSessionRequests()`.
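During `close()`, `Fetcher` calls `maybeCloseFetchSessions()` which, in turn, calls `prepareCloseFetchSessionRequests()` and then calls `NetworkClient.poll()` to complete the requests. Because `prepareCloseFetchSessionRequests()` (erroneously) cleared the `sessionHandlers` cache, the session handlers were already missing by the time the responses were processed, and the "Unable to find FetchSessionHandler for node X" warning was logged. In addition, `Fetcher.closeInternal()` is made `synchronized` again to restore the 3.5 behavior of holding the fetcher lock while shared state such as `sessionHandlers` is modified.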
Reviewers: Andrew Schofield <aschofield@confluent.io>, David Jacot <djacot@confluent.io>
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
index 1d0c9b0..73584ce 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractFetch.java
@@ -376,25 +376,21 @@
final Cluster cluster = metadata.fetch();
Map<Node, FetchSessionHandler.Builder> fetchable = new HashMap<>();
- try {
- sessionHandlers.forEach((fetchTargetNodeId, sessionHandler) -> {
- // set the session handler to notify close. This will set the next metadata request to send close message.
- sessionHandler.notifyClose();
+ sessionHandlers.forEach((fetchTargetNodeId, sessionHandler) -> {
+ // set the session handler to notify close. This will set the next metadata request to send close message.
+ sessionHandler.notifyClose();
- // FetchTargetNode may not be available as it may have disconnected the connection. In such cases, we will
- // skip sending the close request.
- final Node fetchTarget = cluster.nodeById(fetchTargetNodeId);
+ // FetchTargetNode may not be available as it may have disconnected the connection. In such cases, we will
+ // skip sending the close request.
+ final Node fetchTarget = cluster.nodeById(fetchTargetNodeId);
- if (fetchTarget == null || isUnavailable(fetchTarget)) {
- log.debug("Skip sending close session request to broker {} since it is not reachable", fetchTarget);
- return;
- }
+ if (fetchTarget == null || isUnavailable(fetchTarget)) {
+ log.debug("Skip sending close session request to broker {} since it is not reachable", fetchTarget);
+ return;
+ }
- fetchable.put(fetchTarget, sessionHandler.newBuilder());
- });
- } finally {
- sessionHandlers.clear();
- }
+ fetchable.put(fetchTarget, sessionHandler.newBuilder());
+ });
return fetchable.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().build()));
}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 51b2532..3c8a9c1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -150,10 +150,19 @@
* be executed once the first time that any of the {@link #close()} methods are called. Subclasses can override
* this method without the need for extra synchronization at the instance level.
*
+ * <p>
+ *
+ * <em>Note</em>: this method is <code>synchronized</code> to reinstitute the 3.5 behavior:
+ *
+ * <blockquote>
+ * Shared states (e.g. sessionHandlers) could be accessed by multiple threads (such as heartbeat thread), hence,
+ * it is necessary to acquire a lock on the fetcher instance before modifying the states.
+ * </blockquote>
+ *
* @param timer Timer to enforce time limit
*/
// Visible for testing
- protected void closeInternal(Timer timer) {
+ protected synchronized void closeInternal(Timer timer) {
// we do not need to re-enable wake-ups since we are closing already
client.disableWakeups();
maybeCloseFetchSessions(timer);