// blob: 74218ecb22ba72406a55a01fc78561be71c7e37d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.tools
import java.net.URI
import joptsimple._
import kafka.api.FetchRequest
import kafka.utils._
import kafka.consumer._
/**
 * Command line program that dumps the messages of one topic partition using the simple consumer.
 *
 * Messages and offsets are emitted through this object's `info` logger (where that output ends
 * up depends on the logging configuration). The program fetches in an endless loop and only
 * terminates when the fetch thread dies or the process is killed.
 *
 * Options:
 *  - `--server kafka://hostname:port` (required) broker to connect to
 *  - `--topic topic` (required) topic to consume from
 *  - `--partition n` partition to consume from, default 0
 *  - `--offset n` offset to start consuming from, default 0
 *  - `--fetchsize n` fetch size of each request, default 1000000
 *  - `--print-offsets [true|false]` log the offset after each message
 *  - `--print-messages [true|false]` log each message payload decoded as UTF-8
 */
object SimpleConsumerShell extends Logging {

  /**
   * Parses the command line, connects a `SimpleConsumer` to the given server and runs the fetch
   * loop on a separate thread, blocking until that thread terminates.
   *
   * Exits the JVM with status 1 when a required option is missing or when the server url does
   * not contain both a host and a port.
   *
   * @param args the raw command line arguments
   */
  def main(args: Array[String]): Unit = {
    val parser = new OptionParser
    val urlOpt = parser.accepts("server", "REQUIRED: The hostname of the server to connect to.")
                       .withRequiredArg
                       .describedAs("kafka://hostname:port")
                       .ofType(classOf[String])
    val topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.")
                         .withRequiredArg
                         .describedAs("topic")
                         .ofType(classOf[String])
    val partitionOpt = parser.accepts("partition", "The partition to consume from.")
                             .withRequiredArg
                             .describedAs("partition")
                             .ofType(classOf[java.lang.Integer])
                             .defaultsTo(0)
    val offsetOpt = parser.accepts("offset", "The offset to start consuming from.")
                          .withRequiredArg
                          .describedAs("offset")
                          .ofType(classOf[java.lang.Long])
                          .defaultsTo(0L)
    val fetchsizeOpt = parser.accepts("fetchsize", "The fetch size of each request.")
                             .withRequiredArg
                             .describedAs("fetchsize")
                             .ofType(classOf[java.lang.Integer])
                             .defaultsTo(1000000)
    val printOffsetOpt = parser.accepts("print-offsets", "Print the offsets returned by the iterator")
                               .withOptionalArg
                               .describedAs("print offsets")
                               .ofType(classOf[java.lang.Boolean])
                               .defaultsTo(false)
    val printMessageOpt = parser.accepts("print-messages", "Print the messages returned by the iterator")
                                .withOptionalArg
                                .describedAs("print messages")
                                .ofType(classOf[java.lang.Boolean])
                                .defaultsTo(false)

    val options = parser.parse(args : _*)

    for(arg <- List(urlOpt, topicOpt)) {
      if(!options.has(arg)) {
        error("Missing required argument \"" + arg + "\"")
        parser.printHelpOn(System.err)
        System.exit(1)
      }
    }

    val url = new URI(options.valueOf(urlOpt))
    // java.net.URI yields a null host / port of -1 when they are absent (e.g. "localhost:9092"
    // without a scheme); fail early with a readable message instead of handing them on.
    if(url.getHost == null || url.getPort < 0) {
      error("Invalid server url \"" + url + "\", expected kafka://hostname:port")
      parser.printHelpOn(System.err)
      System.exit(1)
    }
    val topic = options.valueOf(topicOpt)
    val partition = options.valueOf(partitionOpt).intValue
    val startingOffset = options.valueOf(offsetOpt).longValue
    val fetchsize = options.valueOf(fetchsizeOpt).intValue
    val printOffsets = flagEnabled(options, printOffsetOpt)
    val printMessages = flagEnabled(options, printMessageOpt)

    info("Starting consumer...")
    val consumer = new SimpleConsumer(url.getHost, url.getPort, 10000, 64*1024)
    val thread = Utils.newThread("kafka-consumer", new Runnable() {
      def run(): Unit = {
        var offset = startingOffset
        try {
          // Fetches forever; if a fetch returns no messages the same offset is requested again
          // immediately.
          while(true) {
            val fetchRequest = new FetchRequest(topic, partition, offset, fetchsize)
            val messageSets = consumer.multifetch(fetchRequest)
            for(messages <- messageSets) {
              debug("multi fetched " + messages.sizeInBytes + " bytes from offset " + offset)
              for(messageAndOffset <- messages) {
                if(printMessages)
                  info("consumed: " + Utils.toString(messageAndOffset.message.payload, "UTF-8"))
                // The offset carried with each message is used as the start of the next fetch.
                offset = messageAndOffset.offset
                if(printOffsets)
                  info("next offset = " + offset)
              }
            }
          }
        } finally {
          // Reached only when the loop is aborted by an exception: release the connection.
          consumer.close()
        }
      }
    }, false)
    thread.start()
    thread.join()
  }

  /**
   * Evaluates a boolean switch that takes an optional argument: absent means false, present
   * without an argument means true, and present with an argument means the parsed argument
   * (so `--print-messages false` disables printing).
   */
  private def flagEnabled(options: OptionSet, flag: OptionSpec[java.lang.Boolean]): Boolean =
    options.has(flag) && (!options.hasArgument(flag) || options.valueOf(flag).booleanValue)
}