kafka-882; Enhance 0.7 ProducerPerformance to send sequential MessageID as in 0.8; patched by John Fung; reviewed by Jun Rao
diff --git a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
index 5888f1e..a8bd47f 100644
--- a/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
+++ b/perf/src/main/scala/kafka/perf/ProducerPerformance.scala
@@ -97,6 +97,13 @@
       .describedAs("compression codec ")
       .ofType(classOf[java.lang.Integer])
       .defaultsTo(0)
+    val initialMessageIdOpt = parser.accepts("initial-message-id", "The is used for generating test data, If set, messages will be tagged with an " +
+      "ID and sent by producer starting from this ID sequentially. Message content will be String type and " +
+      "in the form of 'Message:000...1:xxx...'")
+      .withRequiredArg()
+      .describedAs("initial message id")
+      .ofType(classOf[java.lang.Integer])
+      .defaultsTo(-1)
 
     val options = parser.parse(args : _*)
     for(arg <- List(topicOpt, brokerInfoOpt, numMessagesOpt)) {
@@ -119,6 +126,7 @@
     var batchSize = options.valueOf(batchSizeOpt).intValue
     val numThreads = options.valueOf(numThreadsOpt).intValue
     val compressionCodec = CompressionCodec.getCompressionCodec(options.valueOf(compressionCodecOption).intValue)
+    var initialMessageId = options.valueOf(initialMessageIdOpt).intValue()
   }
 
   private def getStringOfLength(len: Int) : String = {
@@ -157,6 +165,35 @@
     }
     val producerConfig = new ProducerConfig(props)
     val producer = new Producer[Message, Message](producerConfig)
+    val seqIdNumDigit = 10  // no. of digits for max int value
+    // generate the sequential message ID
+    private val SEP            = ":"              // message field separator
+    private val messageIdLabel = "MessageID"
+    private val threadIdLabel  = "ThreadID"
+    private val topicLabel     = "Topic"
+    private var leftPaddedSeqId : String = ""
+
+    private def generateMessageWithSeqId(topic: String, msgId: Long, msgSize: Int): Array[Byte] = {
+      // Each thread gets a unique range of sequential no. for its ids.
+      // Eg. 1000 msg in 10 threads => 100 msg per thread
+      // thread 0 IDs :   0 ~  99
+      // thread 1 IDs : 100 ~ 199
+      // thread 2 IDs : 200 ~ 299
+      // . . .
+      leftPaddedSeqId = String.format("%0"+seqIdNumDigit+"d", long2Long(msgId))
+
+      val msgHeader = topicLabel      + SEP +
+              topic           + SEP +
+              threadIdLabel   + SEP +
+              threadId        + SEP +
+              messageIdLabel  + SEP +
+              leftPaddedSeqId + SEP
+
+      // pad the rest of the message with 'x'
+      val seqMsgString = String.format("%1$-"+msgSize+"s", msgHeader).replace(' ', 'x')
+      debug(seqMsgString)
+      return seqMsgString.getBytes()
+    }
 
     override def run {
       var bytesSent = 0L
@@ -195,10 +232,17 @@
             nSends += config.batchSize
           }else {
             if(!config.isFixSize) {
-              strLength = rand.nextInt(config.messageSize)
-              val messageBytes = getByteArrayOfLength(strLength)
-              rand.nextBytes(messageBytes)
-              val message = new Message(messageBytes)
+              strLength = rand.nextInt(config.messageSize) + 1
+              var message : Message = null
+              if (config.initialMessageId > -1) {
+                val seqId = config.initialMessageId + (messagesPerThread * threadId) + j
+                message = new Message(generateMessageWithSeqId(config.topic, seqId, strLength))
+              }
+              else {
+                val messageBytes = getByteArrayOfLength(strLength)
+                rand.nextBytes(messageBytes)
+                message = new Message(messageBytes)
+              }
               producer.send(new ProducerData[Message,Message](config.topic, message))
               debug(config.topic + "-checksum:" + message.checksum)
               bytesSent += message.payloadSize