/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.samza.system.kafka

import java.util.UUID

import grizzled.slf4j.Logging
import kafka.api.TopicMetadata
import kafka.common.TopicAndPartition
import kafka.consumer.ConsumerConfig
import kafka.message.MessageAndOffset
import kafka.serializer.Decoder
import kafka.serializer.DefaultDecoder
import kafka.utils.Utils

import org.apache.samza.Partition
import org.apache.samza.SamzaException
import org.apache.samza.system.IncomingMessageEnvelope
import org.apache.samza.system.SystemStreamPartition
import org.apache.samza.util.BlockingEnvelopeMap
import org.apache.samza.util.Clock
import org.apache.samza.util.ExponentialSleepStrategy
import org.apache.samza.util.TopicMetadataStore

object KafkaSystemConsumer {
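  /**
   * Build the Kafka TopicAndPartition that corresponds to a Samza
   * SystemStreamPartition: the stream name becomes the topic, and the Samza
   * partition ID becomes the Kafka partition ID.
   */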
  def toTopicAndPartition(systemStreamPartition: SystemStreamPartition) = {
    val topic = systemStreamPartition.getStream
    val partitionId = systemStreamPartition.getPartition.getPartitionId
    TopicAndPartition(topic, partitionId)
  }
}

/**
 * Maintains a cache of BrokerProxies, one per (host, port), routing each
 * registered topic and partition to the proxy for that partition's current
 * leader. Fetched messages are deserialized and queued in the
 * BlockingEnvelopeMap that this class extends.
 *
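 * A rough usage sketch; the argument values, and the metadataStore, are
 * illustrative rather than taken from a real job configuration:
 *
 * {{{
 * val consumer = new KafkaSystemConsumer(
 *   systemName = "kafka",
 *   brokerListString = "broker1:9092,broker2:9092",
 *   metrics = new KafkaSystemConsumerMetrics,
 *   metadataStore = metadataStore)
 * consumer.register(new SystemStreamPartition("kafka", "my-topic", new Partition(0)), "0")
 * consumer.start()
 * }}}
 */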
private[kafka] class KafkaSystemConsumer(
  systemName: String,
  brokerListString: String,
  metrics: KafkaSystemConsumerMetrics,
  metadataStore: TopicMetadataStore,
  clientId: String = "undefined-client-id-%s" format UUID.randomUUID.toString,
  timeout: Int = ConsumerConfig.ConsumerTimeoutMs,
  bufferSize: Int = ConsumerConfig.SocketBufferSize,
  fetchSize: Int = ConsumerConfig.MaxFetchSize,
  consumerMinSize: Int = ConsumerConfig.MinFetchBytes,
  consumerMaxWait: Int = ConsumerConfig.MaxFetchWaitMs,

  /**
   * Defines a low water mark for how many messages we buffer before we start
   * executing fetch requests against brokers to get more messages. This value
   * is divided equally among all registered SystemStreamPartitions. For
   * example, if fetchThreshold is set to 50000, and there are 50
   * SystemStreamPartitions registered, then the per-partition threshold is
   * 1000. As soon as a SystemStreamPartition's buffered message count drops
   * below 1000, a fetch request will be executed to get more data for it.
   *
   * Increasing this parameter will decrease the latency between when a queue
   * is drained of messages and when new messages are enqueued, but also leads
   * to an increase in memory usage since more messages will be held in memory.
   */
  fetchThreshold: Int = 50000,
  offsetGetter: GetOffset = new GetOffset("fail"),
  deserializer: Decoder[Object] = new DefaultDecoder().asInstanceOf[Decoder[Object]],
  keyDeserializer: Decoder[Object] = new DefaultDecoder().asInstanceOf[Decoder[Object]],
  retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy,
  clock: () => Long = { System.currentTimeMillis }) extends BlockingEnvelopeMap(
  metrics.registry,
  new Clock {
    def currentTimeMillis = clock()
  },
  classOf[KafkaSystemConsumerMetrics].getName) with Toss with Logging {

  type HostPort = (String, Int)

  val brokerProxies = scala.collection.mutable.Map[HostPort, BrokerProxy]()

  var nextOffsets = Map[SystemStreamPartition, String]()
  var perPartitionFetchThreshold = fetchThreshold
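
  /**
   * Split the fetch threshold evenly across the registered partitions, then
   * resolve the current leader for each partition and start fetching from it.
   */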
  def start() {
    if (nextOffsets.size > 0) {
      perPartitionFetchThreshold = fetchThreshold / nextOffsets.size
    }

    val topicPartitionsAndOffsets = nextOffsets.map {
      case (systemStreamPartition, offset) =>
        val topicAndPartition = KafkaSystemConsumer.toTopicAndPartition(systemStreamPartition)
        (topicAndPartition, offset)
    }

    refreshBrokers(topicPartitionsAndOffsets)
  }
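
  /**
   * Record the offset from which to start consuming the given
   * SystemStreamPartition. Must be called before start.
   */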
  override def register(systemStreamPartition: SystemStreamPartition, offset: String) {
    super.register(systemStreamPartition, offset)
    nextOffsets += systemStreamPartition -> offset
    metrics.registerTopicAndPartition(KafkaSystemConsumer.toTopicAndPartition(systemStreamPartition))
  }
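
  /** Shut down all cached BrokerProxies (and their fetcher threads). */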
  def stop() {
    brokerProxies.values.foreach(_.stop)
  }
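
  /**
   * Factory for BrokerProxy instances. Protected so that subclasses (tests,
   * for example) can substitute their own implementation.
   */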
  protected def createBrokerProxy(host: String, port: Int): BrokerProxy = {
    new BrokerProxy(host, port, systemName, clientId, metrics, sink, timeout, bufferSize, fetchSize, consumerMinSize, consumerMaxWait, offsetGetter)
  }
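
  /**
   * Extract the leader's host and port for the given partition from the
   * topic metadata, or None if the partition currently has no leader.
   */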
  protected def getHostPort(topicMetadata: TopicMetadata, partition: Int): Option[(String, Int)] = {
    // We can't refer to the leader's type by name here: kafka.cluster.Broker
    // is private to the kafka package, but Scala doesn't object as long as
    // the type is only inferred, never declared explicitly.
    val brokerOption = topicMetadata
      .partitionsMetadata
      .find(_.partitionId == partition)
      .flatMap(_.leader)

    brokerOption match {
      case Some(broker) => Some((broker.host, broker.port))
      case _ => None
    }
  }
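
  /**
   * Look up the current leader for each given topic and partition and hand
   * the partition, with its next offset, to the BrokerProxy for that leader,
   * creating the proxy if one doesn't exist yet. Metadata failures are
   * retried with exponential backoff.
   */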
  def refreshBrokers(topicPartitionsAndOffsets: Map[TopicAndPartition, String]) {
    var tpToRefresh = topicPartitionsAndOffsets.keySet.toList

    retryBackoff.run(
      loop => {
        val topics = tpToRefresh.map(_.topic).toSet
        val topicMetadata = TopicMetadataCache.getTopicMetadata(topics, systemName, (topics: Set[String]) => metadataStore.getTopicInfo(topics))

        // Add topic partitions one at a time, returning the still-to-be-done
        // tail so that an exception leaves the remaining list intact. This
        // avoids re-adding the same topic partition repeatedly on retry.
        def refresh(tp: List[TopicAndPartition]) = {
          val head :: rest = tp
          val nextOffset = topicPartitionsAndOffsets(head)

          getHostPort(topicMetadata(head.topic), head.partition) match {
            case Some((host, port)) =>
              val brokerProxy = brokerProxies.getOrElseUpdate((host, port), createBrokerProxy(host, port))
              brokerProxy.addTopicPartition(head, Option(nextOffset))
              brokerProxy.start
            case None => warn("No such topic-partition: %s, dropping." format head)
          }

          rest
        }

        while (!tpToRefresh.isEmpty) {
          tpToRefresh = refresh(tpToRefresh)
        }

        loop.done
      },

      (exception, loop) => {
        warn("While refreshing brokers for %s: %s. Retrying." format (tpToRefresh.head, exception))
        debug("Exception detail:", exception)
      })
  }
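
  /**
   * The sink that BrokerProxy fetcher threads deposit messages into. Incoming
   * messages are deserialized, wrapped in IncomingMessageEnvelopes, and put
   * into this BlockingEnvelopeMap's per-partition queues.
   */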
  val sink = new MessageSink {
    def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean) {
      setIsAtHead(toSystemStreamPartition(tp), isAtHighWatermark)
    }

    def needsMoreMessages(tp: TopicAndPartition) = {
      getNumMessagesInQueue(toSystemStreamPartition(tp)) <= perPartitionFetchThreshold
    }

    def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) = {
      trace("Incoming message %s: %s." format (tp, msg))

      val systemStreamPartition = toSystemStreamPartition(tp)
      val isAtHead = highWatermark == msg.offset
      val offset = msg.offset.toString
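
      // Kafka permits a null key, and a null payload is also legal (for
      // example, a tombstone in a log-compacted topic), so pass nulls through
      // rather than deserializing them.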
      val key = if (msg.message.key != null) {
        keyDeserializer.fromBytes(Utils.readBytes(msg.message.key))
      } else {
        null
      }

      val message = if (!msg.message.isNull) {
        deserializer.fromBytes(Utils.readBytes(msg.message.payload))
      } else {
        null
      }

      put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, offset, key, message))
      setIsAtHead(systemStreamPartition, isAtHead)
    }
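
    /**
     * Called when a BrokerProxy gives up a topic-partition (for example,
     * because the leader moved). Re-resolve the leader and re-register the
     * partition at the offset the proxy had reached.
     */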
    def abdicate(tp: TopicAndPartition, nextOffset: Long) {
      info("Abdicating for %s" format (tp))
      refreshBrokers(Map(tp -> nextOffset.toString))
    }

    private def toSystemStreamPartition(tp: TopicAndPartition) = {
      new SystemStreamPartition(systemName, tp.topic, new Partition(tp.partition))
    }
  }
}