blob: ee2d44b9ace2c24aae00b0ef054ea247e1c4dc20 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.producer
import async._
import java.util.Properties
import kafka.serializer.Encoder
import java.util.concurrent.{ConcurrentMap, ConcurrentHashMap}
import kafka.cluster.{Partition, Broker}
import kafka.api.ProducerRequest
import kafka.common.{UnavailableProducerException, InvalidConfigException}
import kafka.utils.{Utils, Logging}
import kafka.message.{NoCompressionCodec, ByteBufferMessageSet}
/**
 * A pool of per-broker producers, keyed by broker id. Depending on
 * config.producerType the pool holds either synchronous producers
 * ("sync") or asynchronous producers ("async"); any other value is
 * rejected with an InvalidConfigException at construction time.
 *
 * @param config the producer configuration
 * @param serializer the encoder used to turn data of type V into messages; must not be null
 * @param syncProducers the map holding the sync producers, keyed by broker id
 * @param asyncProducers the map holding the async producers, keyed by broker id
 * @param inputEventHandler the event handler handed to async producers; when null a
 *                          DefaultEventHandler is created instead
 * @param cbkHandler the callback handler handed to async producers; may be null
 */
class ProducerPool[V](private val config: ProducerConfig,
                      private val serializer: Encoder[V],
                      private val syncProducers: ConcurrentMap[Int, SyncProducer],
                      private val asyncProducers: ConcurrentMap[Int, AsyncProducer[V]],
                      private val inputEventHandler: EventHandler[V] = null,
                      private val cbkHandler: CallbackHandler[V] = null) extends Logging {

  // Fall back to the default event handler when the caller did not supply one.
  private val eventHandler: EventHandler[V] =
    if(inputEventHandler != null) inputEventHandler
    else new DefaultEventHandler(config, cbkHandler)

  if(serializer == null)
    throw new InvalidConfigException("serializer passed in is null!")

  // true for producer.type = sync, false for async; validated once here so the
  // rest of the class can rely on this flag instead of re-parsing the string.
  private val sync: Boolean = config.producerType match {
    case "sync" => true
    case "async" => false
    case _ => throw new InvalidConfigException("Valid values for producer.type are sync/async")
  }

  /**
   * Creates a pool with empty producer maps and the given handlers.
   */
  def this(config: ProducerConfig, serializer: Encoder[V],
           eventHandler: EventHandler[V], cbkHandler: CallbackHandler[V]) =
    this(config, serializer,
         new ConcurrentHashMap[Int, SyncProducer](),
         new ConcurrentHashMap[Int, AsyncProducer[V]](),
         eventHandler, cbkHandler)

  /**
   * Creates a pool with empty producer maps; the event handler and callback
   * handler are obtained through Utils.getObject from config.eventHandler and
   * config.cbkHandler.
   */
  def this(config: ProducerConfig, serializer: Encoder[V]) = this(config, serializer,
                                                                  new ConcurrentHashMap[Int, SyncProducer](),
                                                                  new ConcurrentHashMap[Int, AsyncProducer[V]](),
                                                                  Utils.getObject(config.eventHandler),
                                                                  Utils.getObject(config.cbkHandler))

  /**
   * add a new producer, either synchronous or asynchronous, connecting
   * to the specified broker. If a producer is already registered for the
   * same broker id it is replaced and the replaced producer is closed.
   * @param broker the broker (id, host and port) the new producer connects to
   */
  def addProducer(broker: Broker): Unit = {
    val props = new Properties()
    props.put("host", broker.host)
    props.put("port", broker.port.toString)
    // NOTE(review): config.props is applied last, so a "host"/"port" entry in it
    // would override the broker's values set above - presumably config.props never
    // carries those keys; confirm against ProducerConfig.
    props.putAll(config.props)
    if(sync) {
      val producer = new SyncProducer(new SyncProducerConfig(props))
      info("Creating sync producer for broker id = " + broker.id + " at " + broker.host + ":" + broker.port)
      // put returns the producer previously mapped to this broker id (or null);
      // close it so the replaced producer is not leaked
      val previous = syncProducers.put(broker.id, producer)
      if(previous != null)
        previous.close
    } else {
      val producer = new AsyncProducer[V](new AsyncProducerConfig(props),
                                          new SyncProducer(new SyncProducerConfig(props)),
                                          serializer,
                                          eventHandler, config.eventHandlerProps,
                                          cbkHandler, config.cbkHandlerProps)
      producer.start
      info("Creating async producer for broker id = " + broker.id + " at " + broker.host + ":" + broker.port)
      // same as the sync case: do not leak a producer that is being replaced
      val previous = asyncProducers.put(broker.id, producer)
      if(previous != null)
        previous.close
    }
  }

  /**
   * selects either a synchronous or an asynchronous producer, for
   * the specified broker id and calls the send API on the selected
   * producer to publish the data to the specified broker partition.
   * Requests are grouped by broker id, and brokers are served in the
   * order in which they first appear in poolData.
   * @param poolData the producer pool request objects
   * @throws UnavailableProducerException if the pool holds no producer for one of the broker ids
   */
  def send(poolData: ProducerPoolData[V]*): Unit = {
    val distinctBrokers = poolData.map(pd => pd.getBidPid.brokerId).distinct
    var remainingRequests = poolData.toSeq
    distinctBrokers.foreach { bid =>
      // split off the requests for this broker; only the rest is scanned for the next broker
      val (requestsForThisBid, requestsForOtherBids) = remainingRequests.partition(_.getBidPid.brokerId == bid)
      remainingRequests = requestsForOtherBids
      if(sync) {
        debug("Fetching sync producer for broker id: " + bid)
        val producer = syncProducers.get(bid)
        // fail before building any request when there is nobody to send it to
        if(producer == null)
          throw new UnavailableProducerException("Producer pool has not been initialized correctly. " +
                                                 "Sync Producer for broker " + bid + " does not exist in the pool")
        val producerRequests = requestsForThisBid.map(req => new ProducerRequest(req.getTopic, req.getBidPid.partId,
          new ByteBufferMessageSet(compressionCodec = config.compressionCodec,
                                   messages = req.getData.map(d => serializer.toMessage(d)): _*)))
        // several requests for one broker go out in a single multiSend call
        if(producerRequests.size > 1)
          producer.multiSend(producerRequests.toArray)
        else
          producer.send(topic = producerRequests(0).topic,
                        partition = producerRequests(0).partition,
                        messages = producerRequests(0).messages)
        config.compressionCodec match {
          case NoCompressionCodec => debug("Sending message to broker " + bid)
          case _ => debug("Sending compressed messages to broker " + bid)
        }
      } else {
        debug("Fetching async producer for broker id: " + bid)
        val producer = asyncProducers.get(bid)
        if(producer == null)
          throw new UnavailableProducerException("Producer pool has not been initialized correctly. " +
                                                 "Async Producer for broker " + bid + " does not exist in the pool")
        // the async producer takes the raw data items one at a time
        requestsForThisBid.foreach { req =>
          req.getData.foreach(d => producer.send(req.getTopic, d, req.getBidPid.partId))
        }
        if(logger.isDebugEnabled)
          config.compressionCodec match {
            case NoCompressionCodec => debug("Sending message")
            case _ => debug("Sending compressed messages")
          }
      }
    }
  }

  /**
   * Closes all the producers in the pool
   */
  def close(): Unit = {
    if(sync) {
      info("Closing all sync producers")
      val iter = syncProducers.values.iterator
      while(iter.hasNext)
        iter.next.close
    } else {
      info("Closing all async producers")
      val iter = asyncProducers.values.iterator
      while(iter.hasNext)
        iter.next.close
    }
  }

  /**
   * This constructs and returns the request object for the producer pool
   * @param topic the topic to which the data should be published
   * @param bidPid the broker id and partition id
   * @param data the data to be published
   * @return a new ProducerPoolData wrapping the three arguments
   */
  def getProducerPoolData(topic: String, bidPid: Partition, data: Seq[V]): ProducerPoolData[V] = {
    new ProducerPoolData[V](topic, bidPid, data)
  }

  /**
   * Request object accepted by send: the data to publish together with its
   * topic and its target broker/partition. Note that the type parameter V
   * declared here shadows the V of the enclosing ProducerPool.
   */
  class ProducerPoolData[V](topic: String,
                            bidPid: Partition,
                            data: Seq[V]) {
    def getTopic: String = topic
    def getBidPid: Partition = bidPid
    def getData: Seq[V] = data
  }
}