package kafka.perf
import java.util.concurrent.{CountDownLatch, Executors}
import java.util.concurrent.atomic.AtomicLong
import kafka.producer._
import org.apache.log4j.Logger
import kafka.message.{CompressionCodec, Message}
import java.text.SimpleDateFormat
import java.util.{Random, Properties}
import kafka.utils.Logging
/** Load test for the producer. */
object ProducerPerformance extends Logging {
/**
 * Entry point of the producer load test.
 *
 * Builds a ProducerPerfConfig from the command line, starts config.numThreads ProducerThread
 * workers on a fixed-size pool, waits for the pool to drain and then, unless detailed statistics
 * were requested, prints one aggregate CSV row.
 *
 * @param args command-line arguments, passed unchanged to ProducerPerfConfig
 */
def main(args: Array[String]): Unit = {
  val logger = Logger.getLogger(getClass)
  val config = new ProducerPerfConfig(args)
  if(!config.isFixSize)
    logger.info("WARN: Throughput will be slower due to changing message size per request")

  val totalBytesSent = new AtomicLong(0)
  val totalMessagesSent = new AtomicLong(0)
  val executor = Executors.newFixedThreadPool(config.numThreads)
  val allDone = new CountDownLatch(config.numThreads)
  val startMs = System.currentTimeMillis
  val rand = new java.util.Random

  if(!config.hideHeader) {
    // Print only the header that matches the rows this run emits: the aggregate row below carries
    // a start and an end time, the per-thread rows carry a single time and the thread id.
    if(!config.showDetailedStats)
      println("start.time, end.time, compression, message.size, batch.size, total.data.sent.in.MB, MB.sec, " +
        "total.data.sent.in.nMsg, nMsg.sec")
    else
      println("time, compression, thread.id, message.size, batch.size, total.data.sent.in.MB, MB.sec, " +
        "total.data.sent.in.nMsg, nMsg.sec")
  }

  for(i <- 0 until config.numThreads) {
    executor.execute(new ProducerThread(i, config, totalBytesSent, totalMessagesSent, allDone, rand))
  }

  // Stop accepting work and block until every submitted worker has returned, so that the end
  // timestamp covers the actual send work and the pool threads do not keep the JVM alive.
  // Waiting on the pool itself does not depend on how the workers use the allDone latch.
  executor.shutdown()
  executor.awaitTermination(Long.MaxValue, java.util.concurrent.TimeUnit.NANOSECONDS)

  val endMs = System.currentTimeMillis
  val elapsedSecs = (endMs - startMs) / 1000.0
  if(!config.showDetailedStats) {
    val totalMBSent = (totalBytesSent.get * 1.0)/ (1024 * 1024)
    println(("%s, %s, %d, %d, %d, %.2f, %.4f, %d, %.4f").format(config.dateFormat.format(startMs),
      config.dateFormat.format(endMs), config.compressionCodec.codec, config.messageSize, config.batchSize,
      totalMBSent, totalMBSent/elapsedSecs, totalMessagesSent.get, totalMessagesSent.get/elapsedSecs))
  }
}
/**
 * Command-line configuration for the producer load test.
 *
 * Registers the producer-specific options on `parser`, parses `args`, and exposes the parsed
 * values as fields. `parser` and the options `topicOpt`, `numMessagesOpt`, `reportingIntervalOpt`,
 * `showDetailedStatsOpt`, `dateFormatOpt` and `hideHeaderOpt` are not declared in this class;
 * presumably they are inherited from PerfConfig -- TODO confirm.
 *
 * NOTE(review): in this copy of the file the closing braces are missing and the option
 * declarations end right after accepts(...)/describedAs(...). The fields below read integer
 * values from these options via valueOf(...).intValue, which presumably requires each option to
 * declare an argument and its type (and possibly a default) -- verify against the option parser API.
 *
 * @param args raw command-line arguments
 */
class ProducerPerfConfig(args: Array[String]) extends PerfConfig(args) {
// Broker connection, given as "broker.list=..." or "zk.connect=..." (see the describedAs text).
val brokerInfoOpt = parser.accepts("brokerinfo", "REQUIRED: broker info (either from zookeeper or a list.")
.describedAs("broker.list=brokerid:hostname:port or zk.connect=host:port")
// Message size; when vary-message-size is set this acts as the upper bound for random sizes.
val messageSizeOpt = parser.accepts("message-size", "The size of each message.")
val varyMessageSizeOpt = parser.accepts("vary-message-size", "If set, message size will vary up to the given maximum.")
val asyncOpt = parser.accepts("async", "If set, messages are sent asynchronously.")
val batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch.")
val numThreadsOpt = parser.accepts("threads", "Number of sending threads.")
// Read below as an integer and mapped to a codec through CompressionCodec.getCompressionCodec.
val compressionCodecOption = parser.accepts("compression-codec", "If set, messages are sent compressed")
.describedAs("compression codec ")
// First sequential message id; ProducerThread only generates id-tagged payloads when it is > -1.
val initialMessageIdOpt = parser.accepts("initial-message-id", "The is used for generating test data, If set, messages will be tagged with an " +
"ID and sent by producer starting from this ID sequentially. Message content will be String type and " +
"in the form of 'Message:000...1:xxx...'")
.describedAs("initial message id")
val options = parser.parse(args : _*)
// Report every required option that is absent from the command line.
// NOTE(review): only an error message is printed here; nothing visible aborts the program or
// closes this loop before the field definitions that follow -- confirm against upstream.
for(arg <- List(topicOpt, brokerInfoOpt, numMessagesOpt)) {
if(!options.has(arg)) {
System.err.println("Missing required argument \"" + arg + "\"")
// Parsed values exposed to the rest of the tool.
val topic = options.valueOf(topicOpt)
val numMessages = options.valueOf(numMessagesOpt).longValue
val reportingInterval = options.valueOf(reportingIntervalOpt).intValue
val showDetailedStats = options.has(showDetailedStatsOpt)
val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt))
val hideHeader = options.has(hideHeaderOpt)
val brokerInfo = options.valueOf(brokerInfoOpt)
val messageSize = options.valueOf(messageSizeOpt).intValue
// Fixed-size messages are the default; vary-message-size switches to random sizes.
val isFixSize = !options.has(varyMessageSizeOpt)
val isAsync = options.has(asyncOpt)
// Declared as var, although no reassignment is visible in this file.
var batchSize = options.valueOf(batchSizeOpt).intValue
val numThreads = options.valueOf(numThreadsOpt).intValue
val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOption).intValue)
// Declared as var, although no reassignment is visible in this file.
var initialMessageId = options.valueOf(initialMessageIdOpt).intValue()
/**
 * Builds a string consisting of `len` copies of the character 'x'.
 *
 * @param len number of characters; a negative value fails in the array allocation
 * @return a string of exactly `len` 'x' characters (empty when `len` is 0)
 */
