/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.samza.system.kafka
import kafka.api._
import kafka.common.{NotLeaderForPartitionException, UnknownTopicOrPartitionException, ErrorMapping, TopicAndPartition}
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch}
import scala.collection.JavaConversions._
import kafka.message.MessageSet
import grizzled.slf4j.Logging
import java.nio.channels.ClosedByInterruptException
import java.util.Map.Entry
import scala.collection.mutable
import kafka.consumer.ConsumerConfig
import org.apache.samza.util.ThreadNamePrefix.SAMZA_THREAD_NAME_PREFIX
import org.apache.samza.util.ExponentialSleepStrategy
/**
* Companion object for class BrokerProxy encapsulating various constants
*/
object BrokerProxy {
val BROKER_PROXY_THREAD_NAME_PREFIX = "BROKER-PROXY-"
}
/**
* A BrokerProxy consolidates Kafka fetches meant for a particular broker and retrieves them all at once, providing
* a way for consumers to retrieve those messages by topic and partition.
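*
* A minimal usage sketch (the broker address, client id, and the metrics/sink instances below are illustrative
* placeholders, not values defined by this class):
* {{{
* val proxy = new BrokerProxy("broker-1.example.com", 9092, "kafka", "samza-client", metrics, sink)
* proxy.addTopicPartition(TopicAndPartition("my-topic", 0), Some("123"))
* proxy.start
* // messages for the registered partitions are delivered through the MessageSink
* proxy.stop
* }}}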
*/
class BrokerProxy(
val host: String,
val port: Int,
val system: String,
val clientID: String,
val metrics: KafkaSystemConsumerMetrics,
val messageSink: MessageSink,
val timeout: Int = ConsumerConfig.SocketTimeout,
val bufferSize: Int = ConsumerConfig.SocketBufferSize,
val fetchSize: Int = ConsumerConfig.FetchSize,
val consumerMinSize: Int = ConsumerConfig.MinFetchBytes,
val consumerMaxWait: Int = ConsumerConfig.MaxFetchWaitMs,
offsetGetter: GetOffset = new GetOffset("fail")) extends Toss with Logging {
/**
* How long (in milliseconds) the fetcher thread should sleep before checking whether any TopicPartitions have been added to its purview
*/
val sleepMSWhileNoTopicPartitions = 100
/** The next offset to fetch for each TopicAndPartition this proxy is consuming. */
val nextOffsets: mutable.ConcurrentMap[TopicAndPartition, Long] = new ConcurrentHashMap[TopicAndPartition, Long]()
/** Block on the first call to get message if the fetcher has not yet returned its initial results **/
// TODO: It should be sufficient to just use the count down latch and await on it for each of the calls, but
// VisualVM was showing the consumer thread spending all its time in the await method rather than returning
// immediately, even though the process was proceeding normally. Hence the extra boolean. Should be investigated.
val firstCallBarrier = new CountDownLatch(1)
var firstCall = true
var simpleConsumer = createSimpleConsumer()
metrics.registerBrokerProxy(host, port)
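/** Create a new DefaultFetchSimpleConsumer for this proxy's broker using the configured timeout, buffer and fetch settings. */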
def createSimpleConsumer() = {
val hostString = "%s:%d" format (host, port)
info("Creating new SimpleConsumer for host %s for system %s" format (hostString, system))
val sc = new DefaultFetchSimpleConsumer(host, port, timeout, bufferSize, clientID, fetchSize, consumerMinSize, consumerMaxWait)
sc
}
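/**
* Register a TopicAndPartition with this proxy. If the supplied offset is present and valid it is used as the
* starting offset; otherwise the offset is reset via the offsetGetter. Throws if the TopicAndPartition is
* already being consumed.
*/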
def addTopicPartition(tp: TopicAndPartition, nextOffset: Option[String]) = {
debug("Adding new topic and partition %s to queue for %s" format (tp, host))
if (nextOffsets.containsKey(tp)) {
toss("Already consuming TopicPartition %s" format tp)
}
val offset = if (nextOffset.isDefined && offsetGetter.isValidOffset(simpleConsumer, tp, nextOffset.get)) {
nextOffset.get.toLong
} else {
warn("It appears that we received an invalid or empty offset %s for %s. Attempting to use Kafka's auto.offset.reset setting. This can result in data loss if processing continues." format (nextOffset, tp))
offsetGetter.getResetOffset(simpleConsumer, tp)
}
debug("Got offset %s for new topic and partition %s." format (offset, tp))
nextOffsets += tp -> offset
metrics.topicPartitions(host, port).set(nextOffsets.size)
}
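/** Stop consuming a TopicAndPartition, returning its next offset if it was registered, or None otherwise. */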
def removeTopicPartition(tp: TopicAndPartition) = {
if (nextOffsets.containsKey(tp)) {
val offset = nextOffsets.remove(tp)
metrics.topicPartitions(host, port).set(nextOffsets.size)
debug("Removed %s" format tp)
offset
} else {
warn("Asked to remove topic and partition %s, but not in map (keys = %s)" format (tp, nextOffsets.keys.mkString(",")))
None
}
}
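// Fetcher thread: repeatedly fetches messages for all registered TopicAndPartitions, recreating the
// SimpleConsumer with exponential backoff whenever a fetch throws, and exiting when interrupted by stop().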
val thread = new Thread(new Runnable {
def run {
var reconnect = false
try {
(new ExponentialSleepStrategy).run(
loop => {
if (reconnect) {
metrics.reconnects(host, port).inc
simpleConsumer.close()
simpleConsumer = createSimpleConsumer()
}
while (!Thread.currentThread.isInterrupted) {
if (nextOffsets.size == 0) {
debug("No TopicPartitions to fetch. Sleeping.")
Thread.sleep(sleepMSWhileNoTopicPartitions)
} else {
fetchMessages
// If we got here, fetchMessages didn't throw an exception, i.e. it was successful.
// In that case, reset the loop delay, so that the next time an error occurs,
// we start with a short retry delay.
loop.reset
}
}
},
(exception, loop) => {
warn("Restarting consumer due to %s. Turn on debugging to get a full stack trace." format exception)
debug("Exception detail:", exception)
reconnect = true
})
} catch {
case e: InterruptedException => info("Got interrupt exception in broker proxy thread.")
case e: ClosedByInterruptException => info("Got closed by interrupt exception in broker proxy thread.")
}
if (Thread.currentThread.isInterrupted) info("Shutting down due to interrupt.")
}
}, "BrokerProxy thread pointed at %s:%d for client %s" format (host, port, clientID))
private def fetchMessages(): Unit = {
val topicAndPartitionsToFetch = nextOffsets.filterKeys(messageSink.needsMoreMessages(_)).toList
if (topicAndPartitionsToFetch.nonEmpty) {
metrics.brokerReads(host, port).inc
val response: FetchResponse = simpleConsumer.defaultFetch(topicAndPartitionsToFetch: _*)
firstCall = false
firstCallBarrier.countDown()
// Split the response into errors and non-errors, processing the errors first
val (nonErrorResponses, errorResponses) = response.data.entrySet().partition(_.getValue.error == ErrorMapping.NoError)
handleErrors(errorResponses, response)
nonErrorResponses.foreach { nonError => moveMessagesToTheirQueue(nonError.getKey, nonError.getValue) }
} else {
debug("No topic/partitions need to be fetched for %s:%s right now. Sleeping %sms." format (host, port, sleepMSWhileNoTopicPartitions))
metrics.brokerSkippedFetchRequests(host, port).inc
Thread.sleep(sleepMSWhileNoTopicPartitions)
}
}
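/**
* Process the error portion of a fetch response: abdicate partitions whose leader has moved, reset offsets that
* are out of range, and rethrow anything else so the fetcher thread reconnects.
*/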
def handleErrors(errorResponses: mutable.Set[Entry[TopicAndPartition, FetchResponsePartitionData]], response: FetchResponse) = {
// Need to be mindful of a tp that was removed by another thread
def abdicate(tp: TopicAndPartition) = removeTopicPartition(tp) match {
case Some(offset) => messageSink.abdicate(tp, offset)
case None => warn("Tried to abdicate for topic partition not in map. Removed in interim?")
}
// FetchResponse should really return Option and a list of the errors so we don't have to find them ourselves
case class Error(tp: TopicAndPartition, code: Short, exception: Throwable)
// Now subdivide the errors into three types: non-recoverable, not leader (== abdicate) and offset out of range (== get new offset)
// Convert FetchResponse into easier-to-work-with Errors
val errors = for (
error <- errorResponses;
errorCode <- Option(response.errorCode(error.getKey.topic, error.getKey.partition)); // Scala's being cranky about referring to error.getKey values...
exception <- Option(ErrorMapping.exceptionFor(errorCode))
) yield new Error(error.getKey, errorCode, exception)
val (notLeaders, otherErrors) = errors.partition(_.code == ErrorMapping.NotLeaderForPartitionCode)
val (offsetOutOfRangeErrors, remainingErrors) = otherErrors.partition(_.code == ErrorMapping.OffsetOutOfRangeCode)
// Can recover from two types of errors: not leader (go find the new leader) and offset out of range (go get the new offset)
// However, we want to bail as quickly as possible if there are non recoverable errors so that the state of the other
// topic-partitions remains the same. That way, when we've rebuilt the simple consumer, we can come around and
// handle the recoverable errors.
if (remainingErrors.nonEmpty) {
warn("Got non-recoverable error codes during multifetch. Throwing an exception to trigger reconnect. Errors: %s" format remainingErrors.mkString(","))
remainingErrors.foreach(e => ErrorMapping.maybeThrowException(e.code))
}
notLeaders.foreach(e => abdicate(e.tp))
offsetOutOfRangeErrors.foreach(e => {
warn("Received OffsetOutOfRange exception for %s. Current offset = %s" format (e.tp, nextOffsets.getOrElse(e.tp, "not found in map, likely removed in the interim")))
try {
val newOffset = offsetGetter.getResetOffset(simpleConsumer, e.tp)
// Put the new offset into the map (if the tp still exists). Will catch it on the next go-around
nextOffsets.replace(e.tp, newOffset)
} catch {
// UnknownTopic or NotLeader are routine events and handled via abdication. All others, bail.
case _: UnknownTopicOrPartitionException | _: NotLeaderForPartitionException =>
warn("Received (UnknownTopicOr|NotLeaderFor)Partition exception. Abdicating.")
abdicate(e.tp)
}
})
}
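/** Hand every message in a partition's fetched data to the sink, updating the next offset, read metrics and lag. */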
def moveMessagesToTheirQueue(tp: TopicAndPartition, data: FetchResponsePartitionData) = {
val messageSet: MessageSet = data.messages
var nextOffset = nextOffsets(tp)
messageSink.setIsAtHighWatermark(tp, data.hw == 0 || data.hw == nextOffset)
require(messageSet != null)
for (message <- messageSet.iterator) {
messageSink.addMessage(tp, message, data.hw) // TODO: Verify this is correct
nextOffset = message.nextOffset
val bytesSize = message.message.payloadSize + message.message.keySize
metrics.reads(tp).inc
metrics.bytesRead(tp).inc(bytesSize)
metrics.brokerBytesRead(host, port).inc(bytesSize)
metrics.offsets(tp).set(nextOffset)
}
nextOffsets.replace(tp, nextOffset) // use replace rather than put in case this tp was removed while we were fetching.
// Update high water mark
val hw = data.hw
if (hw >= 0) {
metrics.lag(tp).set(hw - nextOffset)
} else {
debug("Got a high water mark less than 0 (%d) for %s, so skipping." format (hw, tp))
}
}
override def toString() = "BrokerProxy for %s:%d" format (host, port)
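/** Start the fetcher thread as a daemon. Starting an already-running proxy is ignored. */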
def start {
info("Starting " + toString)
if (!thread.isAlive) {
thread.setDaemon(true)
thread.setName(SAMZA_THREAD_NAME_PREFIX + BrokerProxy.BROKER_PROXY_THREAD_NAME_PREFIX + thread.getName)
thread.start
} else {
debug("Tried to start an already started broker proxy (%s). Ignoring." format toString)
}
}
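/** Interrupt the fetcher thread and block until it exits. */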
def stop {
info("Shutting down " + toString)
thread.interrupt
thread.join
}
}