blob: 049f12973b89e6f738518899434cc1225de6fd68 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.tools
import joptsimple.OptionParser
import java.util.concurrent.{Executors, CountDownLatch}
import java.util.Properties
import kafka.consumer._
import kafka.utils.{ToolsUtils, CommandLineUtils, Logging, ZkUtils}
import kafka.api.OffsetRequest
import org.apache.kafka.clients.producer.{ProducerRecord, KafkaProducer, ProducerConfig}
import scala.collection.JavaConverters._
object ReplayLogProducer extends Logging {
private val GroupId: String = "replay-log-producer"
def main(args: Array[String]) {
val config = new Config(args)
// if there is no group specified then avoid polluting zookeeper with persistent group data, this is a hack
ZkUtils.maybeDeletePath(config.zkConnect, "/consumers/" + GroupId)
Thread.sleep(500)
// consumer properties
val consumerProps = new Properties
consumerProps.put("group.id", GroupId)
consumerProps.put("zookeeper.connect", config.zkConnect)
consumerProps.put("consumer.timeout.ms", "10000")
consumerProps.put("auto.offset.reset", OffsetRequest.SmallestTimeString)
consumerProps.put("fetch.message.max.bytes", (1024*1024).toString)
consumerProps.put("socket.receive.buffer.bytes", (2 * 1024 * 1024).toString)
val consumerConfig = new ConsumerConfig(consumerProps)
val consumerConnector: ConsumerConnector = Consumer.create(consumerConfig)
val topicMessageStreams = consumerConnector.createMessageStreams(Predef.Map(config.inputTopic -> config.numThreads))
var threadList = List[ZKConsumerThread]()
for (streamList <- topicMessageStreams.values)
for (stream <- streamList)
threadList ::= new ZKConsumerThread(config, stream)
for (thread <- threadList)
thread.start
threadList.foreach(_.shutdown)
consumerConnector.shutdown
}
class Config(args: Array[String]) {
val parser = new OptionParser
val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
"Multiple URLS can be given to allow fail-over.")
.withRequiredArg
.describedAs("zookeeper url")
.ofType(classOf[String])
.defaultsTo("127.0.0.1:2181")
val brokerListOpt = parser.accepts("broker-list", "REQUIRED: the broker list must be specified.")
.withRequiredArg
.describedAs("hostname:port")
.ofType(classOf[String])
val inputTopicOpt = parser.accepts("inputtopic", "REQUIRED: The topic to consume from.")
.withRequiredArg
.describedAs("input-topic")
.ofType(classOf[String])
val outputTopicOpt = parser.accepts("outputtopic", "REQUIRED: The topic to produce to")
.withRequiredArg
.describedAs("output-topic")
.ofType(classOf[String])
val numMessagesOpt = parser.accepts("messages", "The number of messages to send.")
.withRequiredArg
.describedAs("count")
.ofType(classOf[java.lang.Integer])
.defaultsTo(-1)
val numThreadsOpt = parser.accepts("threads", "Number of sending threads.")
.withRequiredArg
.describedAs("threads")
.ofType(classOf[java.lang.Integer])
.defaultsTo(1)
val reportingIntervalOpt = parser.accepts("reporting-interval", "Interval at which to print progress info.")
.withRequiredArg
.describedAs("size")
.ofType(classOf[java.lang.Integer])
.defaultsTo(5000)
val propertyOpt = parser.accepts("property", "A mechanism to pass properties in the form key=value to the producer. " +
"This allows the user to override producer properties that are not exposed by the existing command line arguments")
.withRequiredArg
.describedAs("producer properties")
.ofType(classOf[String])
val syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.")
val options = parser.parse(args : _*)
CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt, inputTopicOpt)
val zkConnect = options.valueOf(zkConnectOpt)
val brokerList = options.valueOf(brokerListOpt)
ToolsUtils.validatePortOrDie(parser,brokerList)
val numMessages = options.valueOf(numMessagesOpt).intValue
val numThreads = options.valueOf(numThreadsOpt).intValue
val inputTopic = options.valueOf(inputTopicOpt)
val outputTopic = options.valueOf(outputTopicOpt)
val reportingInterval = options.valueOf(reportingIntervalOpt).intValue
val isSync = options.has(syncOpt)
val producerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(propertyOpt).asScala)
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer")
}
class ZKConsumerThread(config: Config, stream: KafkaStream[Array[Byte], Array[Byte]]) extends Thread with Logging {
val shutdownLatch = new CountDownLatch(1)
val producer = new KafkaProducer[Array[Byte],Array[Byte]](config.producerProps)
override def run() {
info("Starting consumer thread..")
var messageCount: Int = 0
try {
val iter =
if(config.numMessages >= 0)
stream.slice(0, config.numMessages)
else
stream
for (messageAndMetadata <- iter) {
try {
val response = producer.send(new ProducerRecord[Array[Byte],Array[Byte]](config.outputTopic, null,
messageAndMetadata.timestamp, messageAndMetadata.key(), messageAndMetadata.message()))
if(config.isSync) {
response.get()
}
messageCount += 1
}catch {
case ie: Exception => error("Skipping this message", ie)
}
}
}catch {
case e: ConsumerTimeoutException => error("consumer thread timing out", e)
}
info("Sent " + messageCount + " messages")
shutdownLatch.countDown
info("thread finished execution !" )
}
def shutdown() {
shutdownLatch.await
producer.close
}
}
}