| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package kafka.server |
| |
| import java.util.concurrent.locks.ReentrantLock |
| |
| import kafka.cluster.BrokerEndPoint |
| import kafka.consumer.PartitionTopicInfo |
| import kafka.message.{MessageAndOffset, ByteBufferMessageSet} |
| import kafka.utils.{Pool, ShutdownableThread, DelayedItem} |
| import kafka.common.{KafkaException, ClientIdAndBroker, TopicAndPartition} |
| import kafka.metrics.KafkaMetricsGroup |
| import kafka.utils.CoreUtils.inLock |
| import org.apache.kafka.common.errors.CorruptRecordException |
| import org.apache.kafka.common.protocol.Errors |
| import AbstractFetcherThread._ |
| import scala.collection.{mutable, Set, Map} |
| import java.util.concurrent.TimeUnit |
| import java.util.concurrent.atomic.AtomicLong |
| |
| import com.yammer.metrics.core.Gauge |
| |
| /** |
| * Abstract class for fetching data from multiple partitions from the same broker. |
| */ |
abstract class AbstractFetcherThread(name: String,
                                     clientId: String,
                                     sourceBroker: BrokerEndPoint,
                                     fetchBackOffMs: Int = 0,
                                     isInterruptible: Boolean = true)
  extends ShutdownableThread(name, isInterruptible) {

  // Concrete subclasses pin down the exact request/response types exchanged with the broker.
  type REQ <: FetchRequest
  type PD <: PartitionData

  // (topic, partition) -> fetch position/state for every partition this thread owns.
  // All access is guarded by partitionMapLock; partitionMapCond is signalled whenever
  // the partition set changes or shutdown begins, to wake a thread parked in a back-off wait.
  private val partitionMap = new mutable.HashMap[TopicAndPartition, PartitionFetchState] // a (topic, partition) -> partitionFetchState map
  private val partitionMapLock = new ReentrantLock
  private val partitionMapCond = partitionMapLock.newCondition()

  private val metricId = new ClientIdAndBroker(clientId, sourceBroker.host, sourceBroker.port)
  val fetcherStats = new FetcherStats(metricId)
  val fetcherLagStats = new FetcherLagStats(metricId)

  /* callbacks to be defined in subclass */

  // process fetched data; called with the partitionMapLock held (see processFetchRequest)
  def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: PD)

  // handle a partition whose offset is out of range and return a new fetch offset
  def handleOffsetOutOfRange(topicAndPartition: TopicAndPartition): Long

  // deal with partitions with errors, potentially due to leadership changes
  def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition])

  // build a fetch request covering the given partitions; the result may be empty
  protected def buildFetchRequest(partitionMap: Map[TopicAndPartition, PartitionFetchState]): REQ

  // issue the fetch request to the source broker and return the per-partition responses
  protected def fetch(fetchRequest: REQ): Map[TopicAndPartition, PD]

  /**
   * Initiates shutdown and wakes the worker thread if it is parked in a back-off
   * await, so it observes the shutdown flag promptly instead of sleeping out the
   * full back-off interval, then blocks until the thread has exited.
   */
  override def shutdown(){
    initiateShutdown()
    inLock(partitionMapLock) {
      partitionMapCond.signalAll()
    }
    awaitShutdown()
  }

  /**
   * One iteration of the fetcher loop: build a fetch request under the lock
   * (backing off when there are no active partitions), then — outside the lock —
   * send it and process the response.
   */
  override def doWork() {

    val fetchRequest = inLock(partitionMapLock) {
      val fetchRequest = buildFetchRequest(partitionMap)
      if (fetchRequest.isEmpty) {
        trace("There are no active partitions. Back off for %d ms before sending a fetch request".format(fetchBackOffMs))
        partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
      }
      fetchRequest
    }

    if (!fetchRequest.isEmpty)
      processFetchRequest(fetchRequest)
  }

  /**
   * Sends one fetch request and handles each partition's response:
   * on success advances the fetch offset, updates lag/byte-rate metrics and hands
   * the data to the subclass; on OFFSET_OUT_OF_RANGE resets the offset via the
   * subclass callback; any other error marks the partition for
   * handlePartitionsWithErrors. A failure of the fetch itself marks every owned
   * partition as errored and backs off before the next attempt.
   */
  private def processFetchRequest(fetchRequest: REQ) {
    val partitionsWithError = new mutable.HashSet[TopicAndPartition]
    var responseData: Map[TopicAndPartition, PD] = Map.empty

    try {
      trace("Issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))
      responseData = fetch(fetchRequest)
    } catch {
      // Throwable is caught deliberately: any fetch failure while running is logged
      // and turned into a back-off rather than killing the fetcher thread.
      case t: Throwable =>
        if (isRunning.get) {
          warn(s"Error in fetch $fetchRequest", t)
          inLock(partitionMapLock) {
            partitionsWithError ++= partitionMap.keys
            // an error occurred while fetching partitions; back off before retrying
            partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
          }
        }
    }
    fetcherStats.requestRate.mark()

    if (responseData.nonEmpty) {
      // process fetched data
      inLock(partitionMapLock) {

        responseData.foreach { case (topicAndPartition, partitionData) =>
          val TopicAndPartition(topic, partitionId) = topicAndPartition
          // Skip partitions removed while the fetch was in flight (no longer in the map).
          partitionMap.get(topicAndPartition).foreach(currentPartitionFetchState =>
            // we append to the log if the current offset is defined and it is the same as the offset requested during fetch
            // (a stale response for an offset that has since moved is ignored)
            if (fetchRequest.offset(topicAndPartition) == currentPartitionFetchState.offset) {
              Errors.forCode(partitionData.errorCode) match {
                case Errors.NONE =>
                  try {
                    val messages = partitionData.toByteBufferMessageSet
                    val validBytes = messages.validBytes
                    // Next fetch offset: one past the last fetched message, or unchanged
                    // if the response contained no messages.
                    val newOffset = messages.shallowIterator.toSeq.lastOption match {
                      case Some(m: MessageAndOffset) => m.nextOffset
                      case None => currentPartitionFetchState.offset
                    }
                    partitionMap.put(topicAndPartition, new PartitionFetchState(newOffset))
                    fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = Math.max(0L, partitionData.highWatermark - newOffset)
                    fetcherStats.byteRate.mark(validBytes)
                    // Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
                    processPartitionData(topicAndPartition, currentPartitionFetchState.offset, partitionData)
                  } catch {
                    case ime: CorruptRecordException =>
                      // we log the error and continue. This ensures two things
                      // 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down and cause other topic partition to also lag
                      // 2. If the message is corrupt due to a transient state in the log (truncation, partial writes can cause this), we simply continue and
                      // should get fixed in the subsequent fetches
                      logger.error("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentPartitionFetchState.offset + " error " + ime.getMessage)
                    case e: Throwable =>
                      // Any other processing failure is fatal for the fetcher thread.
                      throw new KafkaException("error processing data for partition [%s,%d] offset %d"
                        .format(topic, partitionId, currentPartitionFetchState.offset), e)
                  }
                case Errors.OFFSET_OUT_OF_RANGE =>
                  try {
                    // Ask the subclass for a replacement offset (e.g. log start or end).
                    val newOffset = handleOffsetOutOfRange(topicAndPartition)
                    partitionMap.put(topicAndPartition, new PartitionFetchState(newOffset))
                    error("Current offset %d for partition [%s,%d] out of range; reset offset to %d"
                      .format(currentPartitionFetchState.offset, topic, partitionId, newOffset))
                  } catch {
                    case e: Throwable =>
                      error("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e)
                      partitionsWithError += topicAndPartition
                  }
                case _ =>
                  // Other error codes (e.g. due to leadership changes) are delegated to
                  // handlePartitionsWithErrors below; suppress logging during shutdown.
                  if (isRunning.get) {
                    error("Error for partition [%s,%d] to broker %d:%s".format(topic, partitionId, sourceBroker.id,
                      partitionData.exception.get))
                    partitionsWithError += topicAndPartition
                  }
              }
            })
        }
      }
    }

    if (partitionsWithError.nonEmpty) {
      debug("handling partitions with error for %s".format(partitionsWithError))
      handlePartitionsWithErrors(partitionsWithError)
    }
  }

  /**
   * Registers new partitions with their starting offsets. Partitions already
   * present keep their current state; an invalid offset is resolved through
   * handleOffsetOutOfRange before the partition is added.
   */
  def addPartitions(partitionAndOffsets: Map[TopicAndPartition, Long]) {
    partitionMapLock.lockInterruptibly()
    try {
      for ((topicAndPartition, offset) <- partitionAndOffsets) {
        // If the partitionMap already has the topic/partition, then do not update the map with the old offset
        if (!partitionMap.contains(topicAndPartition))
          partitionMap.put(
            topicAndPartition,
            if (PartitionTopicInfo.isOffsetInvalid(offset)) new PartitionFetchState(handleOffsetOutOfRange(topicAndPartition))
            else new PartitionFetchState(offset)
          )}
      // wake a worker waiting in doWork's back-off so it picks up the new partitions
      partitionMapCond.signalAll()
    } finally partitionMapLock.unlock()
  }

  /**
   * Marks the given partitions inactive for `delay` ms; they keep their offsets
   * but are excluded from fetch requests until the delay elapses. Partitions
   * already delayed are left untouched.
   */
  def delayPartitions(partitions: Iterable[TopicAndPartition], delay: Long) {
    partitionMapLock.lockInterruptibly()
    try {
      for (partition <- partitions) {
        partitionMap.get(partition).foreach (currentPartitionFetchState =>
          if (currentPartitionFetchState.isActive)
            partitionMap.put(partition, new PartitionFetchState(currentPartitionFetchState.offset, new DelayedItem(delay)))
        )
      }
      partitionMapCond.signalAll()
    } finally partitionMapLock.unlock()
  }

  // Stops fetching the given partitions; unknown partitions are ignored.
  def removePartitions(topicAndPartitions: Set[TopicAndPartition]) {
    partitionMapLock.lockInterruptibly()
    try topicAndPartitions.foreach(partitionMap.remove)
    finally partitionMapLock.unlock()
  }

  // Number of partitions currently assigned to this fetcher.
  def partitionCount() = {
    partitionMapLock.lockInterruptibly()
    try partitionMap.size
    finally partitionMapLock.unlock()
  }

}
| |
object AbstractFetcherThread {

  /** Protocol-agnostic view of a fetch request, implemented per wire version by subclasses. */
  trait FetchRequest {
    // true when the request covers no partitions (nothing to fetch)
    def isEmpty: Boolean
    // the offset this request asked for on the given partition
    def offset(topicAndPartition: TopicAndPartition): Long
  }

  /** Protocol-agnostic view of one partition's fetch response data. */
  trait PartitionData {
    // error code for this partition, interpreted via Errors.forCode
    def errorCode: Short
    // exception corresponding to the error code, if any
    def exception: Option[Throwable]
    // fetched messages as a ByteBufferMessageSet
    def toByteBufferMessageSet: ByteBufferMessageSet
    // the partition's high watermark reported by the broker (used for lag metrics)
    def highWatermark: Long
  }

}
| |
/**
 * Per (clientId, topic, partition) consumer-lag metric. Registers a
 * "ConsumerLag" gauge at construction time that reports the most recently
 * recorded lag value.
 */
class FetcherLagMetrics(metricId: ClientIdTopicPartition) extends KafkaMetricsGroup {
  // Backing store for the reported lag; -1 until the first measurement arrives.
  private[this] val currentLag = new AtomicLong(-1L)

  newGauge(
    "ConsumerLag",
    new Gauge[Long] {
      def value = currentLag.get
    },
    Map(
      "clientId" -> metricId.clientId,
      "topic" -> metricId.topic,
      "partition" -> metricId.partitionId.toString)
  )

  /** Records a new lag measurement. */
  def lag_=(newLag: Long) {
    currentLag.set(newLag)
  }

  /** The most recently recorded lag (-1 if never set). */
  def lag = currentLag.get
}
| |
/**
 * Registry of FetcherLagMetrics keyed by (clientId, topic, partition),
 * creating an entry lazily on first lookup.
 */
class FetcherLagStats(metricId: ClientIdAndBroker) {
  val stats = new Pool[ClientIdTopicPartition, FetcherLagMetrics](
    Some((key: ClientIdTopicPartition) => new FetcherLagMetrics(key)))

  /** Returns (creating if absent) the lag metrics for the given topic partition. */
  def getFetcherLagStats(topic: String, partitionId: Int): FetcherLagMetrics =
    stats.getAndMaybePut(new ClientIdTopicPartition(metricId.clientId, topic, partitionId))
}
| |
/**
 * Throughput metrics for a fetcher, tagged with the client id and the
 * source broker's host/port.
 */
class FetcherStats(metricId: ClientIdAndBroker) extends KafkaMetricsGroup {
  // Tags identifying the client and source broker on every meter below.
  val tags = Map(
    "clientId" -> metricId.clientId,
    "brokerHost" -> metricId.brokerHost,
    "brokerPort" -> metricId.brokerPort.toString)

  // Rate of fetch requests issued to the source broker.
  val requestRate = newMeter("RequestsPerSec", "requests", TimeUnit.SECONDS, tags)

  // Rate of valid bytes fetched from the source broker.
  val byteRate = newMeter("BytesPerSec", "bytes", TimeUnit.SECONDS, tags)
}
| |
/** Identifies a (client, topic, partition) triple; renders as "clientId-topic-partitionId". */
case class ClientIdTopicPartition(clientId: String, topic: String, partitionId: Int) {
  override def toString = s"$clientId-$topic-$partitionId"
}
| |
| /** |
| * case class to keep partition offset and its state(active , inactive) |
| */ |
| case class PartitionFetchState(offset: Long, delay: DelayedItem) { |
| |
| def this(offset: Long) = this(offset, new DelayedItem(0)) |
| |
| def isActive: Boolean = { delay.getDelay(TimeUnit.MILLISECONDS) == 0 } |
| |
| override def toString = "%d-%b".format(offset, isActive) |
| } |