private def getStringOfLength(len: Int): String = {
  val chars = new Array[Char](len)
  // Library fill instead of a hand-written index loop.
  java.util.Arrays.fill(chars, 'x')
  new String(chars)
}
/**
 * Allocates a zero-filled byte array to be used as a message payload.
 *
 * A requested length of 0 is replaced by 5, so the result is never empty; every other length
 * is used as given.
 *
 * @param len requested payload length in bytes
 * @return a new array of `len` bytes, or of 5 bytes when `len` is 0
 */
private def getByteArrayOfLength(len: Int): Array[Byte] = {
  val effectiveLength = len match {
    case 0 => 5
    case other => other
  }
  new Array[Byte](effectiveLength)
}
/**
 * Runnable worker of the load test. On construction it translates the shared configuration into
 * producer Properties and creates its own Producer instance.
 *
 * NOTE(review): closing braces are missing in this copy of the file, so the block structure
 * described in the comments below is inferred -- confirm against upstream.
 *
 * @param threadId index of this worker; it appears in generated message headers, in the
 *                 per-thread report rows and in the computation of sequential message ids
 * @param config shared, already parsed load-test configuration
 * @param totalBytesSent shared byte counter (not referenced in the visible part of this class)
 * @param totalMessagesSent shared message counter (not referenced in the visible part of this class)
 * @param allDone latch shared with the launcher (not referenced in the visible part of this class)
 * @param rand random source used by run to pick message sizes in variable-size mode
 */
