KAFKA-7837: Ensure offline partitions are picked up as soon as possible when shrinking ISR (#6202)

Check if a partition is offline while iterating all partitions.

Reviewers: Jun Rao <junrao@gmail.com>
(cherry picked from commit 646ec948794c927e4ffa5f96d60b5b9f7fe8f228)
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 84b2d48..efdde13 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -400,9 +400,13 @@
   def nonOfflinePartition(topicPartition: TopicPartition): Option[Partition] =
     getPartition(topicPartition).filter(_ ne ReplicaManager.OfflinePartition)
 
+  // An iterator over all non offline partitions. This is a weakly consistent iterator; a partition made offline after
+  // the iterator has been constructed could still be returned by this iterator.
   private def nonOfflinePartitionsIterator: Iterator[Partition] =
     allPartitions.values.iterator.filter(_ ne ReplicaManager.OfflinePartition)
 
+  // An iterator over all offline partitions. This is a weakly consistent iterator; a partition made offline after the
+  // iterator has been constructed may not be visible.
   private def offlinePartitionsIterator: Iterator[Partition] =
     allPartitions.values.iterator.filter(_ eq ReplicaManager.OfflinePartition)
 
@@ -1339,7 +1343,11 @@
 
   private def maybeShrinkIsr(): Unit = {
     trace("Evaluating ISR list of partitions to see which replicas can be removed from the ISR")
-    nonOfflinePartitionsIterator.foreach(_.maybeShrinkIsr(config.replicaLagTimeMaxMs))
+
+    // Shrink ISRs for non offline partitions
+    allPartitions.keys.foreach { topicPartition =>
+      nonOfflinePartition(topicPartition).foreach(_.maybeShrinkIsr(config.replicaLagTimeMaxMs))
+    }
   }
 
   /**