/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.server
import java.util.concurrent.locks.ReentrantLock
import kafka.cluster.BrokerEndPoint
import kafka.consumer.PartitionTopicInfo
import kafka.message.{MessageAndOffset, ByteBufferMessageSet}
import kafka.utils.{Pool, ShutdownableThread, DelayedItem}
import kafka.common.{KafkaException, ClientIdAndBroker, TopicAndPartition}
import kafka.metrics.KafkaMetricsGroup
import kafka.utils.CoreUtils.inLock
import org.apache.kafka.common.errors.CorruptRecordException
import org.apache.kafka.common.protocol.Errors
import AbstractFetcherThread._
import scala.collection.{mutable, Set, Map}
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicLong
import com.yammer.metrics.core.Gauge
/**
* Abstract class for fetching data from multiple partitions of the same broker.
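*
* A concrete fetcher supplies the REQ/PD types and the abstract members declared in the
* class body. A minimal sketch of such a subclass (the "My*" names are illustrative
* placeholders, not actual Kafka classes):
* {{{
* class MyFetcherThread(name: String, clientId: String, sourceBroker: BrokerEndPoint)
*   extends AbstractFetcherThread(name, clientId, sourceBroker, fetchBackOffMs = 1000) {
*
*   type REQ = MyFetchRequest   // extends AbstractFetcherThread.FetchRequest
*   type PD = MyPartitionData   // extends AbstractFetcherThread.PartitionData
*
*   override protected def buildFetchRequest(partitionMap: Map[TopicAndPartition, PartitionFetchState]): REQ = ???
*   override protected def fetch(fetchRequest: REQ): Map[TopicAndPartition, PD] = ???
*   override def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: PD) = ???
*   override def handleOffsetOutOfRange(topicAndPartition: TopicAndPartition): Long = ???
*   override def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition]) = ???
* }
* }}}
* The thread is typically driven by calling addPartitions() with initial offsets, start() to
* begin the fetch loop, and shutdown() once its partitions are reassigned or removed.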
*/
abstract class AbstractFetcherThread(name: String,
clientId: String,
sourceBroker: BrokerEndPoint,
fetchBackOffMs: Int = 0,
isInterruptible: Boolean = true)
extends ShutdownableThread(name, isInterruptible) {
type REQ <: FetchRequest
type PD <: PartitionData
private val partitionMap = new mutable.HashMap[TopicAndPartition, PartitionFetchState] // a (topic, partition) -> partitionFetchState map
private val partitionMapLock = new ReentrantLock
private val partitionMapCond = partitionMapLock.newCondition()
private val metricId = new ClientIdAndBroker(clientId, sourceBroker.host, sourceBroker.port)
val fetcherStats = new FetcherStats(metricId)
val fetcherLagStats = new FetcherLagStats(metricId)
/* callbacks to be defined in subclass */
// process fetched data
def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long, partitionData: PD)
// handle a partition whose offset is out of range and return a new fetch offset
def handleOffsetOutOfRange(topicAndPartition: TopicAndPartition): Long
// deal with partitions with errors, potentially due to leadership changes
def handlePartitionsWithErrors(partitions: Iterable[TopicAndPartition])
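// build a fetch request from the partitions currently assigned to this thread and their fetch offsets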
protected def buildFetchRequest(partitionMap: Map[TopicAndPartition, PartitionFetchState]): REQ
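// issue the given fetch request to the source broker and return the per-partition response data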
protected def fetch(fetchRequest: REQ): Map[TopicAndPartition, PD]
override def shutdown(){
initiateShutdown()
inLock(partitionMapLock) {
partitionMapCond.signalAll()
}
awaitShutdown()
}
override def doWork() {
val fetchRequest = inLock(partitionMapLock) {
val fetchRequest = buildFetchRequest(partitionMap)
if (fetchRequest.isEmpty) {
trace("There are no active partitions. Back off for %d ms before sending a fetch request".format(fetchBackOffMs))
partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
}
fetchRequest
}
if (!fetchRequest.isEmpty)
processFetchRequest(fetchRequest)
}
private def processFetchRequest(fetchRequest: REQ) {
val partitionsWithError = new mutable.HashSet[TopicAndPartition]
var responseData: Map[TopicAndPartition, PD] = Map.empty
try {
trace("Issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))
responseData = fetch(fetchRequest)
} catch {
case t: Throwable =>
if (isRunning.get) {
warn(s"Error in fetch $fetchRequest", t)
inLock(partitionMapLock) {
partitionsWithError ++= partitionMap.keys
// an error occurred while fetching partitions; back off for a while before retrying
partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
}
}
}
fetcherStats.requestRate.mark()
if (responseData.nonEmpty) {
// process fetched data
inLock(partitionMapLock) {
responseData.foreach { case (topicAndPartition, partitionData) =>
val TopicAndPartition(topic, partitionId) = topicAndPartition
partitionMap.get(topicAndPartition).foreach(currentPartitionFetchState =>
// we append to the log if the current offset is defined and it is the same as the offset requested during fetch
if (fetchRequest.offset(topicAndPartition) == currentPartitionFetchState.offset) {
Errors.forCode(partitionData.errorCode) match {
case Errors.NONE =>
try {
val messages = partitionData.toByteBufferMessageSet
val validBytes = messages.validBytes
val newOffset = messages.shallowIterator.toSeq.lastOption match {
case Some(m: MessageAndOffset) => m.nextOffset
case None => currentPartitionFetchState.offset
}
partitionMap.put(topicAndPartition, new PartitionFetchState(newOffset))
fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = Math.max(0L, partitionData.highWatermark - newOffset)
fetcherStats.byteRate.mark(validBytes)
// Once we hand off the partition data to the subclass, we can't mess with it any more in this thread
processPartitionData(topicAndPartition, currentPartitionFetchState.offset, partitionData)
} catch {
case ime: CorruptRecordException =>
// we log the error and continue. This ensures two things:
// 1. If there is a corrupt message in a topic partition, it does not bring the fetcher thread down or cause other topic partitions to lag as well.
// 2. If the message is corrupt due to a transient state in the log (truncation and partial writes can cause this), we simply continue and
// the problem should get fixed in subsequent fetches.
logger.error("Found invalid messages during fetch for partition [" + topic + "," + partitionId + "] offset " + currentPartitionFetchState.offset + " error " + ime.getMessage)
case e: Throwable =>
throw new KafkaException("error processing data for partition [%s,%d] offset %d"
.format(topic, partitionId, currentPartitionFetchState.offset), e)
}
case Errors.OFFSET_OUT_OF_RANGE =>
try {
val newOffset = handleOffsetOutOfRange(topicAndPartition)
partitionMap.put(topicAndPartition, new PartitionFetchState(newOffset))
error("Current offset %d for partition [%s,%d] out of range; reset offset to %d"
.format(currentPartitionFetchState.offset, topic, partitionId, newOffset))
} catch {
case e: Throwable =>
error("Error getting offset for partition [%s,%d] to broker %d".format(topic, partitionId, sourceBroker.id), e)
partitionsWithError += topicAndPartition
}
case _ =>
if (isRunning.get) {
error("Error for partition [%s,%d] to broker %d:%s".format(topic, partitionId, sourceBroker.id,
partitionData.exception.get))
partitionsWithError += topicAndPartition
}
}
})
}
}
}
if (partitionsWithError.nonEmpty) {
debug("handling partitions with error for %s".format(partitionsWithError))
handlePartitionsWithErrors(partitionsWithError)
}
}
def addPartitions(partitionAndOffsets: Map[TopicAndPartition, Long]) {
partitionMapLock.lockInterruptibly()
try {
for ((topicAndPartition, offset) <- partitionAndOffsets) {
// If the partitionMap already has the topic/partition, then do not update the map with the old offset
if (!partitionMap.contains(topicAndPartition))
partitionMap.put(
topicAndPartition,
if (PartitionTopicInfo.isOffsetInvalid(offset)) new PartitionFetchState(handleOffsetOutOfRange(topicAndPartition))
else new PartitionFetchState(offset)
)}
partitionMapCond.signalAll()
} finally partitionMapLock.unlock()
}
def delayPartitions(partitions: Iterable[TopicAndPartition], delay: Long) {
partitionMapLock.lockInterruptibly()
try {
for (partition <- partitions) {
partitionMap.get(partition).foreach (currentPartitionFetchState =>
if (currentPartitionFetchState.isActive)
partitionMap.put(partition, new PartitionFetchState(currentPartitionFetchState.offset, new DelayedItem(delay)))
)
}
partitionMapCond.signalAll()
} finally partitionMapLock.unlock()
}
def removePartitions(topicAndPartitions: Set[TopicAndPartition]) {
partitionMapLock.lockInterruptibly()
try topicAndPartitions.foreach(partitionMap.remove)
finally partitionMapLock.unlock()
}
def partitionCount() = {
partitionMapLock.lockInterruptibly()
try partitionMap.size
finally partitionMapLock.unlock()
}
}
object AbstractFetcherThread {
trait FetchRequest {
def isEmpty: Boolean
def offset(topicAndPartition: TopicAndPartition): Long
}
trait PartitionData {
def errorCode: Short
def exception: Option[Throwable]
def toByteBufferMessageSet: ByteBufferMessageSet
def highWatermark: Long
}
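// A minimal sketch (illustrative only, not an actual Kafka class) of a PartitionData
// implementation that a subclass could plug in as its PD type; a FetchRequest
// implementation would similarly wrap the underlying wire request:
//
//   case class MyPartitionData(errorCode: Short,
//                              highWatermark: Long,
//                              messages: ByteBufferMessageSet) extends PartitionData {
//     def exception: Option[Throwable] =
//       if (errorCode == Errors.NONE.code) None else Some(Errors.forCode(errorCode).exception)
//     def toByteBufferMessageSet: ByteBufferMessageSet = messages
//   }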
}
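/**
* Reports the consumer lag of a single partition as a "ConsumerLag" gauge tagged with
* clientId, topic and partition.
*/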
class FetcherLagMetrics(metricId: ClientIdTopicPartition) extends KafkaMetricsGroup {
private[this] val lagVal = new AtomicLong(-1L)
newGauge("ConsumerLag",
new Gauge[Long] {
def value = lagVal.get
},
Map("clientId" -> metricId.clientId,
"topic" -> metricId.topic,
"partition" -> metricId.partitionId.toString)
)
def lag_=(newLag: Long) {
lagVal.set(newLag)
}
def lag = lagVal.get
}
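/**
* Keeps a pool of per-(clientId, topic, partition) FetcherLagMetrics, created lazily on
* first access via getFetcherLagStats.
*/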
class FetcherLagStats(metricId: ClientIdAndBroker) {
private val valueFactory = (k: ClientIdTopicPartition) => new FetcherLagMetrics(k)
val stats = new Pool[ClientIdTopicPartition, FetcherLagMetrics](Some(valueFactory))
def getFetcherLagStats(topic: String, partitionId: Int): FetcherLagMetrics = {
stats.getAndMaybePut(new ClientIdTopicPartition(metricId.clientId, topic, partitionId))
}
}
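/**
* Request-rate and byte-rate meters for a fetcher, tagged with the client id and the
* source broker's host and port.
*/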
class FetcherStats(metricId: ClientIdAndBroker) extends KafkaMetricsGroup {
val tags = Map("clientId" -> metricId.clientId,
"brokerHost" -> metricId.brokerHost,
"brokerPort" -> metricId.brokerPort.toString)
val requestRate = newMeter("RequestsPerSec", "requests", TimeUnit.SECONDS, tags)
val byteRate = newMeter("BytesPerSec", "bytes", TimeUnit.SECONDS, tags)
}
case class ClientIdTopicPartition(clientId: String, topic: String, partitionId: Int) {
override def toString = "%s-%s-%d".format(clientId, topic, partitionId)
}
/**
* Case class to keep a partition's fetch offset and its state (active or inactive).
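*
* A state constructed without a delay is active immediately; a delayed state becomes
* active once its delay has elapsed. For example (illustrative values):
* {{{
*   new PartitionFetchState(0L).isActive                     // true: no delay
*   PartitionFetchState(0L, new DelayedItem(5000)).isActive  // false until ~5 seconds have passed
* }}}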
*/
case class PartitionFetchState(offset: Long, delay: DelayedItem) {
def this(offset: Long) = this(offset, new DelayedItem(0))
def isActive: Boolean = { delay.getDelay(TimeUnit.MILLISECONDS) == 0 }
override def toString = "%d-%b".format(offset, isActive)
}