KAFKA-7104: More consistent leader's state in fetch response (#5305)

Do not update LogReadResult after it is initially populated when returning fetches immediately (i.e. without hitting the purgatory). This was done in #3954 as an optimization so that the followers get the potentially updated high watermark. However, since many things can happen (like deleting old segments and advancing log start offset) between initial creation of LogReadResult and the update, we can hit issues like log start offset in fetch response being higher than the last offset in fetched records.

Reviewers: Jason Gustafson <jason@confluent.io>, Ismael Juma <ismael@juma.me.uk>
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 9e0c83e..2a9e3a6 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -90,11 +90,6 @@
     case Some(e) => Errors.forException(e)
   }
 
-  def updateLeaderReplicaInfo(leaderReplica: Replica): LogReadResult =
-    copy(highWatermark = leaderReplica.highWatermark.messageOffset,
-      leaderLogStartOffset = leaderReplica.logStartOffset,
-      leaderLogEndOffset = leaderReplica.logEndOffset.messageOffset)
-
   def withEmptyFetchInfo: LogReadResult =
     copy(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY))
 
@@ -1300,7 +1295,12 @@
 
   /**
    * Update the follower's fetch state in the leader based on the last fetch request and update `readResult`,
-   * if necessary.
+   * if the follower replica is not recognized to be one of the assigned replicas. Do not update
+   * `readResult` otherwise, so that log start/end offset and high watermark is consistent with
+   * records in fetch response. Log start/end offset and high watermark may change not only due to
+   * this fetch request, e.g., rolling new log segment and removing old log segment may move log
+   * start offset further than the last offset in the fetched records. The followers will get the
+   * updated leader's state in the next fetch response.
    */
   private def updateFollowerLogReadResults(replicaId: Int,
                                            readResults: Seq[(TopicPartition, LogReadResult)]): Seq[(TopicPartition, LogReadResult)] = {
@@ -1311,10 +1311,7 @@
         case Some(partition) =>
           partition.getReplica(replicaId) match {
             case Some(replica) =>
-              if (partition.updateReplicaLogReadResult(replica, readResult))
-                partition.leaderReplicaIfLocal.foreach { leaderReplica =>
-                  updatedReadResult = readResult.updateLeaderReplicaInfo(leaderReplica)
-                }
+              partition.updateReplicaLogReadResult(replica, readResult)
             case None =>
               warn(s"Leader $localBrokerId failed to record follower $replicaId's position " +
                 s"${readResult.info.fetchOffsetMetadata.messageOffset} since the replica is not recognized to be " +
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 642cf04..359087d 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -460,7 +460,9 @@
 
         val tp0Status = responseStatusMap.get(tp0)
         assertTrue(tp0Status.isDefined)
-        assertEquals(1, tp0Status.get.highWatermark)
+        // the response contains high watermark on the leader before it is updated based
+        // on this fetch request
+        assertEquals(0, tp0Status.get.highWatermark)
         assertEquals(None, tp0Status.get.lastStableOffset)
         assertEquals(Errors.NONE, tp0Status.get.error)
         assertTrue(tp0Status.get.records.batches.iterator.hasNext)