blob: 533fe4600e71b9b8cf3351eb8719c017ec0d8265 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.producer
import scala.collection.JavaConversions._
import joptsimple._
import java.util.Properties
import java.io._
import kafka.message._
import kafka.serializer._
object ConsoleProducer {
def main(args: Array[String]) {
val parser = new OptionParser
val topicOpt = parser.accepts("topic", "REQUIRED: The topic id to produce messages to.")
.withRequiredArg
.describedAs("topic")
.ofType(classOf[String])
val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The zookeeper connection string for the kafka zookeeper instance in the form HOST:PORT[/CHROOT].")
.withRequiredArg
.describedAs("connection_string")
.ofType(classOf[String])
val syncOpt = parser.accepts("sync", "If set message send requests to the brokers are synchronously, one at a time as they arrive.")
val compressOpt = parser.accepts("compress", "If set, messages batches are sent compressed")
val batchSizeOpt = parser.accepts("batch-size", "Number of messages to send in a single batch if they are not being sent synchronously.")
.withRequiredArg
.describedAs("size")
.ofType(classOf[java.lang.Integer])
.defaultsTo(200)
val sendTimeoutOpt = parser.accepts("timeout", "If set and the producer is running in asynchronous mode, this gives the maximum amount of time" +
" a message will queue awaiting suffient batch size. The value is given in ms.")
.withRequiredArg
.describedAs("timeout_ms")
.ofType(classOf[java.lang.Long])
.defaultsTo(1000)
val messageEncoderOpt = parser.accepts("message-encoder", "The class name of the message encoder implementation to use.")
.withRequiredArg
.describedAs("encoder_class")
.ofType(classOf[java.lang.String])
.defaultsTo(classOf[StringEncoder].getName)
val messageReaderOpt = parser.accepts("line-reader", "The class name of the class to use for reading lines from standard in. " +
"By default each line is read as a seperate message.")
.withRequiredArg
.describedAs("reader_class")
.ofType(classOf[java.lang.String])
.defaultsTo(classOf[LineMessageReader].getName)
val propertyOpt = parser.accepts("property", "A mechanism to pass user-defined properties in the form key=value to the message reader. " +
"This allows custom configuration for a user-defined message reader.")
.withRequiredArg
.describedAs("prop")
.ofType(classOf[String])
val options = parser.parse(args : _*)
for(arg <- List(topicOpt, zkConnectOpt)) {
if(!options.has(arg)) {
System.err.println("Missing required argument \"" + arg + "\"")
parser.printHelpOn(System.err)
System.exit(1)
}
}
val topic = options.valueOf(topicOpt)
val zkConnect = options.valueOf(zkConnectOpt)
val sync = options.has(syncOpt)
val compress = options.has(compressOpt)
val batchSize = options.valueOf(batchSizeOpt)
val sendTimeout = options.valueOf(sendTimeoutOpt)
val encoderClass = options.valueOf(messageEncoderOpt)
val readerClass = options.valueOf(messageReaderOpt)
val cmdLineProps = parseLineReaderArgs(options.valuesOf(propertyOpt))
val props = new Properties()
props.put("zk.connect", zkConnect)
props.put("compression.codec", DefaultCompressionCodec.codec.toString)
props.put("producer.type", if(sync) "sync" else "async")
if(options.has(batchSizeOpt))
props.put("batch.size", batchSize.toString)
props.put("queue.time", sendTimeout.toString)
props.put("serializer.class", encoderClass)
val reader = Class.forName(readerClass).newInstance().asInstanceOf[MessageReader]
reader.init(System.in, cmdLineProps)
val producer = new Producer[Any, Any](new ProducerConfig(props))
Runtime.getRuntime.addShutdownHook(new Thread() {
override def run() {
producer.close()
}
})
var message: AnyRef = null
do {
message = reader.readMessage()
if(message != null)
producer.send(new ProducerData(topic, message))
} while(message != null)
}
def parseLineReaderArgs(args: Iterable[String]): Properties = {
val splits = args.map(_ split "=").filterNot(_ == null).filterNot(_.length == 0)
if(!splits.forall(_.length == 2)) {
System.err.println("Invalid line reader properties: " + args.mkString(" "))
System.exit(1)
}
val props = new Properties
for(a <- splits)
props.put(a(0), a(1))
props
}
trait MessageReader {
def init(inputStream: InputStream, props: Properties) {}
def readMessage(): AnyRef
def close() {}
}
class LineMessageReader extends MessageReader {
var reader: BufferedReader = null
override def init(inputStream: InputStream, props: Properties) {
reader = new BufferedReader(new InputStreamReader(inputStream))
}
override def readMessage() = reader.readLine()
}
}