KAFKA-7837: Ensure offline partitions are picked up as soon as possible when shrinking ISR (#6202)
Check if a partition is offline while iterating all partitions.
Reviewers: Jun Rao <junrao@gmail.com>
(cherry picked from commit 646ec948794c927e4ffa5f96d60b5b9f7fe8f228)
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 84b2d48..efdde13 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -400,9 +400,13 @@
def nonOfflinePartition(topicPartition: TopicPartition): Option[Partition] =
getPartition(topicPartition).filter(_ ne ReplicaManager.OfflinePartition)
+ // An iterator over all non offline partitions. This is a weakly consistent iterator; a partition made offline after
+ // the iterator has been constructed could still be returned by this iterator.
private def nonOfflinePartitionsIterator: Iterator[Partition] =
allPartitions.values.iterator.filter(_ ne ReplicaManager.OfflinePartition)
+ // An iterator over all offline partitions. This is a weakly consistent iterator; a partition made offline after the
+ // iterator has been constructed may not be visible.
private def offlinePartitionsIterator: Iterator[Partition] =
allPartitions.values.iterator.filter(_ eq ReplicaManager.OfflinePartition)
@@ -1339,7 +1343,11 @@
private def maybeShrinkIsr(): Unit = {
trace("Evaluating ISR list of partitions to see which replicas can be removed from the ISR")
- nonOfflinePartitionsIterator.foreach(_.maybeShrinkIsr(config.replicaLagTimeMaxMs))
+
+ // Shrink ISRs for non offline partitions
+ allPartitions.keys.foreach { topicPartition =>
+ nonOfflinePartition(topicPartition).foreach(_.maybeShrinkIsr(config.replicaLagTimeMaxMs))
+ }
}
/**