KAFKA-17098: Re-add task to state updater if transit to RUNNING fails (#16570)
When Streams tries to transition a restored active task to RUNNING,
the first thing it does is fetch the committed offsets for this task.
If fetching the offsets times out, Streams does not re-throw the
error immediately, but retries fetching the committed offsets later
until a Streams-specific timeout is hit.
Restored active tasks are removed from the state updater's output
queue of restored tasks before the transition is attempted. If a
timeout occurs, the restored task is neither added to the
task registry nor re-added to the state updater. The task is lost
since it is not maintained anywhere. This means the task is also
not closed. Because the stream thread does not know about this
lost task, when the same task is created again on the same
stream thread, the state stores are opened again and RocksDB
throws the "No locks available" error.
This commit re-adds the task to the state updater if the
request for the committed offsets times out.
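The hand-off described above can be illustrated with a minimal,
self-contained sketch. It is a toy model, not the Kafka Streams code:
FakeStateUpdater, FakeTaskManager, drainRestored, isTracked, and the
reAddOnTimeout flag are hypothetical names introduced only for this
illustration, and a task is reduced to its id string.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy model of the restored-task hand-off. All names are hypothetical
// and do not mirror the real Kafka Streams classes.
public class LostTaskSketch {

    /** Stand-in for the state updater: owns tasks and queues restored ones. */
    static class FakeStateUpdater {
        final Queue<String> restored = new ArrayDeque<>();
        final List<String> owned = new ArrayList<>();

        void add(String taskId) {
            owned.add(taskId);
            // Assumption: the task has nothing left to restore, so it
            // shows up in the restored queue right away.
            restored.add(taskId);
        }

        /** Removes and returns the next restored task, or null if none. */
        String drainRestored() {
            String taskId = restored.poll();
            if (taskId != null) {
                owned.remove(taskId);
            }
            return taskId;
        }
    }

    /** Stand-in for the task manager with the fix toggled by a flag. */
    static class FakeTaskManager {
        final FakeStateUpdater stateUpdater = new FakeStateUpdater();
        final List<String> registry = new ArrayList<>();
        final boolean reAddOnTimeout;

        FakeTaskManager(boolean reAddOnTimeout) {
            this.reAddOnTimeout = reAddOnTimeout;
        }

        /** Returns true if a restored task was moved into the registry. */
        boolean checkStateUpdater(boolean offsetFetchTimesOut) {
            String taskId = stateUpdater.drainRestored();
            if (taskId == null) {
                return false;
            }
            if (offsetFetchTimesOut) {
                if (reAddOnTimeout) {
                    // The fix: hand the task back so it stays owned.
                    stateUpdater.add(taskId);
                }
                return false; // without the fix, the task is now orphaned
            }
            registry.add(taskId);
            return true;
        }

        boolean isTracked(String taskId) {
            return registry.contains(taskId) || stateUpdater.owned.contains(taskId);
        }
    }

    public static void main(String[] args) {
        for (boolean withFix : new boolean[] {false, true}) {
            FakeTaskManager manager = new FakeTaskManager(withFix);
            manager.stateUpdater.add("0_0");
            manager.checkStateUpdater(true); // simulate the timeout
            System.out.println("fix=" + withFix + " tracked=" + manager.isTracked("0_0"));
        }
    }
}
```

Without the re-add, the task id is in neither collection after the
simulated timeout, which corresponds to the lost task described above.
With the re-add, a later call that does not time out moves the task
into the registry.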
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 9f49888..62e6487 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -979,9 +979,11 @@
task.clearTaskTimeout();
} catch (final TimeoutException timeoutException) {
task.maybeInitTaskTimeoutOrThrow(now, timeoutException);
+ stateUpdater.add(task);
log.debug(
String.format(
- "Could not complete restoration for %s due to the following exception; will retry",
+ "Could not complete restoration for %s due to the following exception; adding the task " +
+ "back to the state updater and will retry",
task.id()),
timeoutException
);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index c819c3c..c6b958b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -1510,6 +1510,7 @@
taskManager.checkStateUpdater(time.milliseconds(), noOpResetter);
verify(task).maybeInitTaskTimeoutOrThrow(anyLong(), eq(timeoutException));
+ verify(stateUpdater).add(task);
verify(tasks, never()).addTask(task);
verify(task, never()).clearTaskTimeout();
verifyNoInteractions(consumer);