/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package kafka.tools

import java.io._
import java.util.Properties
import java.util.Random

import joptsimple.OptionParser

import kafka.consumer._
import kafka.log.{FileMessageSet, Log}
import kafka.serializer._
import kafka.utils._

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

/**
 * This is a torture test that runs against an existing broker. Here is how it works:
 *
 * It produces a series of specially formatted messages to one or more partitions. Each message it produces
 * is also logged to a text file. The messages have a limited set of keys, so there is duplication in the key space.
 *
 * The broker will clean its log as the test runs.
 *
 * When the specified number of messages has been produced, we create a consumer, consume all the messages in
 * the topic, and write them out to another text file.
 *
 * Using a stable unix sort, we sort both the producer log of what was sent and the consumer log of what was
 * retrieved by the message key. Then we compare the final message in both logs for each key. If this final
 * message is not the same for all keys, we print an error and exit with exit code 1; otherwise we print the
 * size reduction and exit with exit code 0.
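 *
 * Example invocation (the broker and ZooKeeper addresses below are placeholders):
 *
 *   bin/kafka-run-class.sh kafka.tools.TestLogCleaning --broker localhost:9092 \
 *     --zk localhost:2181 --messages 1000000 --duplicates 10 --percent-deletes 5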
*/
object TestLogCleaning {
def main(args: Array[String]) {
val parser = new OptionParser
val numMessagesOpt = parser.accepts("messages", "The number of messages to send or consume.")
.withRequiredArg
.describedAs("count")
.ofType(classOf[java.lang.Long])
.defaultsTo(Long.MaxValue)
val numDupsOpt = parser.accepts("duplicates", "The number of duplicates for each key.")
.withRequiredArg
.describedAs("count")
.ofType(classOf[java.lang.Integer])
.defaultsTo(5)
val brokerOpt = parser.accepts("broker", "The broker URL to connect to.")
.withRequiredArg
.describedAs("url")
.ofType(classOf[String])
val topicsOpt = parser.accepts("topics", "The number of topics to test.")
.withRequiredArg
.describedAs("count")
.ofType(classOf[java.lang.Integer])
.defaultsTo(1)
val percentDeletesOpt = parser.accepts("percent-deletes", "The percentage of updates that are deletes.")
.withRequiredArg
.describedAs("percent")
.ofType(classOf[java.lang.Integer])
.defaultsTo(0)
val zkConnectOpt = parser.accepts("zk", "The ZooKeeper connect string.")
.withRequiredArg
.describedAs("url")
.ofType(classOf[String])
val sleepSecsOpt = parser.accepts("sleep", "Time in seconds to sleep between production and consumption.")
.withRequiredArg
.describedAs("seconds")
.ofType(classOf[java.lang.Integer])
.defaultsTo(0)
val dumpOpt = parser.accepts("dump", "Dump the messages in the given topic partition log directory from this test to standard out.")
.withRequiredArg
.describedAs("directory")
.ofType(classOf[String])
val options = parser.parse(args: _*)

if(args.length == 0)
CommandLineUtils.printUsageAndDie(parser, "An integration test for log cleaning.")

if(options.has(dumpOpt)) {
dumpLog(new File(options.valueOf(dumpOpt)))
System.exit(0)
}

CommandLineUtils.checkRequiredArgs(parser, options, brokerOpt, zkConnectOpt, numMessagesOpt)

// parse options
val messages = options.valueOf(numMessagesOpt).longValue
val percentDeletes = options.valueOf(percentDeletesOpt).intValue
val dups = options.valueOf(numDupsOpt).intValue
val brokerUrl = options.valueOf(brokerOpt)
val topicCount = options.valueOf(topicsOpt).intValue
val zkUrl = options.valueOf(zkConnectOpt)
val sleepSecs = options.valueOf(sleepSecsOpt).intValue
val testId = new Random().nextInt(Int.MaxValue)
val topics = (0 until topicCount).map("log-cleaner-test-" + testId + "-" + _).toArray
println("Producing %d messages...".format(messages))
val producedDataFile = produceMessages(brokerUrl, topics, messages, dups, percentDeletes)
println("Sleeping for %d seconds...".format(sleepSecs))
Thread.sleep(sleepSecs * 1000L)
println("Consuming messages...")
val consumedDataFile = consumeMessages(zkUrl, topics)
val producedLines = lineCount(producedDataFile)
val consumedLines = lineCount(consumedDataFile)
val reduction = 1.0 - consumedLines.toDouble/producedLines.toDouble
println("%d rows of data produced, %d rows of data consumed (%.1f%% reduction).".format(producedLines, consumedLines, 100 * reduction))
println("De-duplicating and validating output files...")
validateOutput(producedDataFile, consumedDataFile)
producedDataFile.delete()
consumedDataFile.delete()
}
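
/**
 * Dump the Kafka log segments in the given directory to standard out, one line per message:
 * offset, key, and content (null for delete tombstones).
 */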
def dumpLog(dir: File) {
require(dir.exists, "Non-existent directory: " + dir.getAbsolutePath)
for(file <- dir.list.sorted; if file.endsWith(Log.LogFileSuffix)) {
val ms = new FileMessageSet(new File(dir, file))
for(entry <- ms) {
val key = Utils.readString(entry.message.key)
val content =
if(entry.message.isNull)
null
else
Utils.readString(entry.message.payload)
println("offset = %s, key = %s, content = %s".format(entry.offset, key, content))
}
}
}
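
/** Count the lines in the given file. */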
def lineCount(file: File): Int = io.Source.fromFile(file).getLines.size
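
/**
 * Sort both logs by (topic, key), reduce each log to the final record for every key
 * (dropping deletes), and require that the produced and consumed values match exactly.
 */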
def validateOutput(producedDataFile: File, consumedDataFile: File) {
val producedReader = externalSort(producedDataFile)
val consumedReader = externalSort(consumedDataFile)
val produced = valuesIterator(producedReader)
val consumed = valuesIterator(consumedReader)
val producedDedupedFile = new File(producedDataFile.getAbsolutePath + ".deduped")
val producedDeduped = new BufferedWriter(new FileWriter(producedDedupedFile), 1024*1024)
val consumedDedupedFile = new File(consumedDataFile.getAbsolutePath + ".deduped")
val consumedDeduped = new BufferedWriter(new FileWriter(consumedDedupedFile), 1024*1024)
var total = 0
var mismatched = 0
while(produced.hasNext && consumed.hasNext) {
val p = produced.next()
producedDeduped.write(p.toString)
producedDeduped.newLine()
val c = consumed.next()
consumedDeduped.write(c.toString)
consumedDeduped.newLine()
if(p != c)
mismatched += 1
total += 1
}
producedDeduped.close()
consumedDeduped.close()
require(!produced.hasNext, "Additional values produced not found in consumer log.")
require(!consumed.hasNext, "Additional values consumed not found in producer log.")
println("Validated " + total + " values, " + mismatched + " mismatches.")
require(mismatched == 0, "Non-zero number of row mismatches.")
// if all the checks worked out we can delete the deduped files
producedDedupedFile.delete()
consumedDedupedFile.delete()
}
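
/**
 * An iterator over the deduplicated, non-delete records from a sorted reader.
 */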
def valuesIterator(reader: BufferedReader) = {
new IteratorTemplate[TestRecord] {
def makeNext(): TestRecord = {
var next = readNext(reader)
while(next != null && next.delete)
next = readNext(reader)
if(next == null)
allDone()
else
next
}
}
}
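
/**
 * Read the next record from the sorted reader, collapsing consecutive records that share
 * a topic and key down to the last (i.e. most recently produced) one.
 */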
def readNext(reader: BufferedReader): TestRecord = {
var line = reader.readLine()
if(line == null)
return null
var curr = new TestRecord(line)
while(true) {
line = peekLine(reader)
if(line == null)
return curr
val next = new TestRecord(line)
if(next.topicAndKey != curr.topicAndKey)
return curr
curr = next
reader.readLine()
}
null // unreachable, but required so the method type-checks
}
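
/**
 * Return the next line without consuming it. Assumes lines fit within the 4096
 * character mark/reset limit.
 */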
def peekLine(reader: BufferedReader) = {
reader.mark(4096)
val line = reader.readLine
reader.reset()
line
}
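
/**
 * Sort the file by its first two fields (topic and key) with the unix sort command and
 * return a reader over the sorted output. The sort is stable, so records for the same key
 * stay in production order; a background thread reports any errors from the sort process.
 */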
def externalSort(file: File): BufferedReader = {
val builder = new ProcessBuilder("sort", "--key=1,2", "--stable", "--buffer-size=20%", "--temporary-directory=" + System.getProperty("java.io.tmpdir"), file.getAbsolutePath)
val process = builder.start()
new Thread() {
override def run() {
val exitCode = process.waitFor()
if(exitCode != 0) {
System.err.println("Process exited abnormally.")
while(process.getErrorStream.available > 0) {
System.err.write(process.getErrorStream().read())
}
}
}
}.start()
new BufferedReader(new InputStreamReader(process.getInputStream()), 10*1024*1024)
}
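
/**
 * Produce the test messages: keys are drawn from a bounded key space so that later values
 * overwrite earlier ones, values are the monotonically increasing loop counter (null for
 * deletes), and every send is also logged to a temp file that is returned for validation.
 */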
def produceMessages(brokerUrl: String,
topics: Array[String],
messages: Long,
dups: Int,
percentDeletes: Int): File = {
val producerProps = new Properties
producerProps.setProperty(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true")
producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl)
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
val producer = new KafkaProducer[Array[Byte],Array[Byte]](producerProps)
val rand = new Random(1)
val keyCount = (messages / dups).toInt
val producedFile = File.createTempFile("kafka-log-cleaner-produced-", ".txt")
println("Logging produce requests to " + producedFile.getAbsolutePath)
val producedWriter = new BufferedWriter(new FileWriter(producedFile), 1024*1024)
for(i <- 0L until (messages * topics.length)) {
val topic = topics((i % topics.length).toInt)
val key = rand.nextInt(keyCount)
val delete = i % 100 < percentDeletes
val msg =
if(delete)
new ProducerRecord[Array[Byte],Array[Byte]](topic, key.toString.getBytes(), null)
else
new ProducerRecord[Array[Byte],Array[Byte]](topic, key.toString.getBytes(), i.toString.getBytes())
producer.send(msg)
producedWriter.write(TestRecord(topic, key, i, delete).toString)
producedWriter.newLine()
}
producedWriter.close()
producer.close()
producedFile
}
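
/**
 * Create a ZooKeeper-based consumer with a fresh group id that starts from the smallest
 * offset and times out after 20 seconds without new messages.
 */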
def makeConsumer(zkUrl: String, topics: Array[String]): ZookeeperConsumerConnector = {
val consumerProps = new Properties
consumerProps.setProperty("group.id", "log-cleaner-test-" + new Random().nextInt(Int.MaxValue))
consumerProps.setProperty("zookeeper.connect", zkUrl)
consumerProps.setProperty("consumer.timeout.ms", (20*1000).toString)
consumerProps.setProperty("auto.offset.reset", "smallest")
new ZookeeperConsumerConnector(new ConsumerConfig(consumerProps))
}
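
/**
 * Consume all messages from the test topics until the consumer times out, logging each
 * message to a temp file that is returned for validation.
 */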
def consumeMessages(zkUrl: String, topics: Array[String]): File = {
val connector = makeConsumer(zkUrl, topics)
val streams = connector.createMessageStreams(topics.map(topic => (topic, 1)).toMap, new StringDecoder, new StringDecoder)
val consumedFile = File.createTempFile("kafka-log-cleaner-consumed-", ".txt")
println("Logging consumed messages to " + consumedFile.getAbsolutePath)
val consumedWriter = new BufferedWriter(new FileWriter(consumedFile))
for(topic <- topics) {
val stream = streams(topic).head
try {
for(item <- stream) {
val delete = item.message == null
val value = if(delete) -1L else item.message.toLong
consumedWriter.write(TestRecord(topic, item.key.toInt, value, delete).toString)
consumedWriter.newLine()
}
} catch {
case e: ConsumerTimeoutException =>
}
}
consumedWriter.close()
connector.shutdown()
consumedFile
}
}
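
/**
 * A single produced or consumed record, serialized as one tab-separated line of
 * topic, key, value, and a "d" (delete) or "u" (update) marker, e.g.
 * "log-cleaner-test-42-0\t17\t123\tu".
 */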
case class TestRecord(topic: String, key: Int, value: Long, delete: Boolean) {
def this(pieces: Array[String]) = this(pieces(0), pieces(1).toInt, pieces(2).toLong, pieces(3) == "d")
def this(line: String) = this(line.split("\t"))
override def toString = topic + "\t" + key + "\t" + value + "\t" + (if(delete) "d" else "u")
def topicAndKey = topic + "\t" + key // the separator avoids ambiguity between e.g. ("t-1", 23) and ("t-12", 3)
}