blob: 533fe4600e71b9b8cf3351eb8719c017ec0d8265 [file] [log] [blame]
/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package kafka.producer
import scala.collection.JavaConversions._
import joptsimple._
import java.util.Properties
import kafka.message._
import kafka.serializer._
object ConsoleProducer {
/**
 * Command-line entry point: declares and parses the options, builds a producer
 * configuration from them, then forwards every message returned by the configured
 * MessageReader to `topic` until the reader returns null.
 *
 * NOTE(review): in this view the closing braces of the nested blocks and a few
 * tokens are missing (see the notes below), so the block boundaries described in
 * these comments are inferred from statement order — confirm against the full file.
 *
 * @param args raw command-line arguments, handed to the OptionParser
 */
def main(args: Array[String]) {
// Declare every option the tool understands.
val parser = new OptionParser
val topicOpt = parser.accepts("topic", "REQUIRED: The topic id to produce messages to.")
val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The zookeeper connection string for the kafka zookeeper instance in the form HOST:PORT[/CHROOT].")
val syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.")
val compressOpt = parser.accepts("compress", "If set, messages batches are sent compressed")
val batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch if they are not being sent synchronously.")
val sendTimeoutOpt = parser.accepts("timeout", "If set and the producer is running in asynchronous mode, this gives the maximum amount of time" +
" a message will queue awaiting suffient batch size. The value is given in ms.")
val messageEncoderOpt = parser.accepts("message-encoder", "The class name of the message encoder implementation to use.")
val messageReaderOpt = parser.accepts("line-reader", "The class name of the class to use for reading lines from standard in. " +
"By default each line is read as a seperate message.")
val propertyOpt = parser.accepts("property", "A mechanism to pass user-defined properties in the form key=value to the message reader. " +
"This allows custom configuration for a user-defined message reader.")
// Parse the raw arguments against the options declared above.
val options = parser.parse(args : _*)
// topic and zookeeper are mandatory: report each one that is absent on stderr.
// NOTE(review): no abort/exit after the message is visible in this view — confirm
// that execution does not continue with a missing required option.
for(arg <- List(topicOpt, zkConnectOpt)) {
if(!options.has(arg)) {
System.err.println("Missing required argument \"" + arg + "\"")
// Read the value (or presence flag) of each option.
val topic = options.valueOf(topicOpt)
val zkConnect = options.valueOf(zkConnectOpt)
val sync = options.has(syncOpt)
val compress = options.has(compressOpt)
// NOTE(review): batchSize and sendTimeout are dereferenced with .toString below
// without a visible presence check; if valueOf yields null for an option that was
// not supplied, that would throw — confirm.
val batchSize = options.valueOf(batchSizeOpt)
val sendTimeout = options.valueOf(sendTimeoutOpt)
val encoderClass = options.valueOf(messageEncoderOpt)
val readerClass = options.valueOf(messageReaderOpt)
// Turn the repeated key=value "property" options into a Properties for the reader.
val cmdLineProps = parseLineReaderArgs(options.valuesOf(propertyOpt))
// Assemble the producer configuration.
val props = new Properties()
props.put("zk.connect", zkConnect)
// NOTE(review): `compress` is read above but not consulted here; the visible code
// always configures the default codec.
props.put("compression.codec", DefaultCompressionCodec.codec.toString)
props.put("producer.type", if(sync) "sync" else "async")
props.put("batch.size", batchSize.toString)
props.put("queue.time", sendTimeout.toString)
props.put("serializer.class", encoderClass)
// Instantiate the reader reflectively from its class name; the class needs a no-arg
// constructor and must implement MessageReader, otherwise the cast fails.
val reader = Class.forName(readerClass).newInstance().asInstanceOf[MessageReader]
// NOTE(review): the first argument is missing in this view; MessageReader.init
// expects an InputStream there.
reader.init(, cmdLineProps)
val producer = new Producer[Any, Any](new ProducerConfig(props))
// Register a JVM shutdown hook.
// NOTE(review): the hook's body and closing braces are not visible here; presumably
// the read/send loop below belongs to main rather than to run() — confirm.
Runtime.getRuntime.addShutdownHook(new Thread() {
override def run() {
// Pump messages: read one at a time and send each to `topic`; a null from the
// reader ends the loop.
var message: AnyRef = null
do {
message = reader.readMessage()
if(message != null)
producer.send(new ProducerData(topic, message))
} while(message != null)
/**
 * Converts `key=value` strings into a Properties object.
 *
 * Each argument is split on "=". Arguments that split into no parts at all
 * (for example a bare "=") are ignored. Every remaining argument must split into
 * exactly a key and a value; if any does not, the full argument list is reported
 * on stderr and the process exits with status 1.
 *
 * @param args the raw `key=value` strings
 * @return a Properties holding one entry per well-formed argument
 */
def parseLineReaderArgs(args: Iterable[String]): Properties = {
  val pairs = args.map(_ split "=").filterNot(_.isEmpty)
  if (!pairs.forall(_.length == 2)) {
    System.err.println("Invalid line reader properties: " + args.mkString(" "))
    // Stop here: indexing a malformed entry below would either throw or silently
    // register a truncated value.
    System.exit(1)
  }
  val props = new Properties
  pairs.foreach { case Array(key, value) => props.put(key, value) }
  props
}
/**
 * Pluggable source of messages for the console producer.
 *
 * Implementations must provide readMessage(); init and close are optional hooks
 * whose default implementations do nothing.
 */
trait MessageReader {

  /**
   * Optional setup hook; the default does nothing.
   *
   * @param inputStream stream the reader may consume
   * @param props       reader-specific configuration
   */
  def init(inputStream: InputStream, props: Properties): Unit = ()

  /**
   * Produces the next message. The send loop in main stops at the first null,
   * so null signals that no more messages are available.
   */
  def readMessage(): AnyRef

  /** Optional teardown hook; the default does nothing. */
  def close(): Unit = ()
}
/**
 * MessageReader that treats every line of the input stream as one message.
 *
 * init must be called before readMessage: until then `reader` is null.
 */
class LineMessageReader extends MessageReader {

  // Assigned by init; null beforehand.
  var reader: BufferedReader = null

  /**
   * Wraps the stream in a buffered character reader. `props` is not used.
   *
   * @param inputStream byte stream to read lines from
   * @param props       ignored by this implementation
   */
  override def init(inputStream: InputStream, props: Properties): Unit = {
    // The one-argument InputStreamReader constructor decodes with the platform
    // default charset.
    val decoded = new InputStreamReader(inputStream)
    reader = new BufferedReader(decoded)
  }

  /** Returns the next line without its terminator, or null at end of stream. */
  override def readMessage(): String = reader.readLine()
}