/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.samza.system.kafka
import kafka.common.TopicAndPartition
import org.apache.samza.util.Logging
import kafka.message.Message
import kafka.message.MessageAndOffset
import org.apache.samza.Partition
import kafka.utils.Utils
import org.apache.samza.util.Clock
import java.util.UUID
import kafka.serializer.DefaultDecoder
import kafka.serializer.Decoder
import org.apache.samza.util.BlockingEnvelopeMap
import org.apache.samza.system.SystemStreamPartition
import org.apache.samza.system.IncomingMessageEnvelope
import kafka.consumer.ConsumerConfig
import org.apache.samza.util.TopicMetadataStore
import kafka.api.TopicMetadata
import org.apache.samza.util.ExponentialSleepStrategy
import java.util.concurrent.ConcurrentHashMap
import scala.collection.JavaConversions._
import org.apache.samza.system.SystemAdmin
object KafkaSystemConsumer {
  // Approximate additional shallow heap overhead per message, on top of the raw bytes
  // received from Kafka: 4 + 64 + 4 + 4 + 4 = 80 bytes.
  // As this overhead is a moving target, and not very large compared to the message
  // size, it is being ignored in the computation for now.
val MESSAGE_SIZE_OVERHEAD = 4 + 64 + 4 + 4 + 4;
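  // A minimal worked example (the 1000-byte size below is hypothetical): with byte-based
  // fetch limiting enabled, a fetched Message whose size is 1000 bytes is accounted for
  // as 1000 + 80 = 1080 bytes by MessageSink.getMessageSize further down in this file.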
def toTopicAndPartition(systemStreamPartition: SystemStreamPartition) = {
val topic = systemStreamPartition.getStream
val partitionId = systemStreamPartition.getPartition.getPartitionId
TopicAndPartition(topic, partitionId)
}
}
/**
* Maintain a cache of BrokerProxies, returning the appropriate one for the
* requested topic and partition.
*/
private[kafka] class KafkaSystemConsumer(
systemName: String,
systemAdmin: SystemAdmin,
metrics: KafkaSystemConsumerMetrics,
metadataStore: TopicMetadataStore,
clientId: String = "undefined-client-id-%s" format UUID.randomUUID.toString,
timeout: Int = ConsumerConfig.ConsumerTimeoutMs,
bufferSize: Int = ConsumerConfig.SocketBufferSize,
fetchSize: StreamFetchSizes = new StreamFetchSizes,
consumerMinSize: Int = ConsumerConfig.MinFetchBytes,
consumerMaxWait: Int = ConsumerConfig.MaxFetchWaitMs,
/**
* Defines a low water mark for how many messages we buffer before we start
* executing fetch requests against brokers to get more messages. This value
* is divided equally among all registered SystemStreamPartitions. For
* example, if fetchThreshold is set to 50000, and there are 50
* SystemStreamPartitions registered, then the per-partition threshold is
* 1000. As soon as a SystemStreamPartition's buffered message count drops
* below 1000, a fetch request will be executed to get more data for it.
*
* Increasing this parameter will decrease the latency between when a queue
* is drained of messages and when new messages are enqueued, but also leads
* to an increase in memory usage since more messages will be held in memory.
*/
fetchThreshold: Int = 50000,
  /**
   * Defines a low water mark for how many bytes we buffer before we start
   * executing fetch requests against brokers to get more messages. This
   * value is divided by 2 because the messages are buffered twice, once in
   * KafkaConsumer and then in SystemConsumers, and the result is divided
   * equally among all registered SystemStreamPartitions. However, this is a
   * soft limit per partition: bytes are counted at message boundaries, so
   * the actual usage can be the per-partition threshold plus the size of
   * the largest message in the partition for a given stream. The byte count
   * is the size of the ByteBuffer in Message, so per-message object overhead
   * is not taken into account. In this codebase that overhead appears to be
   * quite small; even for 500000 messages it is only around 4MB x 2 = 8MB,
   * which is negligible.
   *
   * For example, if fetchThresholdBytes is set to 100000 bytes, and there
   * are 50 SystemStreamPartitions registered, then the per-partition
   * threshold is (100000 / 2) / 50 = 1000 bytes. As this is a soft limit,
   * the actual usage can be 1000 bytes plus the size of the largest message.
   * As soon as a SystemStreamPartition's buffered bytes drop below 1000,
   * a fetch request will be executed to get more data for it.
   *
   * Increasing this parameter will decrease the latency between when a queue
   * is drained of messages and when new messages are enqueued, but also leads
   * to an increase in memory usage since more messages will be held in memory.
   *
   * The default value is -1, which means this limit is not used. When the
   * value is > 0, the count-based fetchThreshold is ignored.
   */
fetchThresholdBytes: Long = -1,
  /**
   * True if and only if fetchThresholdBytes > 0, i.e. byte-based fetch
   * limiting is enabled.
   */
  fetchLimitByBytesEnabled: Boolean = false,
offsetGetter: GetOffset = new GetOffset("fail"),
deserializer: Decoder[Object] = new DefaultDecoder().asInstanceOf[Decoder[Object]],
keyDeserializer: Decoder[Object] = new DefaultDecoder().asInstanceOf[Decoder[Object]],
retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy,
clock: () => Long = { System.currentTimeMillis }) extends BlockingEnvelopeMap(
metrics.registry,
new Clock {
def currentTimeMillis = clock()
},
classOf[KafkaSystemConsumerMetrics].getName,
fetchLimitByBytesEnabled) with Toss with Logging {
type HostPort = (String, Int)
val brokerProxies = scala.collection.mutable.Map[HostPort, BrokerProxy]()
val topicPartitionsAndOffsets: scala.collection.concurrent.Map[TopicAndPartition, String] = new ConcurrentHashMap[TopicAndPartition, String]()
var perPartitionFetchThreshold = fetchThreshold
var perPartitionFetchThresholdBytes = 0L
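  // Illustrative arithmetic for the thresholds computed in start() below, assuming a
  // hypothetical 50 registered SystemStreamPartitions, the default fetchThreshold of
  // 50000, and fetchThresholdBytes set to 100000 as in the scaladoc example above:
  //   perPartitionFetchThreshold      = 50000 / 50        = 1000 messages
  //   perPartitionFetchThresholdBytes = (100000 / 2) / 50 = 1000 bytes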
def start() {
if (topicPartitionsAndOffsets.size > 0) {
perPartitionFetchThreshold = fetchThreshold / topicPartitionsAndOffsets.size
// messages get double buffered, hence divide by 2
if(fetchLimitByBytesEnabled) {
perPartitionFetchThresholdBytes = (fetchThresholdBytes / 2) / topicPartitionsAndOffsets.size
}
}
refreshBrokers
}
override def register(systemStreamPartition: SystemStreamPartition, offset: String) {
super.register(systemStreamPartition, offset)
val topicAndPartition = KafkaSystemConsumer.toTopicAndPartition(systemStreamPartition)
val existingOffset = topicPartitionsAndOffsets.getOrElseUpdate(topicAndPartition, offset)
    // Keep the older (smaller) of the two offsets, so no messages are skipped for this partition.
if (systemAdmin.offsetComparator(existingOffset, offset) >= 0) {
topicPartitionsAndOffsets.replace(topicAndPartition, offset)
}
metrics.registerTopicAndPartition(KafkaSystemConsumer.toTopicAndPartition(systemStreamPartition))
}
def stop() {
brokerProxies.values.foreach(_.stop)
}
protected def createBrokerProxy(host: String, port: Int): BrokerProxy = {
new BrokerProxy(host, port, systemName, clientId, metrics, sink, timeout, bufferSize, fetchSize, consumerMinSize, consumerMaxWait, offsetGetter)
}
protected def getHostPort(topicMetadata: TopicMetadata, partition: Int): Option[(String, Int)] = {
    // Note: we can't refer to the Broker type by name here, even though we're
    // working with it. Broker is a private type, and Scala doesn't complain
    // as long as the type is inferred rather than declared explicitly.
val brokerOption = topicMetadata
.partitionsMetadata
.find(_.partitionId == partition)
.flatMap(_.leader)
brokerOption match {
case Some(broker) => Some(broker.host, broker.port)
case _ => None
}
}
def refreshBrokers {
var tpToRefresh = topicPartitionsAndOffsets.keySet.toList
info("Refreshing brokers for: %s" format topicPartitionsAndOffsets)
retryBackoff.run(
loop => {
val topics = tpToRefresh.map(_.topic).toSet
val topicMetadata = TopicMetadataCache.getTopicMetadata(topics, systemName, (topics: Set[String]) => metadataStore.getTopicInfo(topics))
      // Call addTopicPartition one at a time, leaving the to-be-done list intact in
      // case of exceptions. This avoids trying to re-add the same topic-partition repeatedly.
def refresh(tp: List[TopicAndPartition]) = {
val head :: rest = tpToRefresh
// refreshBrokers can be called from abdicate and refreshDropped,
// both of which are triggered from BrokerProxy threads. To prevent
// accidentally creating multiple objects for the same broker, or
// accidentally not updating the topicPartitionsAndOffsets variable,
// we need to lock.
this.synchronized {
// Check if we still need this TopicAndPartition inside the
// critical section. If we don't, then skip it.
topicPartitionsAndOffsets.get(head) match {
case Some(nextOffset) =>
getHostPort(topicMetadata(head.topic), head.partition) match {
case Some((host, port)) =>
val brokerProxy = brokerProxies.getOrElseUpdate((host, port), createBrokerProxy(host, port))
brokerProxy.addTopicPartition(head, Option(nextOffset))
brokerProxy.start
debug("Claimed topic-partition (%s) for (%s)".format(head, brokerProxy))
topicPartitionsAndOffsets -= head
case None => info("No metadata available for: %s. Will try to refresh and add to a consumer thread later." format head)
}
case _ => debug("Ignoring refresh for %s because we already added it from another thread." format head)
}
}
rest
}
while (!tpToRefresh.isEmpty) {
tpToRefresh = refresh(tpToRefresh)
}
loop.done
},
(exception, loop) => {
warn("While refreshing brokers for %s: %s. Retrying." format (tpToRefresh.head, exception))
debug("Exception detail:", exception)
})
}
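  // MessageSink handed to every BrokerProxy created via createBrokerProxy above. Proxy
  // threads call back into it to check whether a partition needs more messages, to hand
  // over fetched messages, and to abdicate a topic-partition they can no longer serve.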
val sink = new MessageSink {
var lastDroppedRefresh = clock()
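    // Retries topic-partitions that could not yet be assigned to a BrokerProxy (for
    // example because no leader metadata was available), at most once every 10 seconds.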
def refreshDropped() {
if (topicPartitionsAndOffsets.size > 0 && clock() - lastDroppedRefresh > 10000) {
refreshBrokers
lastDroppedRefresh = clock()
}
}
def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean) {
setIsAtHead(toSystemStreamPartition(tp), isAtHighWatermark)
}
def needsMoreMessages(tp: TopicAndPartition) = {
if(fetchLimitByBytesEnabled) {
getMessagesSizeInQueue(toSystemStreamPartition(tp)) < perPartitionFetchThresholdBytes
} else {
getNumMessagesInQueue(toSystemStreamPartition(tp)) < perPartitionFetchThreshold
}
}
def getMessageSize(message: Message): Integer = {
message.size + KafkaSystemConsumer.MESSAGE_SIZE_OVERHEAD
}
def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) = {
trace("Incoming message %s: %s." format (tp, msg))
val systemStreamPartition = toSystemStreamPartition(tp)
val isAtHead = highWatermark == msg.offset
val offset = msg.offset.toString
val key = if (msg.message.key != null) {
keyDeserializer.fromBytes(Utils.readBytes(msg.message.key))
} else {
null
}
val message = if (!msg.message.isNull) {
deserializer.fromBytes(Utils.readBytes(msg.message.payload))
} else {
null
}
      if (fetchLimitByBytesEnabled) {
put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, offset, key, message, getMessageSize(msg.message)))
} else {
put(systemStreamPartition, new IncomingMessageEnvelope(systemStreamPartition, offset, key, message))
}
setIsAtHead(systemStreamPartition, isAtHead)
}
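    // Called from BrokerProxy threads when a proxy gives up a topic-partition (see the
    // locking note in refreshBrokers): re-queue it with the offset to resume from and
    // refresh brokers so it can be reassigned.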
def abdicate(tp: TopicAndPartition, nextOffset: Long) {
info("Abdicating for %s" format (tp))
topicPartitionsAndOffsets += tp -> nextOffset.toString
refreshBrokers
}
private def toSystemStreamPartition(tp: TopicAndPartition) = {
new SystemStreamPartition(systemName, tp.topic, new Partition(tp.partition))
}
}
}