class ProducerThread(val threadId: Int,
val config: ProducerPerfConfig,
val totalBytesSent: AtomicLong,
val totalMessagesSent: AtomicLong,
val allDone: CountDownLatch,
val rand: Random) extends Runnable {
val props = new Properties()
// brokerInfo has the form "<kind>=<value>": element 0 is the kind, element 1 the value.
val brokerInfoList = config.brokerInfo.split("=")
if (brokerInfoList(0) == "zk.connect") {
props.put("zk.connect", brokerInfoList(1))
// NOTE(review): the property name is an empty string here; the intended key for the value
// "300000" appears to be missing -- confirm against upstream.
props.put("", "300000")
// NOTE(review): presumably this belongs to an else branch (direct broker list instead of
// zk.connect); no else is visible in this copy -- confirm.
props.put("broker.list", brokerInfoList(1))
props.put("compression.codec", config.compressionCodec.codec.toString)
props.put("reconnect.interval", Integer.MAX_VALUE.toString)
props.put("buffer.size", (64*1024).toString)
// Extra settings that are applied for asynchronous sends.
if(config.isAsync) {
props.put("batch.size", config.batchSize.toString)
// NOTE(review): empty property name again; the intended key for the value "-1" appears to be
// missing -- confirm against upstream.
props.put("", "-1")
val producerConfig = new ProducerConfig(props)
val producer = new Producer[Message, Message](producerConfig)
val seqIdNumDigit = 10 // no. of digits for max int value
// generate the sequential message ID
private val SEP = ":" // message field separator
private val messageIdLabel = "MessageID"
private val threadIdLabel = "ThreadID"
private val topicLabel = "Topic"
// Holds the most recently formatted, zero-padded sequence id (written by generateMessageWithSeqId).
private var leftPaddedSeqId : String = ""
/**
 * Builds the payload of a message that carries a sequential id.
 *
 * The payload starts with the header
 * "Topic:<topic>:ThreadID:<threadId>:MessageID:<zero-padded id>:" and is right-padded to
 * `msgSize` characters; every space in the padded string (the padding included) becomes 'x'.
 * A header longer than `msgSize` is not truncated. The zero-padded id is also stored in
 * `leftPaddedSeqId`.
 *
 * Callers give each thread its own id range, e.g. 1000 messages on 10 threads:
 * thread 0 uses ids 0 ~ 99, thread 1 uses 100 ~ 199, thread 2 uses 200 ~ 299, and so on.
 *
 * @param topic topic name written into the header
 * @param msgId sequential id, left-padded with zeros to `seqIdNumDigit` digits
 * @param msgSize minimum length of the resulting string, in characters
 * @return the padded string encoded with the platform default charset
 */
private def generateMessageWithSeqId(topic: String, msgId: Long, msgSize: Int): Array[Byte] = {
  val idPattern = "%0" + seqIdNumDigit + "d"
  leftPaddedSeqId = String.format(idPattern, java.lang.Long.valueOf(msgId))

  // Every field, the last one included, is followed by the separator.
  val headerFields = List(topicLabel, topic, threadIdLabel, threadId.toString, messageIdLabel, leftPaddedSeqId)
  val header = headerFields.mkString("", SEP, SEP)

  // Left-justify the header in a field of msgSize characters, then turn the blanks into 'x'.
  val paddingPattern = "%1$-" + msgSize + "s"
  val padded = String.format(paddingPattern, header)
  padded.replace(' ', 'x').getBytes()
}
// Send loop of one worker: sends messagesPerThread batches (synchronous mode) or single messages
// (asynchronous mode) to config.topic and prints a CSV progress row whenever the number of sent
// messages is a multiple of config.reportingInterval.
// NOTE(review): closing braces are missing in this copy of the file and the method may continue
// past the last visible line; the structure described in the comments below is inferred -- confirm.
override def run {
// Running totals and the values they had at the previous progress report.
var bytesSent = 0L
var lastBytesSent = 0L
var nSends = 0
var lastNSends = 0
// Zero-filled payload of the configured size, reused for every fixed-size send.
val message = new Message(new Array[Byte](config.messageSize))
var reportTime = System.currentTimeMillis()
var lastReportTime = reportTime
// Number of loop iterations: in synchronous mode each iteration sends a batch of batchSize
// messages, so the per-thread share is divided by batchSize as well.
val messagesPerThread = if(!config.isAsync) config.numMessages / config.numThreads / config.batchSize
else config.numMessages / config.numThreads
debug("Messages per thread = " + messagesPerThread)
var messageSet: List[Message] = Nil
// Fixed-size mode: build the batch once, as batchSize references to the same message.
if(config.isFixSize) {
for(k <- 0 until config.batchSize) {
messageSet ::= message
var j: Long = 0L
while(j < messagesPerThread) {
var strLength = config.messageSize
// Variable-size mode: build a batch of batchSize messages with random sizes.
// rand.nextInt can return 0 here; getByteArrayOfLength maps a length of 0 to 5 bytes.
// NOTE(review): as written this batch is built regardless of config.isAsync, while only the
// synchronous branch below resets messageSet -- verify against upstream.
if (!config.isFixSize) {
for(k <- 0 until config.batchSize) {
strLength = rand.nextInt(config.messageSize)
val message = new Message(getByteArrayOfLength(strLength))
messageSet ::= message
bytesSent += message.payloadSize
// Fixed-size synchronous mode: the batch consists of batchSize copies of the same message.
}else if(!config.isAsync) {
bytesSent += config.batchSize*message.payloadSize
try {
// Synchronous mode: send the whole batch in one request, with a null key.
if(!config.isAsync) {
producer.send(new ProducerData[Message,Message](config.topic, null, messageSet))
// A variable-size batch is rebuilt on every iteration, so drop the one just sent.
if(!config.isFixSize) messageSet = Nil
nSends += config.batchSize
}else {
// Asynchronous mode: send one message per iteration.
if(!config.isFixSize) {
// Random size in the range 1..messageSize.
strLength = rand.nextInt(config.messageSize) + 1
// Shadows the fixed-size message defined at the top of the method.
var message : Message = null
// With an initial message id configured, the payload carries a sequential id; this thread's
// ids start at initialMessageId + messagesPerThread * threadId.
if (config.initialMessageId > -1) {
val seqId = config.initialMessageId + (messagesPerThread * threadId) + j
message = new Message(generateMessageWithSeqId(config.topic, seqId, strLength))
// Otherwise the payload is a zero-filled array of the chosen size.
else {
val messageBytes = getByteArrayOfLength(strLength)
message = new Message(messageBytes)
producer.send(new ProducerData[Message,Message](config.topic, message))
debug(config.topic + "-checksum:" + message.checksum)
bytesSent += message.payloadSize
}else {
// Fixed-size asynchronous mode: resend the message built at the top of the method.
producer.send(new ProducerData[Message,Message](config.topic, message))
debug(config.topic + "-checksum:" + message.checksum)
bytesSent += message.payloadSize
nSends += 1
}catch {
// A failed send is only printed; the loop carries on with the next iteration.
case e: Exception => e.printStackTrace
// Progress report: rates are computed over the interval since the previous report, while the
// MB and message-count columns are cumulative.
if(nSends % config.reportingInterval == 0) {
reportTime = System.currentTimeMillis()
val elapsed = (reportTime - lastReportTime)/ 1000.0
val mbBytesSent = ((bytesSent - lastBytesSent) * 1.0)/(1024 * 1024)
val numMessagesPerSec = (nSends - lastNSends) / elapsed
val mbPerSec = mbBytesSent / elapsed
val formattedReportTime = config.dateFormat.format(reportTime)
// Columns: time, codec, thread id, message size, batch size, total MB, MB/s, total messages, messages/s.
println(("%s, %d, %d, %d, %d, %.2f, %.4f, %d, %.4f").format(formattedReportTime, config.compressionCodec.codec,
threadId, config.messageSize, config.batchSize, (bytesSent*1.0)/(1024 * 1024), mbPerSec, nSends, numMessagesPerSec))
lastReportTime = reportTime
lastBytesSent = bytesSent
lastNSends = nSends
j += 1