/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package kafka.perf

import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicLong
import java.nio.channels.ClosedByInterruptException
import org.apache.log4j.Logger
import kafka.message.Message
import kafka.utils.Utils
import java.util.{Random, Properties}
import kafka.consumer._
import java.text.SimpleDateFormat

/**
 * Performance test for the full zookeeper consumer.
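 *
 * Example invocation (assumed wrapper script; "--topic" and "--messages" are
 * presumed to come from the parent PerfConfig, the rest from
 * ConsumerPerfConfig below):
 *   bin/kafka-run-class.sh kafka.perf.ConsumerPerformance \
 *     --zookeeper localhost:2181 --topic test --messages 1000000 --threads 10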
 */
object ConsumerPerformance {
  private val logger = Logger.getLogger(getClass())

  def main(args: Array[String]): Unit = {
    val config = new ConsumerPerfConfig(args)
    logger.info("Starting consumer...")
    val totalMessagesRead = new AtomicLong(0)
    val totalBytesRead = new AtomicLong(0)

    if(!config.hideHeader) {
      if(!config.showDetailedStats)
        println("start.time, end.time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec")
      else
        println("time, fetch.size, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec")
    }

    // clean up zookeeper state for this group id for every perf run
    Utils.tryCleanupZookeeper(config.consumerConfig.zkConnect, config.consumerConfig.groupId)

    val consumerConnector: ConsumerConnector = Consumer.create(config.consumerConfig)
    val topicMessageStreams = consumerConnector.createMessageStreams(Predef.Map(config.topic -> config.numThreads))
    var threadList = List[ConsumerPerfThread]()
    for ((topic, streamList) <- topicMessageStreams)
      for (i <- 0 until streamList.length)
        threadList ::= new ConsumerPerfThread(i, "kafka-zk-consumer-" + i, streamList(i), config,
                                              totalMessagesRead, totalBytesRead)
logger.info("Sleeping for 1000 seconds.")
Thread.sleep(1000)
logger.info("starting threads")
val startMs = System.currentTimeMillis
for (thread <- threadList)
thread.start
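
    // shutdown() blocks on each thread's completion latch, so this loop
    // waits for every thread to finish consuming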
    for (thread <- threadList)
      thread.shutdown

    val endMs = System.currentTimeMillis
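    // subtract the consumer timeout, since each thread idles for that long
    // on an empty stream before giving up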
    val elapsedSecs = (endMs - startMs - config.consumerConfig.consumerTimeoutMs) / 1000.0
    if(!config.showDetailedStats) {
      val totalMBRead = (totalBytesRead.get * 1.0) / (1024 * 1024)
      println(("%s, %s, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(startMs), config.dateFormat.format(endMs),
        config.consumerConfig.fetchSize, totalMBRead, totalMBRead / elapsedSecs, totalMessagesRead.get,
        totalMessagesRead.get / elapsedSecs))
    }
    System.exit(0)
  }

  class ConsumerPerfConfig(args: Array[String]) extends PerfConfig(args) {
    val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
      "Multiple URLs can be given to allow fail-over.")
      .withRequiredArg
      .describedAs("urls")
      .ofType(classOf[String])
    val groupIdOpt = parser.accepts("group", "The group id to consume on.")
      .withRequiredArg
      .describedAs("gid")
      .defaultsTo("perf-consumer-" + new Random().nextInt(100000))
      .ofType(classOf[String])
    val fetchSizeOpt = parser.accepts("fetch-size", "The amount of data to fetch in a single request.")
      .withRequiredArg
      .describedAs("size")
      .ofType(classOf[java.lang.Integer])
      .defaultsTo(1024 * 1024)
    val resetBeginningOffsetOpt = parser.accepts("from-latest", "If the consumer does not already have an established " +
      "offset to consume from, start with the latest message present in the log rather than the earliest message.")
    val socketBufferSizeOpt = parser.accepts("socket-buffer-size", "The size of the TCP receive buffer.")
      .withRequiredArg
      .describedAs("size")
      .ofType(classOf[java.lang.Integer])
      .defaultsTo(2 * 1024 * 1024)
    val numThreadsOpt = parser.accepts("threads", "Number of processing threads.")
      .withRequiredArg
      .describedAs("count")
      .ofType(classOf[java.lang.Integer])
      .defaultsTo(10)

    val options = parser.parse(args : _*)

    for(arg <- List(topicOpt, zkConnectOpt)) {
      if(!options.has(arg)) {
        System.err.println("Missing required argument \"" + arg + "\"")
        parser.printHelpOn(System.err)
        System.exit(1)
      }
    }

    val props = new Properties
    props.put("groupid", options.valueOf(groupIdOpt))
    props.put("socket.buffer.size", options.valueOf(socketBufferSizeOpt).toString)
    props.put("fetch.size", options.valueOf(fetchSizeOpt).toString)
props.put("autooffset.reset", if(options.has(resetBeginningOffsetOpt)) "largest" else "smallest")
props.put("zk.connect", options.valueOf(zkConnectOpt))
props.put("consumer.timeout.ms", "5000")
val consumerConfig = new ConsumerConfig(props)
val numThreads = options.valueOf(numThreadsOpt).intValue
val topic = options.valueOf(topicOpt)
val numMessages = options.valueOf(numMessagesOpt).longValue
val reportingInterval = options.valueOf(reportingIntervalOpt).intValue
val showDetailedStats = options.has(showDetailedStatsOpt)
val dateFormat = new SimpleDateFormat(options.valueOf(dateFormatOpt))
val hideHeader = options.has(hideHeaderOpt)
}

  class ConsumerPerfThread(threadId: Int, name: String, stream: KafkaStream[Message],
                           config: ConsumerPerfConfig, totalMessagesRead: AtomicLong, totalBytesRead: AtomicLong)
    extends Thread(name) {
    private val shutdownLatch = new CountDownLatch(1)
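
    // blocks until run() counts the latch down in shutdownComplete()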
    def shutdown(): Unit = {
      shutdownLatch.await
    }

    override def run() {
      var bytesRead = 0L
      var messagesRead = 0L
      val startMs = System.currentTimeMillis
      var lastReportTime: Long = startMs
      var lastBytesRead = 0L
      var lastMessagesRead = 0L

      try {
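        // consume until the message cap is hit or the stream times out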
        for (messageAndMetadata <- stream if messagesRead < config.numMessages) {
          messagesRead += 1
          bytesRead += messageAndMetadata.message.payloadSize
          if (messagesRead % config.reportingInterval == 0) {
            if(config.showDetailedStats)
              printMessage(threadId, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, lastReportTime, System.currentTimeMillis)
            lastReportTime = System.currentTimeMillis
            lastMessagesRead = messagesRead
            lastBytesRead = bytesRead
          }
        }
      }
      catch {
        // a ConsumerTimeoutException just means no message arrived within
        // consumer.timeout.ms; treat it (and interrupts) as a normal shutdown signal
        case _: InterruptedException =>
        case _: ClosedByInterruptException =>
        case _: ConsumerTimeoutException =>
        case e: Throwable => throw e
      }

      totalMessagesRead.addAndGet(messagesRead)
      totalBytesRead.addAndGet(bytesRead)
      if(config.showDetailedStats)
        printMessage(threadId, bytesRead, lastBytesRead, messagesRead, lastMessagesRead, startMs, System.currentTimeMillis)
      shutdownComplete
    }

    private def printMessage(id: Int, bytesRead: Long, lastBytesRead: Long, messagesRead: Long, lastMessagesRead: Long,
                             startMs: Long, endMs: Long) = {
      val elapsedMs = endMs - startMs
      val totalMBRead = (bytesRead * 1.0) / (1024 * 1024)
      val mbRead = ((bytesRead - lastBytesRead) * 1.0) / (1024 * 1024)
      println(("%s, %d, %d, %.4f, %.4f, %d, %.4f").format(config.dateFormat.format(endMs), id,
        config.consumerConfig.fetchSize, totalMBRead,
        1000.0 * (mbRead / elapsedMs), messagesRead, ((messagesRead - lastMessagesRead) / elapsedMs) * 1000.0))
    }

    private def shutdownComplete() = shutdownLatch.countDown
  }
}