/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka.tools

import joptsimple._
import kafka.utils._
import kafka.consumer._
import kafka.client.ClientUtils
import kafka.api.{OffsetRequest, FetchRequestBuilder, Request}
import kafka.cluster.Broker
import scala.collection.JavaConversions._
import kafka.common.TopicAndPartition
/**
* Command line program to dump out messages to standard out using the simple consumer
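 *
 * Example invocation (illustrative; the broker address and topic name are placeholders):
 *
 *   bin/kafka-run-class.sh kafka.tools.SimpleConsumerShell --broker-list localhost:9092 \
 *     --topic test --partition 0 --offset -2 --max-messages 10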
*/
object SimpleConsumerShell extends Logging {
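  // Sentinel replica id: -1 tells the tool to fetch from whichever broker currently leads the partition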
  def UseLeaderReplica = -1

  def main(args: Array[String]): Unit = {
    val parser = new OptionParser
    val brokerListOpt = parser.accepts("broker-list", "REQUIRED: The list of hostname and port of the server to connect to.")
      .withRequiredArg
      .describedAs("hostname:port,...,hostname:port")
      .ofType(classOf[String])
    val topicOpt = parser.accepts("topic", "REQUIRED: The topic to consume from.")
      .withRequiredArg
      .describedAs("topic")
      .ofType(classOf[String])
    val partitionIdOpt = parser.accepts("partition", "The partition to consume from.")
      .withRequiredArg
      .describedAs("partition")
      .ofType(classOf[java.lang.Integer])
      .defaultsTo(0)
    val replicaIdOpt = parser.accepts("replica", "The replica id to consume from; the default of -1 means the leader broker.")
      .withRequiredArg
      .describedAs("replica id")
      .ofType(classOf[java.lang.Integer])
      .defaultsTo(UseLeaderReplica)
    val offsetOpt = parser.accepts("offset", "The offset to consume from; the default of -2 means from the beginning, while -1 means from the end.")
      .withRequiredArg
      .describedAs("consume offset")
      .ofType(classOf[java.lang.Long])
      .defaultsTo(OffsetRequest.EarliestTime)
    val clientIdOpt = parser.accepts("clientId", "The ID of this client.")
      .withRequiredArg
      .describedAs("clientId")
      .ofType(classOf[String])
      .defaultsTo("SimpleConsumerShell")
    val fetchSizeOpt = parser.accepts("fetchsize", "The fetch size of each request.")
      .withRequiredArg
      .describedAs("fetchsize")
      .ofType(classOf[java.lang.Integer])
      .defaultsTo(1024 * 1024)
    val messageFormatterOpt = parser.accepts("formatter", "The name of a class to use for formatting kafka messages for display.")
      .withRequiredArg
      .describedAs("class")
      .ofType(classOf[String])
      .defaultsTo(classOf[DefaultMessageFormatter].getName)
    val messageFormatterArgOpt = parser.accepts("property", "Properties to pass to the message formatter.")
      .withRequiredArg
      .describedAs("prop")
      .ofType(classOf[String])
    val printOffsetOpt = parser.accepts("print-offsets", "Print the offsets returned by the iterator")
    val maxWaitMsOpt = parser.accepts("max-wait-ms", "The max amount of time each fetch request waits.")
      .withRequiredArg
      .describedAs("ms")
      .ofType(classOf[java.lang.Integer])
      .defaultsTo(1000)
    val maxMessagesOpt = parser.accepts("max-messages", "The number of messages to consume")
      .withRequiredArg
      .describedAs("max-messages")
      .ofType(classOf[java.lang.Integer])
      .defaultsTo(Integer.MAX_VALUE)
    val skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If there is an error when processing a message, " +
      "skip it instead of halting.")
    val noWaitAtEndOfLogOpt = parser.accepts("no-wait-at-logend",
      "If set, the simple consumer stops when it reaches the end of the log instead of waiting for newly produced messages")
    if (args.length == 0)
      CommandLineUtils.printUsageAndDie(parser, "A low-level tool for fetching data directly from a particular replica.")

    val options = parser.parse(args: _*)
    CommandLineUtils.checkRequiredArgs(parser, options, brokerListOpt, topicOpt, partitionIdOpt)
    val topic = options.valueOf(topicOpt)
    val partitionId = options.valueOf(partitionIdOpt).intValue()
    val replicaId = options.valueOf(replicaIdOpt).intValue()
    var startingOffset = options.valueOf(offsetOpt).longValue
    val fetchSize = options.valueOf(fetchSizeOpt).intValue
    val clientId = options.valueOf(clientIdOpt).toString
    val maxWaitMs = options.valueOf(maxWaitMsOpt).intValue()
    val maxMessages = options.valueOf(maxMessagesOpt).intValue
    val skipMessageOnError = options.has(skipMessageOnErrorOpt)
    val printOffsets = options.has(printOffsetOpt)
    val noWaitAtEndOfLog = options.has(noWaitAtEndOfLogOpt)
    val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt))
    val formatterArgs = CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt))
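
    // The fetch request template is built once and reused for every fetch; DebuggingConsumerId
    // marks the requests as debug fetches so brokers will serve them from follower replicas
    // as well as from the leader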
    val fetchRequestBuilder = new FetchRequestBuilder()
      .clientId(clientId)
      .replicaId(Request.DebuggingConsumerId)
      .maxWait(maxWaitMs)
      .minBytes(ConsumerConfig.MinFetchBytes)
    // getting topic metadata
    info("Getting topic metadata...")
    val brokerList = options.valueOf(brokerListOpt)
    ToolsUtils.validatePortOrDie(parser, brokerList)
    val metadataTargetBrokers = ClientUtils.parseBrokerList(brokerList)
    val topicsMetadata = ClientUtils.fetchTopicMetadata(Set(topic), metadataTargetBrokers, clientId, maxWaitMs).topicsMetadata
    if (topicsMetadata.size != 1 || !topicsMetadata(0).topic.equals(topic)) {
      System.err.println("Error: no valid topic metadata for topic: %s; the metadata returned by the server was: %s".format(topic, topicsMetadata))
      System.exit(1)
    }
    // validating partition id
    val partitionsMetadata = topicsMetadata(0).partitionsMetadata
    val partitionMetadataOpt = partitionsMetadata.find(p => p.partitionId == partitionId)
    if (partitionMetadataOpt.isEmpty) {
      System.err.println("Error: partition %d does not exist for topic %s".format(partitionId, topic))
      System.exit(1)
    }
    // validating replica id and initializing target broker
    var replicaOpt: Option[Broker] = None
    if (replicaId == UseLeaderReplica) {
      replicaOpt = partitionMetadataOpt.get.leader
      if (replicaOpt.isEmpty) {
        System.err.println("Error: user specified to fetch from the leader of partition (%s, %d), but no leader has been elected yet".format(topic, partitionId))
        System.exit(1)
      }
    } else {
      val replicasForPartition = partitionMetadataOpt.get.replicas
      replicaOpt = replicasForPartition.find(r => r.id == replicaId)
      if (replicaOpt.isEmpty) {
        System.err.println("Error: replica %d does not exist for partition (%s, %d)".format(replicaId, topic, partitionId))
        System.exit(1)
      }
    }
    val fetchTargetBroker = replicaOpt.get
    // initializing starting offset
    if (startingOffset < OffsetRequest.EarliestTime) {
      System.err.println("Invalid starting offset: %d".format(startingOffset))
      System.exit(1)
    }
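    // A negative starting offset is symbolic (-2 = earliest, -1 = latest); resolve it to a
    // concrete offset by asking the target broker before starting to fetch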
    if (startingOffset < 0) {
      val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, fetchTargetBroker.port, ConsumerConfig.SocketTimeout,
        ConsumerConfig.SocketBufferSize, clientId)
      try {
        startingOffset = simpleConsumer.earliestOrLatestOffset(TopicAndPartition(topic, partitionId), startingOffset,
          Request.DebuggingConsumerId)
      } catch {
        case t: Throwable =>
          System.err.println("Error in getting earliest or latest offset due to: " + Utils.stackTrace(t))
          System.exit(1)
      } finally {
        simpleConsumer.close()
      }
    }
    // initializing formatter
    val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
    formatter.init(formatterArgs)

    val replicaString = if (replicaId == UseLeaderReplica) "leader" else "replica"
    info("Starting simple consumer shell to partition [%s, %d], %s [%d], host and port: [%s, %d], from offset [%d]"
      .format(topic, partitionId, replicaString, replicaId, fetchTargetBroker.host, fetchTargetBroker.port, startingOffset))
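    // 10s socket timeout and a 64KB socket buffer for the long-lived fetch connection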
    val simpleConsumer = new SimpleConsumer(fetchTargetBroker.host, fetchTargetBroker.port, 10000, 64 * 1024, clientId)
    val thread = Utils.newThread("kafka-simpleconsumer-shell", new Runnable() {
      def run() {
        var offset = startingOffset
        var numMessagesConsumed = 0
        try {
          while (numMessagesConsumed < maxMessages) {
            val fetchRequest = fetchRequestBuilder
              .addFetch(topic, partitionId, offset, fetchSize)
              .build()
            val fetchResponse = simpleConsumer.fetch(fetchRequest)
            val messageSet = fetchResponse.messageSet(topic, partitionId)
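            // A fetch with no valid bytes means there were no new messages at this offset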
            if (messageSet.validBytes <= 0 && noWaitAtEndOfLog) {
              println("Terminating. Reached the end of partition (%s, %d) at offset %d".format(topic, partitionId, offset))
              return
            }
            debug("Fetched " + messageSet.sizeInBytes + " bytes from offset " + offset)
            for (messageAndOffset <- messageSet if numMessagesConsumed < maxMessages) {
              try {
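                // Advance the offset before formatting, so that with --skip-message-on-error a
                // bad message is not refetched on the next iteration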
                offset = messageAndOffset.nextOffset
                if (printOffsets)
                  System.out.println("next offset = " + offset)
                val message = messageAndOffset.message
                val key = if (message.hasKey) Utils.readBytes(message.key) else null
                formatter.writeTo(key, if (message.isNull) null else Utils.readBytes(message.payload), System.out)
                numMessagesConsumed += 1
              } catch {
                case e: Throwable =>
                  if (skipMessageOnError)
                    error("Error processing message, skipping this message: ", e)
                  else
                    throw e
              }
              if (System.out.checkError()) {
                // This means no one is listening to our output stream anymore; time to shut down
                System.err.println("Unable to write to standard out, closing consumer.")
                formatter.close()
                simpleConsumer.close()
                System.exit(1)
              }
            }
          }
        } catch {
          case e: Throwable =>
            error("Error consuming topic, partition, replica (%s, %d, %d) with offset [%d]".format(topic, partitionId, replicaId, offset), e)
        } finally {
          info("Consumed " + numMessagesConsumed + " messages")
        }
      }
    }, false)
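    // The consumer runs on a non-daemon thread; the main thread simply blocks until it finishes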
    thread.start()
    thread.join()
    System.out.flush()
    formatter.close()
    simpleConsumer.close()
  }
}