/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package kafka.server
import java.util.concurrent.TimeUnit
import kafka.metrics.KafkaMetricsGroup
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.{NotLeaderForPartitionException, UnknownTopicOrPartitionException}
import org.apache.kafka.common.requests.FetchRequest.PartitionData
import scala.collection._
/**
 * Per-partition state of a fetch that is being delayed.
 *
 * @param startOffsetMetadata offset metadata at which the fetch starts for this partition; it is
 *                            compared with the partition's end offset when deciding whether the
 *                            delayed fetch can complete
 * @param fetchInfo           the partition-level fetch request data; its `maxBytes` caps the bytes
 *                            counted for this partition and the value itself is handed to the log read
 */
case class FetchPartitionStatus(startOffsetMetadata: LogOffsetMetadata, fetchInfo: PartitionData) {

  // Renders as "[startOffsetMetadata: <metadata>, fetchInfo: <info>]".
  override def toString: String =
    s"[startOffsetMetadata: $startOffsetMetadata, fetchInfo: $fetchInfo]"
}
/**
 * The fetch metadata maintained by the delayed fetch operation.
 *
 * @param fetchMinBytes        byte threshold that the accumulated available bytes are compared
 *                             against when checking whether the fetch can complete
 * @param fetchMaxBytes        forwarded to the local log read as `fetchMaxBytes`
 * @param hardMaxBytesLimit    forwarded to the local log read as `hardMaxBytesLimit`
 * @param fetchOnlyLeader      forwarded to the local log read as `fetchOnlyFromLeader`
 * @param fetchOnlyCommitted   forwarded to the local log read as `readOnlyCommitted`; also consulted
 *                             when the end offset of a partition is chosen
 * @param isFromFollower       consulted when the delayed fetch expires
 * @param replicaId            id handed to the throttling check and to the local log read
 * @param fetchPartitionStatus the partitions being fetched, each paired with its fetch status
 */
case class FetchMetadata(
    fetchMinBytes: Int,
    fetchMaxBytes: Int,
    hardMaxBytesLimit: Boolean,
    fetchOnlyLeader: Boolean,
    fetchOnlyCommitted: Boolean,
    isFromFollower: Boolean,
    replicaId: Int,
    fetchPartitionStatus: Seq[(TopicPartition, FetchPartitionStatus)]) {

  // Only minBytes, onlyLeader, onlyCommitted and partitionStatus are rendered; the remaining
  // fields are not part of the textual form.
  override def toString: String = {
    val leaderPart = s"[minBytes: $fetchMinBytes, onlyLeader: $fetchOnlyLeader, "
    val statusPart = s"onlyCommitted: $fetchOnlyCommitted, partitionStatus: $fetchPartitionStatus]"
    leaderPart + statusPart
  }
}
/**
 * A delayed fetch operation that can be created by the replica manager and watched
 * in the fetch operation purgatory.
 */
