| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package kafka.cluster |
| |
| import kafka.log.Log |
| import kafka.utils.Logging |
| import kafka.server.{LogOffsetMetadata, LogReadResult} |
| import kafka.common.KafkaException |
| import java.util.concurrent.atomic.AtomicLong |
| |
| import org.apache.kafka.common.utils.Time |
| |
/**
 * Replication state of one partition on one broker: the high watermark, the log end offset and the
 * bookkeeping used to work out when a follower was last caught up with the leader.
 *
 * A replica is "local" when `log` is defined. For a local replica the log end offset is read from the
 * log and only the high watermark may be set. For a non-local replica the log end offset is stored in
 * this object and updated through [[updateLogReadResult]], and setting the high watermark is rejected.
 *
 * @param brokerId                  id of the broker hosting this replica
 * @param partition                 the partition this replica belongs to
 * @param time                      clock; not referenced inside this class, kept so existing callers compile
 * @param initialHighWatermarkValue message offset used to initialise the high watermark
 * @param log                       the local log, or `None` for a non-local replica
 */
class Replica(val brokerId: Int,
              val partition: Partition,
              time: Time = Time.SYSTEM,
              initialHighWatermarkValue: Long = 0L,
              val log: Option[Log] = None) extends Logging {
  // the high watermark offset value, in non-leader replicas only its message offsets are kept
  @volatile private[this] var highWatermarkMetadata = new LogOffsetMetadata(initialHighWatermarkValue)
  // the log end offset value, kept in all replicas;
  // for local replica it is the log's end offset, for remote replicas its value is only updated by follower fetch
  @volatile private[this] var logEndOffsetMetadata = LogOffsetMetadata.UnknownOffsetMetadata

  // The log end offset value at the time the leader received the last FetchRequest from this follower
  // This is used to determine the lastCaughtUpTimeMs of the follower
  @volatile private[this] var lastFetchLeaderLogEndOffset = 0L

  // The time when the leader received the last FetchRequest from this follower
  // This is used to determine the lastCaughtUpTimeMs of the follower
  @volatile private[this] var lastFetchTimeMs = 0L

  // lastCaughtUpTimeMs is the largest time t such that the offset of most recent FetchRequest from this follower >=
  // the LEO of leader at time t. This is used to determine the lag of this follower and ISR of this partition.
  @volatile private[this] var _lastCaughtUpTimeMs = 0L

  val topicPartition = partition.topicPartition

  /** True when this replica has a log attached. */
  def isLocal: Boolean = log.isDefined

  /** The most recently recorded caught-up time, in milliseconds. */
  def lastCaughtUpTimeMs: Long = _lastCaughtUpTimeMs

  /**
   * Records the outcome of a follower fetch.
   *
   * If the FetchRequest reads up to the log end offset of the leader when the current fetch request is received,
   * set `lastCaughtUpTimeMs` to the time when the current fetch request was received.
   *
   * Else if the FetchRequest reads up to the log end offset of the leader when the previous fetch request was
   * received, set `lastCaughtUpTimeMs` to the time when the previous fetch request was received.
   *
   * This is needed to enforce the semantics of ISR, i.e. a replica is in ISR if and only if it lags behind leader's
   * LEO by at most `replicaLagTimeMaxMs`. These semantics allow a follower to be added to the ISR even if the offset
   * of its fetch request is always smaller than the leader's LEO, which can happen if small produce requests are
   * received at high frequency.
   *
   * `lastCaughtUpTimeMs` never moves backwards here: the new value is the max of the old one and the candidate.
   *
   * @param logReadResult result of the read performed for the follower's fetch
   * @throws KafkaException if this replica is local; in that case no state is modified
   */
  def updateLogReadResult(logReadResult: LogReadResult): Unit = {
    // Reject local replicas before touching any field, so a misdirected call cannot update the
    // caught-up time and then fail while storing the log end offset.
    if (isLocal)
      throw localLogEndOffsetUpdateError

    val fetchOffsetMetadata = logReadResult.info.fetchOffsetMetadata
    val fetchOffset = fetchOffsetMetadata.messageOffset

    if (fetchOffset >= logReadResult.leaderLogEndOffset)
      _lastCaughtUpTimeMs = math.max(_lastCaughtUpTimeMs, logReadResult.fetchTimeMs)
    else if (fetchOffset >= lastFetchLeaderLogEndOffset)
      _lastCaughtUpTimeMs = math.max(_lastCaughtUpTimeMs, lastFetchTimeMs)

    logEndOffset = fetchOffsetMetadata
    lastFetchLeaderLogEndOffset = logReadResult.leaderLogEndOffset
    lastFetchTimeMs = logReadResult.fetchTimeMs
  }

  /**
   * Overwrites the caught-up bookkeeping with the supplied values.
   *
   * @param curLeaderLogEndOffset value stored as the leader log end offset seen at the last fetch
   * @param curTimeMs             value stored as the time of the last fetch
   * @param lastCaughtUpTimeMs    value stored as the caught-up time (assigned directly, not max-ed)
   */
  def resetLastCaughtUpTime(curLeaderLogEndOffset: Long, curTimeMs: Long, lastCaughtUpTimeMs: Long): Unit = {
    lastFetchLeaderLogEndOffset = curLeaderLogEndOffset
    lastFetchTimeMs = curTimeMs
    _lastCaughtUpTimeMs = lastCaughtUpTimeMs
  }

  // Single source for the error raised when something tries to store a log end offset on a local replica.
  private def localLogEndOffsetUpdateError: KafkaException =
    new KafkaException(s"Should not set log end offset on partition $topicPartition's local replica $brokerId")

  /**
   * Stores the log end offset of a non-local replica.
   *
   * @throws KafkaException if this replica is local
   */
  private def logEndOffset_=(newLogEndOffset: LogOffsetMetadata): Unit = {
    if (isLocal) {
      throw localLogEndOffsetUpdateError
    } else {
      logEndOffsetMetadata = newLogEndOffset
      trace(s"Setting log end offset for replica $brokerId for partition $topicPartition to [$logEndOffsetMetadata]")
    }
  }

  /** The log end offset: read from the log for a local replica, otherwise the value stored in this object. */
  def logEndOffset: LogOffsetMetadata = log match {
    case Some(localLog) => localLog.logEndOffsetMetadata
    case None => logEndOffsetMetadata
  }

  /**
   * Sets the high watermark of a local replica.
   *
   * @throws KafkaException if this replica is not local
   */
  def highWatermark_=(newHighWatermark: LogOffsetMetadata): Unit = {
    if (isLocal) {
      highWatermarkMetadata = newHighWatermark
      trace(s"Setting high watermark for replica $brokerId partition $topicPartition to [$newHighWatermark]")
    } else {
      throw new KafkaException(s"Should not set high watermark on partition $topicPartition's non-local replica $brokerId")
    }
  }

  /** The current high watermark metadata. */
  def highWatermark: LogOffsetMetadata = highWatermarkMetadata

  /**
   * Replaces the high watermark with the metadata the local log returns for the current high watermark
   * message offset.
   *
   * @throws KafkaException if this replica is not local
   */
  def convertHWToLocalOffsetMetadata(): Unit = log match {
    case Some(localLog) =>
      highWatermarkMetadata = localLog.convertToOffsetMetadata(highWatermarkMetadata.messageOffset)
    case None =>
      throw new KafkaException(s"Should not construct complete high watermark on partition $topicPartition's non-local replica $brokerId")
  }

  /** Two replicas are equal when they have the same broker id and topic partition. */
  override def equals(that: Any): Boolean = that match {
    case other: Replica => brokerId == other.brokerId && topicPartition == other.topicPartition
    case _ => false
  }

  /** Derived from the same two fields that `equals` compares. */
  override def hashCode: Int = 31 + topicPartition.hashCode + 17 * brokerId

  /** Human-readable summary; the high watermark is included only for a local replica. */
  override def toString: String = {
    val summary = s"ReplicaId: $brokerId; Topic: ${partition.topic}; Partition: ${partition.partitionId}" +
      s"; isLocal: $isLocal; lastCaughtUpTimeMs: $lastCaughtUpTimeMs"
    if (isLocal) s"$summary; Highwatermark: $highWatermark" else summary
  }
}