| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package kafka.server |
| |
| import java.net.SocketTimeoutException |
| import java.util |
| |
| import kafka.admin.AdminUtils |
| import kafka.cluster.BrokerEndPoint |
| import kafka.log.LogConfig |
| import kafka.api.{KAFKA_0_10_0_IV0, KAFKA_0_10_1_IV1, KAFKA_0_10_1_IV2, KAFKA_0_9_0} |
| import kafka.common.KafkaStorageException |
| import ReplicaFetcherThread._ |
| import org.apache.kafka.clients.{ClientRequest, ClientResponse, ManualMetadataUpdater, NetworkClient} |
| import org.apache.kafka.common.network.{ChannelBuilders, LoginType, Mode, NetworkReceive, Selectable, Selector} |
| import org.apache.kafka.common.requests.{AbstractRequest, FetchResponse, ListOffsetRequest, ListOffsetResponse} |
| import org.apache.kafka.common.requests.{FetchRequest => JFetchRequest} |
| import org.apache.kafka.common.{Node, TopicPartition} |
| import org.apache.kafka.common.metrics.Metrics |
| import org.apache.kafka.common.protocol.{ApiKeys, Errors, ProtoUtils} |
| import org.apache.kafka.common.record.MemoryRecords |
| import org.apache.kafka.common.utils.Time |
| |
| import scala.collection.Map |
| import scala.collection.JavaConverters._ |
| |
| class ReplicaFetcherThread(name: String, |
| fetcherId: Int, |
| sourceBroker: BrokerEndPoint, |
| brokerConfig: KafkaConfig, |
| replicaMgr: ReplicaManager, |
| metrics: Metrics, |
| time: Time, |
| quota: ReplicationQuotaManager) |
| extends AbstractFetcherThread(name = name, |
| clientId = name, |
| sourceBroker = sourceBroker, |
| fetchBackOffMs = brokerConfig.replicaFetchBackoffMs, |
| isInterruptible = false) { |
| |
| type REQ = FetchRequest |
| type PD = PartitionData |
| |
| private val fetchRequestVersion: Short = |
| if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_1_IV1) 3 |
| else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_0_IV0) 2 |
| else if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_9_0) 1 |
| else 0 |
| private val socketTimeout: Int = brokerConfig.replicaSocketTimeoutMs |
| private val replicaId = brokerConfig.brokerId |
| private val maxWait = brokerConfig.replicaFetchWaitMaxMs |
| private val minBytes = brokerConfig.replicaFetchMinBytes |
| private val maxBytes = brokerConfig.replicaFetchResponseMaxBytes |
| private val fetchSize = brokerConfig.replicaFetchMaxBytes |
| |
| private def clientId = name |
| |
| private val sourceNode = new Node(sourceBroker.id, sourceBroker.host, sourceBroker.port) |
| |
| // we need to include both the broker id and the fetcher id |
| // as the metrics tag to avoid metric name conflicts with |
| // more than one fetcher thread to the same broker |
| private val networkClient = { |
| val channelBuilder = ChannelBuilders.clientChannelBuilder( |
| brokerConfig.interBrokerSecurityProtocol, |
| LoginType.SERVER, |
| brokerConfig.values, |
| brokerConfig.saslMechanismInterBrokerProtocol, |
| brokerConfig.saslInterBrokerHandshakeRequestEnable |
| ) |
| val selector = new Selector( |
| NetworkReceive.UNLIMITED, |
| brokerConfig.connectionsMaxIdleMs, |
| metrics, |
| time, |
| "replica-fetcher", |
| Map("broker-id" -> sourceBroker.id.toString, "fetcher-id" -> fetcherId.toString).asJava, |
| false, |
| channelBuilder |
| ) |
| new NetworkClient( |
| selector, |
| new ManualMetadataUpdater(), |
| clientId, |
| 1, |
| 0, |
| Selectable.USE_DEFAULT_BUFFER_SIZE, |
| brokerConfig.replicaSocketReceiveBufferBytes, |
| brokerConfig.requestTimeoutMs, |
| time, |
| false |
| ) |
| } |
| |
| override def shutdown(): Unit = { |
| super.shutdown() |
| networkClient.close() |
| } |
| |
| // process fetched data |
| def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: PartitionData) { |
| try { |
| val replica = replicaMgr.getReplica(topicPartition).get |
| val records = partitionData.toRecords |
| |
| maybeWarnIfOversizedRecords(records, topicPartition) |
| |
| if (fetchOffset != replica.logEndOffset.messageOffset) |
| throw new RuntimeException("Offset mismatch for partition %s: fetched offset = %d, log end offset = %d.".format(topicPartition, fetchOffset, replica.logEndOffset.messageOffset)) |
| if (logger.isTraceEnabled) |
| trace("Follower %d has replica log end offset %d for partition %s. Received %d messages and leader hw %d" |
| .format(replica.brokerId, replica.logEndOffset.messageOffset, topicPartition, records.sizeInBytes, partitionData.highWatermark)) |
| replica.log.get.append(records, assignOffsets = false) |
| if (logger.isTraceEnabled) |
| trace("Follower %d has replica log end offset %d after appending %d bytes of messages for partition %s" |
| .format(replica.brokerId, replica.logEndOffset.messageOffset, records.sizeInBytes, topicPartition)) |
| val followerHighWatermark = replica.logEndOffset.messageOffset.min(partitionData.highWatermark) |
| // for the follower replica, we do not need to keep |
| // its segment base offset the physical position, |
| // these values will be computed upon making the leader |
| replica.highWatermark = new LogOffsetMetadata(followerHighWatermark) |
| if (logger.isTraceEnabled) |
| trace(s"Follower ${replica.brokerId} set replica high watermark for partition $topicPartition to $followerHighWatermark") |
| if (quota.isThrottled(topicPartition)) |
| quota.record(records.sizeInBytes) |
| } catch { |
| case e: KafkaStorageException => |
| fatal(s"Disk error while replicating data for $topicPartition", e) |
| Runtime.getRuntime.halt(1) |
| } |
| } |
| |
| def maybeWarnIfOversizedRecords(records: MemoryRecords, topicPartition: TopicPartition): Unit = { |
| // oversized messages don't cause replication to fail from fetch request version 3 (KIP-74) |
| if (fetchRequestVersion <= 2 && records.sizeInBytes > 0 && records.validBytes <= 0) |
| error(s"Replication is failing due to a message that is greater than replica.fetch.max.bytes for partition $topicPartition. " + |
| "This generally occurs when the max.message.bytes has been overridden to exceed this value and a suitably large " + |
| "message has also been sent. To fix this problem increase replica.fetch.max.bytes in your broker config to be " + |
| "equal or larger than your settings for max.message.bytes, both at a broker and topic level.") |
| } |
| |
| /** |
| * Handle a partition whose offset is out of range and return a new fetch offset. |
| */ |
| def handleOffsetOutOfRange(topicPartition: TopicPartition): Long = { |
| val replica = replicaMgr.getReplica(topicPartition).get |
| |
| /** |
| * Unclean leader election: A follower goes down, in the meanwhile the leader keeps appending messages. The follower comes back up |
| * and before it has completely caught up with the leader's logs, all replicas in the ISR go down. The follower is now uncleanly |
| * elected as the new leader, and it starts appending messages from the client. The old leader comes back up, becomes a follower |
| * and it may discover that the current leader's end offset is behind its own end offset. |
| * |
| * In such a case, truncate the current follower's log to the current leader's end offset and continue fetching. |
| * |
| * There is a potential for a mismatch between the logs of the two replicas here. We don't fix this mismatch as of now. |
| */ |
| val leaderEndOffset: Long = earliestOrLatestOffset(topicPartition, ListOffsetRequest.LATEST_TIMESTAMP, |
| brokerConfig.brokerId) |
| |
| if (leaderEndOffset < replica.logEndOffset.messageOffset) { |
| // Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election. |
| // This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise, |
| // we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration. |
| if (!LogConfig.fromProps(brokerConfig.originals, AdminUtils.fetchEntityConfig(replicaMgr.zkUtils, |
| ConfigType.Topic, topicPartition.topic)).uncleanLeaderElectionEnable) { |
| // Log a fatal error and shutdown the broker to ensure that data loss does not unexpectedly occur. |
| fatal("Exiting because log truncation is not allowed for partition %s,".format(topicPartition) + |
| " Current leader %d's latest offset %d is less than replica %d's latest offset %d" |
| .format(sourceBroker.id, leaderEndOffset, brokerConfig.brokerId, replica.logEndOffset.messageOffset)) |
| System.exit(1) |
| } |
| |
| warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's latest offset %d" |
| .format(brokerConfig.brokerId, topicPartition, replica.logEndOffset.messageOffset, sourceBroker.id, leaderEndOffset)) |
| replicaMgr.logManager.truncateTo(Map(topicPartition -> leaderEndOffset)) |
| leaderEndOffset |
| } else { |
| /** |
| * If the leader's log end offset is greater than the follower's log end offset, there are two possibilities: |
| * 1. The follower could have been down for a long time and when it starts up, its end offset could be smaller than the leader's |
| * start offset because the leader has deleted old logs (log.logEndOffset < leaderStartOffset). |
| * 2. When unclean leader election occurs, it is possible that the old leader's high watermark is greater than |
| * the new leader's log end offset. So when the old leader truncates its offset to its high watermark and starts |
| * to fetch from the new leader, an OffsetOutOfRangeException will be thrown. After that some more messages are |
| * produced to the new leader. While the old leader is trying to handle the OffsetOutOfRangeException and query |
| * the log end offset of the new leader, the new leader's log end offset becomes higher than the follower's log end offset. |
| * |
| * In the first case, the follower's current log end offset is smaller than the leader's log start offset. So the |
| * follower should truncate all its logs, roll out a new segment and start to fetch from the current leader's log |
| * start offset. |
| * In the second case, the follower should just keep the current log segments and retry the fetch. In the second |
| * case, there will be some inconsistency of data between old and new leader. We are not solving it here. |
| * If users want to have strong consistency guarantees, appropriate configurations needs to be set for both |
| * brokers and producers. |
| * |
| * Putting the two cases together, the follower should fetch from the higher one of its replica log end offset |
| * and the current leader's log start offset. |
| * |
| */ |
| val leaderStartOffset: Long = earliestOrLatestOffset(topicPartition, ListOffsetRequest.EARLIEST_TIMESTAMP, |
| brokerConfig.brokerId) |
| warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's start offset %d" |
| .format(brokerConfig.brokerId, topicPartition, replica.logEndOffset.messageOffset, sourceBroker.id, leaderStartOffset)) |
| val offsetToFetch = Math.max(leaderStartOffset, replica.logEndOffset.messageOffset) |
| // Only truncate log when current leader's log start offset is greater than follower's log end offset. |
| if (leaderStartOffset > replica.logEndOffset.messageOffset) |
| replicaMgr.logManager.truncateFullyAndStartAt(topicPartition, leaderStartOffset) |
| offsetToFetch |
| } |
| } |
| |
| // any logic for partitions whose leader has changed |
| def handlePartitionsWithErrors(partitions: Iterable[TopicPartition]) { |
| delayPartitions(partitions, brokerConfig.replicaFetchBackoffMs.toLong) |
| } |
| |
| protected def fetch(fetchRequest: FetchRequest): Seq[(TopicPartition, PartitionData)] = { |
| val clientResponse = sendRequest(fetchRequest.underlying) |
| val fetchResponse = clientResponse.responseBody.asInstanceOf[FetchResponse] |
| fetchResponse.responseData.asScala.toSeq.map { case (key, value) => |
| key -> new PartitionData(value) |
| } |
| } |
| |
| private def sendRequest(requestBuilder: AbstractRequest.Builder[_ <: AbstractRequest]): ClientResponse = { |
| import kafka.utils.NetworkClientBlockingOps._ |
| try { |
| if (!networkClient.blockingReady(sourceNode, socketTimeout)(time)) |
| throw new SocketTimeoutException(s"Failed to connect within $socketTimeout ms") |
| else { |
| val clientRequest = networkClient.newClientRequest(sourceBroker.id.toString, requestBuilder, |
| time.milliseconds(), true) |
| networkClient.blockingSendAndReceive(clientRequest)(time) |
| } |
| } |
| catch { |
| case e: Throwable => |
| networkClient.close(sourceBroker.id.toString) |
| throw e |
| } |
| |
| } |
| |
| private def earliestOrLatestOffset(topicPartition: TopicPartition, earliestOrLatest: Long, consumerId: Int): Long = { |
| val requestBuilder = if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_1_IV2) { |
| val partitions = Map(topicPartition -> (earliestOrLatest: java.lang.Long)) |
| new ListOffsetRequest.Builder(consumerId). |
| setTargetTimes(partitions.asJava). |
| setVersion(1) |
| } else { |
| val partitions = Map(topicPartition -> new ListOffsetRequest.PartitionData(earliestOrLatest, 1)) |
| new ListOffsetRequest.Builder(consumerId). |
| setOffsetData(partitions.asJava). |
| setVersion(0) |
| } |
| val clientResponse = sendRequest(requestBuilder) |
| val response = clientResponse.responseBody.asInstanceOf[ListOffsetResponse] |
| val partitionData = response.responseData.get(topicPartition) |
| Errors.forCode(partitionData.errorCode) match { |
| case Errors.NONE => |
| if (brokerConfig.interBrokerProtocolVersion >= KAFKA_0_10_1_IV2) |
| partitionData.offset |
| else |
| partitionData.offsets.get(0) |
| case errorCode => throw errorCode.exception |
| } |
| } |
| |
| protected def buildFetchRequest(partitionMap: Seq[(TopicPartition, PartitionFetchState)]): FetchRequest = { |
| val requestMap = new util.LinkedHashMap[TopicPartition, JFetchRequest.PartitionData] |
| |
| partitionMap.foreach { case (topicPartition, partitionFetchState) => |
| // We will not include a replica in the fetch request if it should be throttled. |
| if (partitionFetchState.isActive && !shouldFollowerThrottle(quota, topicPartition)) |
| requestMap.put(topicPartition, new JFetchRequest.PartitionData(partitionFetchState.offset, fetchSize)) |
| } |
| |
| val requestBuilder = new JFetchRequest.Builder(maxWait, minBytes, requestMap). |
| setReplicaId(replicaId).setMaxBytes(maxBytes) |
| requestBuilder.setVersion(fetchRequestVersion) |
| new FetchRequest(requestBuilder) |
| } |
| |
| /** |
| * To avoid ISR thrashing, we only throttle a replica on the follower if it's in the throttled replica list, |
| * the quota is exceeded and the replica is not in sync. |
| */ |
| private def shouldFollowerThrottle(quota: ReplicaQuota, topicPartition: TopicPartition): Boolean = { |
| val isReplicaInSync = fetcherLagStats.isReplicaInSync(topicPartition.topic, topicPartition.partition) |
| quota.isThrottled(topicPartition) && quota.isQuotaExceeded && !isReplicaInSync |
| } |
| } |
| |
| object ReplicaFetcherThread { |
| |
| private[server] class FetchRequest(val underlying: JFetchRequest.Builder) extends AbstractFetcherThread.FetchRequest { |
| def isEmpty: Boolean = underlying.fetchData().isEmpty |
| def offset(topicPartition: TopicPartition): Long = |
| underlying.fetchData().asScala(topicPartition).offset |
| } |
| |
| private[server] class PartitionData(val underlying: FetchResponse.PartitionData) extends AbstractFetcherThread.PartitionData { |
| |
| def errorCode: Short = underlying.errorCode |
| |
| def toRecords: MemoryRecords = { |
| underlying.records.asInstanceOf[MemoryRecords] |
| } |
| |
| def highWatermark: Long = underlying.highWatermark |
| |
| def exception: Option[Throwable] = Errors.forCode(errorCode) match { |
| case Errors.NONE => None |
| case e => Some(e.exception) |
| } |
| |
| } |
| |
| } |