blob: dafa6d2133bec72ae2183ca0c891db65154fcec7 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package kafka.producer
import async.{CallbackHandler, EventHandler}
import kafka.serializer.Encoder
import kafka.utils._
import java.util.Properties
import kafka.cluster.{Partition, Broker}
import java.util.concurrent.atomic.AtomicBoolean
import kafka.common.{NoBrokersForPartitionException, InvalidPartitionException}
import kafka.api.ProducerRequest
class Producer[K,V](config: ProducerConfig,
partitioner: Partitioner[K],
producerPool: ProducerPool[V],
populateProducerPool: Boolean,
private var brokerPartitionInfo: BrokerPartitionInfo) /* for testing purpose only. Applications should ideally */
/* use the other constructor*/
extends Logging {
private val hasShutdown = new AtomicBoolean(false)
private val random = new java.util.Random
// check if zookeeper based auto partition discovery is enabled
private val zkEnabled = Utils.propertyExists(config.zkConnect)
if(brokerPartitionInfo == null) {
zkEnabled match {
case true =>
val zkProps = new Properties()
zkProps.put("zk.connect", config.zkConnect)
zkProps.put("", config.zkSessionTimeoutMs.toString)
zkProps.put("", config.zkConnectionTimeoutMs.toString)
zkProps.put("", config.zkSyncTimeMs.toString)
brokerPartitionInfo = new ZKBrokerPartitionInfo(new ZKConfig(zkProps), producerCbk)
case false =>
brokerPartitionInfo = new ConfigBrokerPartitionInfo(config)
// pool of producers, one per broker
if(populateProducerPool) {
val allBrokers = brokerPartitionInfo.getAllBrokerInfo
allBrokers.foreach(b => producerPool.addProducer(new Broker(b._1,,, b._2.port)))
* This constructor can be used when all config parameters will be specified through the
* ProducerConfig object
* @param config Producer Configuration object
def this(config: ProducerConfig) = this(config, Utils.getObject(config.partitionerClass),
new ProducerPool[V](config, Utils.getObject(config.serializerClass)), true, null)
* This constructor can be used to provide pre-instantiated objects for all config parameters
* that would otherwise be instantiated via reflection. i.e. encoder, partitioner, event handler and
* callback handler. If you use this constructor, encoder, eventHandler, callback handler and partitioner
* will not be picked up from the config.
* @param config Producer Configuration object
* @param encoder Encoder used to convert an object of type V to a kafka.message.Message. If this is null it
* throws an InvalidConfigException
* @param eventHandler the class that implements kafka.producer.async.IEventHandler[T] used to
* dispatch a batch of produce requests, using an instance of kafka.producer.SyncProducer. If this is null, it
* uses the DefaultEventHandler
* @param cbkHandler the class that implements kafka.producer.async.CallbackHandler[T] used to inject
* callbacks at various stages of the kafka.producer.AsyncProducer pipeline. If this is null, the producer does
* not use the callback handler and hence does not invoke any callbacks
* @param partitioner class that implements the kafka.producer.Partitioner[K], used to supply a custom
* partitioning strategy on the message key (of type K) that is specified through the ProducerData[K, T]
* object in the send API. If this is null, producer uses DefaultPartitioner
def this(config: ProducerConfig,
encoder: Encoder[V],
eventHandler: EventHandler[V],
cbkHandler: CallbackHandler[V],
partitioner: Partitioner[K]) =
this(config, if(partitioner == null) new DefaultPartitioner[K] else partitioner,
new ProducerPool[V](config, encoder, eventHandler, cbkHandler), true, null)
* Sends the data, partitioned by key to the topic using either the
* synchronous or the asynchronous producer
* @param producerData the producer data object that encapsulates the topic, key and message data
def send(producerData: ProducerData[K,V]*) {
zkEnabled match {
case true => zkSend(producerData: _*)
case false => configSend(producerData: _*)
private def zkSend(producerData: ProducerData[K,V]*) {
val producerPoolRequests = { pd =>
var brokerIdPartition: Option[Partition] = None
var brokerInfoOpt: Option[Broker] = None
var numRetries: Int = 0
while(numRetries <= config.zkReadRetries && brokerInfoOpt.isEmpty) {
if(numRetries > 0) {
info("Try #" + numRetries + " ZK producer cache is stale. Refreshing it by reading from ZK again")
val topicPartitionsList = getPartitionListForTopic(pd)
val totalNumPartitions = topicPartitionsList.length
val partitionId = getPartition(pd.getKey, totalNumPartitions)
brokerIdPartition = Some(topicPartitionsList(partitionId))
brokerInfoOpt = brokerPartitionInfo.getBrokerInfo(brokerIdPartition.get.brokerId)
numRetries += 1
brokerInfoOpt match {
case Some(brokerInfo) =>
debug("Sending message to broker " + + ":" + brokerInfo.port +
" on partition " + brokerIdPartition.get.partId)
case None =>
throw new NoBrokersForPartitionException("Invalid Zookeeper state. Failed to get partition for topic: " +
pd.getTopic + " and key: " + pd.getKey)
new Partition(brokerIdPartition.get.brokerId, brokerIdPartition.get.partId),
producerPool.send(producerPoolRequests: _*)
private def configSend(producerData: ProducerData[K,V]*) {
val producerPoolRequests = { pd =>
// find the broker partitions registered for this topic
val topicPartitionsList = getPartitionListForTopic(pd)
val totalNumPartitions = topicPartitionsList.length
val randomBrokerId = random.nextInt(totalNumPartitions)
val brokerIdPartition = topicPartitionsList(randomBrokerId)
val brokerInfo = brokerPartitionInfo.getBrokerInfo(brokerIdPartition.brokerId).get
debug("Sending message to broker " + + ":" + brokerInfo.port +
" on a randomly chosen partition")
val partition = ProducerRequest.RandomPartition
debug("Sending message to broker " + + ":" + brokerInfo.port + " on a partition " +
new Partition(brokerIdPartition.brokerId, partition),
producerPool.send(producerPoolRequests: _*)
private def getPartitionListForTopic(pd: ProducerData[K,V]): Seq[Partition] = {
debug("Getting the number of broker partitions registered for topic: " + pd.getTopic)
val topicPartitionsList = brokerPartitionInfo.getBrokerPartitionInfo(pd.getTopic).toSeq
debug("Broker partitions registered for topic: " + pd.getTopic + " = " + topicPartitionsList)
val totalNumPartitions = topicPartitionsList.length
if(totalNumPartitions == 0) throw new NoBrokersForPartitionException("Partition = " + pd.getKey)
* Retrieves the partition id and throws an InvalidPartitionException if
* the value of partition is not between 0 and numPartitions-1
* @param key the partition key
* @param numPartitions the total number of available partitions
* @returns the partition id
private def getPartition(key: K, numPartitions: Int): Int = {
if(numPartitions <= 0)
throw new InvalidPartitionException("Invalid number of partitions: " + numPartitions +
"\n Valid values are > 0")
val partition = if(key == null) random.nextInt(numPartitions)
else partitioner.partition(key , numPartitions)
if(partition < 0 || partition >= numPartitions)
throw new InvalidPartitionException("Invalid partition id : " + partition +
"\n Valid values are in the range inclusive [0, " + (numPartitions-1) + "]")
* Callback to add a new producer to the producer pool. Used by ZKBrokerPartitionInfo
* on registration of new broker in zookeeper
* @param bid the id of the broker
* @param host the hostname of the broker
* @param port the port of the broker
private def producerCbk(bid: Int, host: String, port: Int) = {
if(populateProducerPool) producerPool.addProducer(new Broker(bid, host, host, port))
else debug("Skipping the callback since populateProducerPool = false")
* Close API to close the producer pool connections to all Kafka brokers. Also closes
* the zookeeper client connection if one exists
def close() = {
val canShutdown = hasShutdown.compareAndSet(false, true)
if(canShutdown) {