blob: 584d4fb7ee7e5e9f119f87fd29fae9f8dc210666 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.tools
import java.util.{Arrays, Properties}
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
import org.apache.kafka.clients.producer._
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConversions._
/**
 * This class records the average end to end latency for a single message to travel through Kafka
 *
 * Command line arguments, in order:
 *   broker_list         = location of the bootstrap broker for both the producer and the consumer
 *   topic               = topic the test messages are produced to and consumed from
 *   num_messages        = # messages to send (must be greater than zero)
 *   producer_acks       = See ProducerConfig.ACKS_DOC (only "1" and "all" are accepted)
 *   message_size_bytes  = size of each message in bytes
 *   ssl_properties_file = (optional) properties file used as the base config of both clients
 *
 * e.g. [localhost:9092 test 10000 1 20]
 */
object EndToEndLatency {
  // Timeout handed to consumer.poll() while waiting for each produced message to come back.
  private val timeout: Long = 60000

  /**
   * Sends `num_messages` messages one at a time, polling for each one right after the send is
   * acknowledged, then prints the average latency and the 50th/99th/99.9th percentiles.
   *
   * @param args broker_list topic num_messages producer_acks message_size_bytes [ssl_properties_file]
   * @throws IllegalArgumentException if producer_acks is not "1" or "all", or num_messages is not positive
   * @throws RuntimeException if a poll returns nothing, returns a different payload than was sent,
   *                          or returns more than one record
   */
  def main(args: Array[String]): Unit = {
    if (args.length != 5 && args.length != 6) {
      System.err.println("USAGE: java " + getClass.getName + " broker_list topic num_messages producer_acks message_size_bytes [optional] ssl_properties_file")
      System.exit(1)
    }

    val brokerList = args(0)
    val topic = args(1)
    val numMessages = args(2).toInt
    val producerAcks = args(3)
    val messageLen = args(4).toInt
    val sslPropsFile = if (args.length == 6) args(5) else ""

    if (!List("1", "all").contains(producerAcks))
      throw new IllegalArgumentException("Latency testing requires synchronous acknowledgement. Please use 1 or all")

    // Fail fast, before any client is created: a non-positive count would otherwise surface later
    // as an array allocation/indexing error with the producer and consumer still open.
    if (numMessages <= 0)
      throw new IllegalArgumentException(s"num_messages must be greater than zero, but was [$numMessages]")

    // The optional properties file (if given) is the base config; the settings below override it.
    val consumerProps = if (sslPropsFile.equals("")) new Properties() else Utils.loadProps(sslPropsFile)
    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
    // A time-stamped group id gives each run its own consumer group.
    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group-" + System.currentTimeMillis())
    consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    consumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "0") //ensure we have no temporal batching
    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps)
    consumer.subscribe(List(topic))

    val producerProps = if (sslPropsFile.equals("")) new Properties() else Utils.loadProps(sslPropsFile)
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
    producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "0") //ensure writes are synchronous
    producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, Long.MaxValue.toString)
    producerProps.put(ProducerConfig.ACKS_CONFIG, producerAcks)
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
    val producer = new KafkaProducer[Array[Byte], Array[Byte]](producerProps)

    // Calls commitSync on the consumer, then closes the producer and the consumer.
    // Invoked before every failure raised inside the send/poll loop and at the end of a normal run.
    def finalise(): Unit = {
      consumer.commitSync()
      producer.close()
      consumer.close()
    }

    //Ensure we are at latest offset. seekToEnd evaluates lazily, that is to say actually performs the seek only when
    //a poll() or position() request is issued. Hence we need to poll after we seek to ensure we see our first write.
    consumer.seekToEnd(List[TopicPartition]())
    consumer.poll(0)

    var totalTime = 0.0 // accumulated round-trip time in nanoseconds
    val latencies = new Array[Long](numMessages) // per-message latency in whole milliseconds

    for (i <- 0 until numMessages) {
      val message = randomBytesOfLen(messageLen)
      val begin = System.nanoTime

      //Send message (of random bytes) synchronously then immediately poll for it
      producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, message)).get()
      val recordIter = consumer.poll(timeout).iterator

      val elapsed = System.nanoTime - begin

      //Check we got results
      if (!recordIter.hasNext) {
        finalise()
        throw new RuntimeException(s"poll() timed out before finding a result (timeout:[$timeout])")
      }

      //Check result matches the original record
      val sent = new String(message)
      val read = new String(recordIter.next().value())
      if (!read.equals(sent)) {
        finalise()
        throw new RuntimeException(s"The message read [$read] did not match the message sent [$sent]")
      }

      //Check we only got the one message
      if (recordIter.hasNext) {
        // One record has already been consumed above; drain the rest to report the total.
        var count = 1
        while (recordIter.hasNext) {
          recordIter.next()
          count += 1
        }
        // Release the clients here as well, matching the other failure paths.
        finalise()
        throw new RuntimeException(s"Only one result was expected during this test. We found [$count]")
      }

      //Report progress
      if (i % 1000 == 0)
        println(i + "\t" + elapsed / 1000.0 / 1000.0)
      totalTime += elapsed
      // Long division: the stored latency is truncated to whole milliseconds.
      latencies(i) = elapsed / 1000 / 1000
    }

    //Results
    println("Avg latency: %.4f ms\n".format(totalTime / numMessages / 1000.0 / 1000.0))
    Arrays.sort(latencies)
    // Each index is strictly below latencies.length because the multiplier is < 1 and toInt truncates.
    val p50 = latencies((latencies.length * 0.5).toInt)
    val p99 = latencies((latencies.length * 0.99).toInt)
    val p999 = latencies((latencies.length * 0.999).toInt)
    println("Percentiles: 50th = %d, 99th = %d, 99.9th = %d".format(p50, p99, p999))

    finalise()
  }

  /**
   * Builds a payload of `len` bytes, each drawn independently from the range 65..90
   * (the ASCII codes of 'A' to 'Z').
   *
   * @param len number of bytes to generate
   * @return a new array of `len` random upper-case ASCII letter bytes
   */
  def randomBytesOfLen(len: Int): Array[Byte] = {
    Array.fill(len)((scala.util.Random.nextInt(26) + 65).toByte)
  }
}