kafka-882; Enhance 0.7 ProducerPerformance to send sequential MessageID as in 0.8; patched by John Fung; reviewed by Jun Rao
diff --git a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
index 5888f1e..a8bd47f 100644
--- a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
+++ b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
@@ -97,6 +97,13 @@
.describedAs("compression codec ")
.ofType(classOf[java.lang.Integer])
.defaultsTo(0)
+ // Optional starting ID for sequential test-message tagging; the default of -1 disables tagging.
+ val initialMessageIdOpt = parser.accepts("initial-message-id", "This is used for generating test data. If set to a non-negative value, messages will be tagged with an " +
+ "ID and sent by the producer starting from this ID sequentially. Message content will be String type and " +
+ "in the form of 'Topic:<topic>:ThreadID:<thread id>:MessageID:<zero-padded id>:xxx...'")
+ .withRequiredArg()
+ .describedAs("initial message id")
+ .ofType(classOf[java.lang.Integer])
+ .defaultsTo(-1)
val options = parser.parse(args : _*)
for(arg <- List(topicOpt, brokerInfoOpt, numMessagesOpt)) {
@@ -119,6 +126,7 @@
var batchSize = options.valueOf(batchSizeOpt).intValue
val numThreads = options.valueOf(numThreadsOpt).intValue
val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOption).intValue)
+ var initialMessageId: Int = options.valueOf(initialMessageIdOpt).intValue
}
private def getStringOfLength(len: Int) : String = {
@@ -157,6 +165,35 @@
}
val producerConfig = new ProducerConfig(props)
val producer = new Producer[Message, Message](producerConfig)
+ // Layout of the sequential-ID test messages built by generateMessageWithSeqId:
+ //   Topic:<topic>:ThreadID:<threadId>:MessageID:<zero-padded id>:xxx...
+ val seqIdNumDigit = 10 // minimum width of the zero-padded ID (digits in Int.MaxValue); longer IDs are not truncated
+ private val SEP = ":" // message field separator
+ private val messageIdLabel = "MessageID"
+ private val threadIdLabel = "ThreadID"
+ private val topicLabel = "Topic"
+
+ /**
+ * Builds a String test payload tagged with this thread's id and a sequential message ID.
+ *
+ * The caller chooses msgId. In run() each thread draws from its own contiguous range
+ * (offset by --initial-message-id), e.g. 1000 msg in 10 threads => 100 msg per thread:
+ *   thread 0 IDs : 0 ~ 99
+ *   thread 1 IDs : 100 ~ 199
+ *   thread 2 IDs : 200 ~ 299
+ *   . . .
+ *
+ * @param topic   topic name written into the header
+ * @param msgId   sequential ID, left-padded with zeros to at least seqIdNumDigit digits
+ * @param msgSize target payload length in characters; the header is padded with 'x' up to
+ *                this length but never truncated, so the payload may be longer than msgSize
+ * @return the payload encoded as UTF-8
+ */
+ private def generateMessageWithSeqId(topic: String, msgId: Long, msgSize: Int): Array[Byte] = {
+ val paddedSeqId = String.format("%0" + seqIdNumDigit + "d", long2Long(msgId))
+
+ // Every field, including the last, is followed by SEP.
+ val msgHeader = Seq(topicLabel, topic, threadIdLabel, threadId.toString, messageIdLabel, paddedSeqId)
+ .mkString("", SEP, SEP)
+
+ // Pad only the tail with 'x', so characters inside the header are never rewritten;
+ // a non-positive remainder (msgSize <= header length, or msgSize <= 0) adds no padding.
+ val seqMsgString = msgHeader + ("x" * (msgSize - msgHeader.length))
+ debug(seqMsgString)
+ seqMsgString.getBytes("UTF-8")
+ }
override def run {
var bytesSent = 0L
@@ -195,10 +232,17 @@
nSends += config.batchSize
}else {
if(!config.isFixSize) {
- strLength = rand.nextInt(config.messageSize)
- val messageBytes = getByteArrayOfLength(strLength)
- rand.nextBytes(messageBytes)
- val message = new Message(messageBytes)
+ strLength = rand.nextInt(config.messageSize) + 1
+ // When --initial-message-id is set (> -1), each thread tags messages from its own
+ // contiguous ID range: initialMessageId + messagesPerThread * threadId + j.
+ // Widen to Long before multiplying so large runs cannot overflow Int.
+ val message: Message =
+ if (config.initialMessageId > -1) {
+ val seqId = config.initialMessageId.toLong + messagesPerThread.toLong * threadId + j
+ new Message(generateMessageWithSeqId(config.topic, seqId, strLength))
+ } else {
+ // Otherwise send a random payload of the chosen length.
+ val messageBytes = getByteArrayOfLength(strLength)
+ rand.nextBytes(messageBytes)
+ new Message(messageBytes)
+ }
producer.send(new ProducerData[Message,Message](config.topic, message))
debug(config.topic + "-checksum:" + message.checksum)
bytesSent += message.payloadSize