blob: bc25cd2f371c1b18aba4e7d760357e448acfd5c0 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.tools
import kafka.metrics.KafkaMetricsReporter
import kafka.producer.{OldProducer, NewShinyProducer}
import kafka.utils.{ToolsUtils, VerifiableProperties, Logging, CommandLineUtils}
import kafka.message.CompressionCodec
import kafka.serializer._
import java.util.concurrent.{CountDownLatch, Executors}
import java.util.concurrent.atomic.AtomicLong
import java.util._
import java.text.SimpleDateFormat
import java.math.BigInteger
import org.apache.log4j.Logger
/**
* Load test for the producer
*/
object ProducerPerformance extends Logging {
def main(args: Array[String]) {
val logger = Logger.getLogger(getClass)
val config = new ProducerPerfConfig(args)
if (!config.isFixedSize)
logger.info("WARN: Throughput will be slower due to changing message size per request")
val totalBytesSent = new AtomicLong(0)
val totalMessagesSent = new AtomicLong(0)
val executor = Executors.newFixedThreadPool(config.numThreads)
val allDone = new CountDownLatch(config.numThreads)
val startMs = System.currentTimeMillis
val rand = new java.util.Random
if (!config.hideHeader)
println("start.time, end.time, compression, message.size, batch.size, total.data.sent.in.MB, MB.sec, " +
"total.data.sent.in.nMsg, nMsg.sec")
for (i <- 0 until config.numThreads) {
executor.execute(new ProducerThread(i, config, totalBytesSent, totalMessagesSent, allDone, rand))
}
allDone.await()
val endMs = System.currentTimeMillis
val elapsedSecs = (endMs - startMs) / 1000.0
val totalMBSent = (totalBytesSent.get * 1.0) / (1024 * 1024)
println(("%s, %s, %d, %d, %d, %.2f, %.4f, %d, %.4f").format(
config.dateFormat.format(startMs), config.dateFormat.format(endMs),
config.compressionCodec.codec, config.messageSize, config.batchSize, totalMBSent,
totalMBSent / elapsedSecs, totalMessagesSent.get, totalMessagesSent.get / elapsedSecs))
System.exit(0)
}
class ProducerPerfConfig(args: Array[String]) extends PerfConfig(args) {
val brokerListOpt = parser.accepts("broker-list", "REQUIRED: broker info (the list of broker host and port for bootstrap.")
.withRequiredArg
.describedAs("hostname:port,..,hostname:port")
.ofType(classOf[String])
val topicsOpt = parser.accepts("topics", "REQUIRED: The comma separated list of topics to produce to")
.withRequiredArg
.describedAs("topic1,topic2..")
.ofType(classOf[String])
val producerRequestTimeoutMsOpt = parser.accepts("request-timeout-ms", "The produce request timeout in ms")
.withRequiredArg()
.ofType(classOf[java.lang.Integer])
.defaultsTo(3000)
val producerNumRetriesOpt = parser.accepts("producer-num-retries", "The producer retries number")
.withRequiredArg()
.ofType(classOf[java.lang.Integer])
.defaultsTo(3)
val producerRetryBackOffMsOpt = parser.accepts("producer-retry-backoff-ms", "The producer retry backoff time in milliseconds")
.withRequiredArg()
.ofType(classOf[java.lang.Integer])
.defaultsTo(100)
val producerRequestRequiredAcksOpt = parser.accepts("request-num-acks", "Number of acks required for producer request " +
"to complete")
.withRequiredArg()
.ofType(classOf[java.lang.Integer])
.defaultsTo(-1)
val varyMessageSizeOpt = parser.accepts("vary-message-size", "If set, message size will vary up to the given maximum.")
val syncOpt = parser.accepts("sync", "If set, messages are sent synchronously.")
val numThreadsOpt = parser.accepts("threads", "Number of sending threads.")
.withRequiredArg
.describedAs("number of threads")
.ofType(classOf[java.lang.Integer])
.defaultsTo(1)
val initialMessageIdOpt = parser.accepts("initial-message-id", "The is used for generating test data, If set, messages will be tagged with an " +
"ID and sent by producer starting from this ID sequentially. Message content will be String type and " +
"in the form of 'Message:000...1:xxx...'")
.withRequiredArg()
.describedAs("initial message id")
.ofType(classOf[java.lang.Integer])
val messageSendGapMsOpt = parser.accepts("message-send-gap-ms", "If set, the send thread will wait for specified time between two sends")
.withRequiredArg()
.describedAs("message send time gap")
.ofType(classOf[java.lang.Integer])
.defaultsTo(0)
val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled")
val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter is" +
"set, the csv metrics will be outputed here")
.withRequiredArg
.describedAs("metrics dictory")
.ofType(classOf[java.lang.String])
val useNewProducerOpt = parser.accepts("new-producer", "Use the new producer implementation.")
val options = parser.parse(args: _*)
CommandLineUtils.checkRequiredArgs(parser, options, topicsOpt, brokerListOpt, numMessagesOpt)
val topicsStr = options.valueOf(topicsOpt)
val topics = topicsStr.split(",")
val numMessages = options.valueOf(numMessagesOpt).longValue
val reportingInterval = options.valueOf(reportingIntervalOpt).intValue
val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt))
val hideHeader = options.has(hideHeaderOpt)
val brokerList = options.valueOf(brokerListOpt)
ToolsUtils.validatePortOrDie(parser,brokerList)
val messageSize = options.valueOf(messageSizeOpt).intValue
var isFixedSize = !options.has(varyMessageSizeOpt)
var isSync = options.has(syncOpt)
var batchSize = options.valueOf(batchSizeOpt).intValue
var numThreads = options.valueOf(numThreadsOpt).intValue
val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOpt).intValue)
val seqIdMode = options.has(initialMessageIdOpt)
var initialMessageId: Int = 0
if (seqIdMode)
initialMessageId = options.valueOf(initialMessageIdOpt).intValue()
val producerRequestTimeoutMs = options.valueOf(producerRequestTimeoutMsOpt).intValue()
val producerRequestRequiredAcks = options.valueOf(producerRequestRequiredAcksOpt).intValue()
val producerNumRetries = options.valueOf(producerNumRetriesOpt).intValue()
val producerRetryBackoffMs = options.valueOf(producerRetryBackOffMsOpt).intValue()
val useNewProducer = options.has(useNewProducerOpt)
val csvMetricsReporterEnabled = options.has(csvMetricsReporterEnabledOpt)
if (csvMetricsReporterEnabled) {
val props = new Properties()
props.put("kafka.metrics.polling.interval.secs", "1")
props.put("kafka.metrics.reporters", "kafka.metrics.KafkaCSVMetricsReporter")
if (options.has(metricsDirectoryOpt))
props.put("kafka.csv.metrics.dir", options.valueOf(metricsDirectoryOpt))
else
props.put("kafka.csv.metrics.dir", "kafka_metrics")
props.put("kafka.csv.metrics.reporter.enabled", "true")
val verifiableProps = new VerifiableProperties(props)
KafkaMetricsReporter.startReporters(verifiableProps)
}
val messageSendGapMs = options.valueOf(messageSendGapMsOpt).intValue()
}
class ProducerThread(val threadId: Int,
val config: ProducerPerfConfig,
val totalBytesSent: AtomicLong,
val totalMessagesSent: AtomicLong,
val allDone: CountDownLatch,
val rand: Random) extends Runnable {
val seqIdNumDigit = 10 // no. of digits for max int value
val messagesPerThread = config.numMessages / config.numThreads
debug("Messages per thread = " + messagesPerThread)
val props = new Properties()
val producer =
if (config.useNewProducer) {
import org.apache.kafka.clients.producer.ProducerConfig
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList)
props.put(ProducerConfig.SEND_BUFFER_CONFIG, (64 * 1024).toString)
props.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-performance")
props.put(ProducerConfig.ACKS_CONFIG, config.producerRequestRequiredAcks.toString)
props.put(ProducerConfig.TIMEOUT_CONFIG, config.producerRequestTimeoutMs.toString)
props.put(ProducerConfig.RETRIES_CONFIG, config.producerNumRetries.toString)
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, config.producerRetryBackoffMs.toString)
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec.name)
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
new NewShinyProducer(props)
} else {
props.put("metadata.broker.list", config.brokerList)
props.put("compression.codec", config.compressionCodec.codec.toString)
props.put("send.buffer.bytes", (64 * 1024).toString)
if (!config.isSync) {
props.put("producer.type", "async")
props.put("batch.num.messages", config.batchSize.toString)
props.put("queue.enqueue.timeout.ms", "-1")
}
props.put("client.id", "producer-performance")
props.put("request.required.acks", config.producerRequestRequiredAcks.toString)
props.put("request.timeout.ms", config.producerRequestTimeoutMs.toString)
props.put("message.send.max.retries", config.producerNumRetries.toString)
props.put("retry.backoff.ms", config.producerRetryBackoffMs.toString)
props.put("serializer.class", classOf[DefaultEncoder].getName)
props.put("key.serializer.class", classOf[NullEncoder[Long]].getName)
new OldProducer(props)
}
// generate the sequential message ID
private val SEP = ":" // message field separator
private val messageIdLabel = "MessageID"
private val threadIdLabel = "ThreadID"
private val topicLabel = "Topic"
private var leftPaddedSeqId: String = ""
private def generateMessageWithSeqId(topic: String, msgId: Long, msgSize: Int): Array[Byte] = {
// Each thread gets a unique range of sequential no. for its ids.
// Eg. 1000 msg in 10 threads => 100 msg per thread
// thread 0 IDs : 0 ~ 99
// thread 1 IDs : 100 ~ 199
// thread 2 IDs : 200 ~ 299
// . . .
leftPaddedSeqId = String.format("%0" + seqIdNumDigit + "d", long2Long(msgId))
val msgHeader = topicLabel + SEP +
topic + SEP +
threadIdLabel + SEP +
threadId + SEP +
messageIdLabel + SEP +
leftPaddedSeqId + SEP
val seqMsgString = String.format("%1$-" + msgSize + "s", msgHeader).replace(' ', 'x')
debug(seqMsgString)
return seqMsgString.getBytes()
}
private def generateProducerData(topic: String, messageId: Long): Array[Byte] = {
val msgSize = if (config.isFixedSize) config.messageSize else 1 + rand.nextInt(config.messageSize)
if (config.seqIdMode) {
val seqId = config.initialMessageId + (messagesPerThread * threadId) + messageId
generateMessageWithSeqId(topic, seqId, msgSize)
} else {
new Array[Byte](msgSize)
}
}
override def run {
var bytesSent = 0L
var nSends = 0
var i: Long = 0L
var message: Array[Byte] = null
while (i < messagesPerThread) {
try {
config.topics.foreach(
topic => {
message = generateProducerData(topic, i)
producer.send(topic, BigInteger.valueOf(i).toByteArray, message)
bytesSent += message.size
nSends += 1
if (config.messageSendGapMs > 0)
Thread.sleep(config.messageSendGapMs)
})
} catch {
case e: Throwable => error("Error when sending message " + new String(message), e)
}
i += 1
}
try {
producer.close()
} catch {
case e: Throwable => error("Error when closing producer", e)
}
totalBytesSent.addAndGet(bytesSent)
totalMessagesSent.addAndGet(nSends)
allDone.countDown()
}
}
}