// NOTE(review): in this text the class has no closing braces and several branch bodies are absent;
// each such spot is marked below. Code lines are left exactly as found.
//
// delayMs: forwarded unchanged to the DelayedOperation superclass constructor.
class DelayedFetch(delayMs: Long,
// Thresholds, flags and per-partition start offsets of the fetch being delayed.
fetchMetadata: FetchMetadata,
// Queried for the local leader replica and the leader-throttle decision; performs the log read.
replicaManager: ReplicaManager,
// Consulted for per-partition throttling and quota exhaustion; also handed to the log read.
quota: ReplicaQuota,
// NOTE(review): no invocation of responseCallback is visible in this text; presumably it is called
// with the read results at the end of onComplete -- confirm.
responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit)
extends DelayedOperation(delayMs) {
// NOTE(review): the "* ..." lines below read like the body of a Scaladoc block for tryComplete whose
// opening and closing delimiters are not present in this text.
* The operation can be completed if:
* Case A: This broker is no longer the leader for some partitions it tries to fetch
* Case B: This broker does not know of some partitions it tries to fetch
* Case C: The fetch offset locates not on the last segment of the log
* Case D: The accumulated bytes from all the fetching partitions exceeds the minimum bytes
* Upon completion, should return whatever data is available for each valid partition
// Walks every fetched partition; the early-exit paths return the result of forceComplete().
override def tryComplete() : Boolean = {
// Running byte totals compared against fetchMetadata.fetchMinBytes in Case D.
// accumulatedThrottledSize only grows for partitions where quota.isThrottled is true.
var accumulatedSize = 0
var accumulatedThrottledSize = 0
fetchMetadata.fetchPartitionStatus.foreach {
case (topicPartition, fetchStatus) =>
val fetchOffset = fetchStatus.startOffsetMetadata
try {
// Partitions whose start offset metadata is the unknown sentinel are not examined.
if (fetchOffset != LogOffsetMetadata.UnknownOffsetMetadata) {
// The catch clauses further down handle UnknownTopicOrPartitionException and
// NotLeaderForPartitionException thrown inside this try (presumably by this lookup -- confirm).
val replica = replicaManager.getLeaderReplicaIfLocal(topicPartition)
// NOTE(review): the two branch values of this conditional are not present in this text, and
// `replica` is not otherwise used here; presumably the high watermark is chosen when
// fetchOnlyCommitted is set and the log end offset otherwise -- confirm against the full file.
val endOffset =
if (fetchMetadata.fetchOnlyCommitted)
// Go directly to the check for Case D if the message offsets are the same. If the log segment
// has just rolled, then the high watermark offset will remain the same but be on the old segment,
// which would incorrectly be seen as an instance of Case C.
if (endOffset.messageOffset != fetchOffset.messageOffset) {
if (endOffset.onOlderSegment(fetchOffset)) {
// Case C, this can happen when the new fetch operation is on a truncated leader
debug("Satisfying fetch %s since it is fetching later segments of partition %s.".format(fetchMetadata, topicPartition))
// `return` sits inside the foreach closure, so it is a non-local return out of tryComplete itself.
return forceComplete()
} else if (fetchOffset.onOlderSegment(endOffset)) {
// Case C, this can happen when the fetch operation is falling behind the current segment
// or the partition has just rolled a new segment
debug("Satisfying fetch %s immediately since it is fetching older segments.".format(fetchMetadata))
// We will not force complete the fetch request if a replica should be throttled.
// When the throttle check is true nothing else happens for this partition in this branch.
if (!replicaManager.shouldLeaderThrottle(quota, topicPartition, fetchMetadata.replicaId))
return forceComplete()
} else if (fetchOffset.messageOffset < endOffset.messageOffset) {
// we take the partition fetch size as upper bound when accumulating the bytes (skip if a throttled partition)
val bytesAvailable = math.min(endOffset.positionDiff(fetchOffset), fetchStatus.fetchInfo.maxBytes)
if (quota.isThrottled(topicPartition))
accumulatedThrottledSize += bytesAvailable
// NOTE(review): no `else` precedes the next statement in this text, so as written a throttled
// partition's bytes are added to both counters. The comment above ("skip if a throttled
// partition") and the Case D sum of the two counters suggest they are meant to be disjoint -- confirm.
accumulatedSize += bytesAvailable
} catch {
case _: UnknownTopicOrPartitionException => // Case B
debug("Broker no longer know of %s, satisfy %s immediately".format(topicPartition, fetchMetadata))
return forceComplete()
case _: NotLeaderForPartitionException => // Case A
debug("Broker is no longer the leader of %s, satisfy %s immediately".format(topicPartition, fetchMetadata))
return forceComplete()
// Case D
// Holds when accumulatedSize alone reaches fetchMinBytes, or when accumulatedSize plus
// accumulatedThrottledSize reaches it and quota.isQuotaExceeded() is false.
// NOTE(review): what happens when the condition holds (and the else value) is not present in this
// text; presumably forceComplete() versus false, given the Boolean result type -- confirm.
if (accumulatedSize >= fetchMetadata.fetchMinBytes
|| ((accumulatedSize + accumulatedThrottledSize) >= fetchMetadata.fetchMinBytes && !quota.isQuotaExceeded()))
// Override of onExpiration; branches on whether the fetch came from a follower.
// NOTE(review): the branch bodies are not present in this text; DelayedFetchMetrics defines a
// follower and a consumer "ExpiresPerSec" meter, which are presumably marked here -- confirm.
override def onExpiration() {
if (fetchMetadata.isFromFollower)
// NOTE(review): the next line reads like Scaladoc text for onComplete with its delimiters missing.
* Upon completion, read whatever data is available and pass to the complete callback
override def onComplete() {
// Reads from the local log through the replica manager, forwarding the limits and flags held in
// fetchMetadata together with the quota.
val logReadResults = replicaManager.readFromLocalLog(
replicaId = fetchMetadata.replicaId,
fetchOnlyFromLeader = fetchMetadata.fetchOnlyLeader,
readOnlyCommitted = fetchMetadata.fetchOnlyCommitted,
fetchMaxBytes = fetchMetadata.fetchMaxBytes,
hardMaxBytesLimit = fetchMetadata.hardMaxBytesLimit,
// Pairs each partition with its request-level fetch info.
// NOTE(review): the collection this function literal is applied to is not visible; presumably
// fetchMetadata.fetchPartitionStatus.map -- confirm.
readPartitionInfo = { case (tp, status) => tp -> status.fetchInfo },
quota = quota
// NOTE(review): the closing parenthesis of the call above is not present in this text.
// Converts each read result into the FetchPartitionData handed back to the requester.
// NOTE(review): the receiver is again not visible (presumably logReadResults.map), and the remaining
// constructor arguments and any responseCallback invocation are cut off below -- confirm.
val fetchPartitionData = { case (tp, result) =>
tp -> FetchPartitionData(result.error, result.hw,
/**
 * Meters for delayed fetches, both registered under the name "ExpiresPerSec" with event type
 * "requests" and TimeUnit.SECONDS, and told apart by the "fetcherType" tag.
 */
object DelayedFetchMetrics extends KafkaMetricsGroup {
  private val FetcherTypeKey = "fetcherType"

  // The two meters differ only in the tag value, so both are built through this helper.
  private def expiresMeter(fetcherType: String) =
    newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS, tags = Map(FetcherTypeKey -> fetcherType))

  // Tagged fetcherType=follower.
  val followerExpiredRequestMeter = expiresMeter("follower")
  // Tagged fetcherType=consumer.
  val consumerExpiredRequestMeter = expiresMeter("consumer")
}