/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.kafka.monitor;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import kafka.api.OffsetRequest;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryOneTime;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.json.simple.JSONValue;

/**
* Utility class for querying offset lag for kafka spout
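 *
 * <p>Illustrative invocations (the topic, group, broker and zookeeper values below are placeholders):
 * <pre>
 * # new kafka spout: lag relative to the consumer group's committed offsets
 * storm-kafka-monitor -t myTopic -g myGroup -b kafkahost:9092
 *
 * # old kafka spout with ZkHosts: lag relative to offsets committed to zookeeper
 * storm-kafka-monitor -o -t myTopic -z zkhost:2181 -n /kafka-spout-root -r /brokers
 * </pre>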
*/
public class KafkaOffsetLagUtil {
private static final String OPTION_TOPIC_SHORT = "t";
private static final String OPTION_TOPIC_LONG = "topics";
private static final String OPTION_OLD_CONSUMER_SHORT = "o";
private static final String OPTION_OLD_CONSUMER_LONG = "old-spout";
private static final String OPTION_BOOTSTRAP_BROKERS_SHORT = "b";
private static final String OPTION_BOOTSTRAP_BROKERS_LONG = "bootstrap-brokers";
private static final String OPTION_GROUP_ID_SHORT = "g";
private static final String OPTION_GROUP_ID_LONG = "groupid";
private static final String OPTION_TOPIC_WILDCARD_SHORT = "w";
private static final String OPTION_TOPIC_WILDCARD_LONG = "wildcard-topic";
private static final String OPTION_PARTITIONS_SHORT = "p";
private static final String OPTION_PARTITIONS_LONG = "partitions";
private static final String OPTION_LEADERS_SHORT = "l";
private static final String OPTION_LEADERS_LONG = "leaders";
private static final String OPTION_ZK_SERVERS_SHORT = "z";
private static final String OPTION_ZK_SERVERS_LONG = "zk-servers";
private static final String OPTION_ZK_COMMITTED_NODE_SHORT = "n";
private static final String OPTION_ZK_COMMITTED_NODE_LONG = "zk-node";
private static final String OPTION_ZK_BROKERS_ROOT_SHORT = "r";
private static final String OPTION_ZK_BROKERS_ROOT_LONG = "zk-brokers-root-node";
private static final String OPTION_SECURITY_PROTOCOL_SHORT = "s";
private static final String OPTION_SECURITY_PROTOCOL_LONG = "security-protocol";

    public static void main (String[] args) {
try {
List<KafkaOffsetLagResult> results;
Options options = buildOptions();
CommandLineParser parser = new DefaultParser();
CommandLine commandLine = parser.parse(options, args);
if (!commandLine.hasOption(OPTION_TOPIC_LONG)) {
printUsageAndExit(options, OPTION_TOPIC_LONG + " is required");
}
if (commandLine.hasOption(OPTION_OLD_CONSUMER_LONG)) {
OldKafkaSpoutOffsetQuery oldKafkaSpoutOffsetQuery;
if (commandLine.hasOption(OPTION_GROUP_ID_LONG) || commandLine.hasOption(OPTION_BOOTSTRAP_BROKERS_LONG) || commandLine.hasOption(OPTION_SECURITY_PROTOCOL_LONG)) {
printUsageAndExit(options, OPTION_GROUP_ID_LONG + " or " + OPTION_BOOTSTRAP_BROKERS_LONG + " or " + OPTION_SECURITY_PROTOCOL_LONG + " is " +
"not accepted with option " + OPTION_OLD_CONSUMER_LONG);
}
if (!commandLine.hasOption(OPTION_ZK_SERVERS_LONG) || !commandLine.hasOption(OPTION_ZK_COMMITTED_NODE_LONG)) {
printUsageAndExit(options, OPTION_ZK_SERVERS_LONG + " and " + OPTION_ZK_COMMITTED_NODE_LONG + " are required with " +
OPTION_OLD_CONSUMER_LONG);
}
String[] topics = commandLine.getOptionValue(OPTION_TOPIC_LONG).split(",");
                if (topics.length > 1) {
printUsageAndExit(options, "Multiple topics not supported with option " + OPTION_OLD_CONSUMER_LONG + ". Either a single topic or a " +
"wildcard string for matching topics is supported");
}
if (commandLine.hasOption(OPTION_ZK_BROKERS_ROOT_LONG)) {
if (commandLine.hasOption(OPTION_PARTITIONS_LONG) || commandLine.hasOption(OPTION_LEADERS_LONG)) {
printUsageAndExit(options, OPTION_PARTITIONS_LONG + " or " + OPTION_LEADERS_LONG + " is not accepted with " +
OPTION_ZK_BROKERS_ROOT_LONG);
}
                    oldKafkaSpoutOffsetQuery = new OldKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG),
                            commandLine.getOptionValue(OPTION_ZK_SERVERS_LONG), commandLine.getOptionValue(OPTION_ZK_COMMITTED_NODE_LONG),
                            commandLine.hasOption(OPTION_TOPIC_WILDCARD_LONG), commandLine.getOptionValue(OPTION_ZK_BROKERS_ROOT_LONG));
} else {
if (commandLine.hasOption(OPTION_TOPIC_WILDCARD_LONG)) {
printUsageAndExit(options, OPTION_TOPIC_WILDCARD_LONG + " is not supported without " + OPTION_ZK_BROKERS_ROOT_LONG);
}
if (!commandLine.hasOption(OPTION_PARTITIONS_LONG) || !commandLine.hasOption(OPTION_LEADERS_LONG)) {
printUsageAndExit(options, OPTION_PARTITIONS_LONG + " and " + OPTION_LEADERS_LONG + " are required if " + OPTION_ZK_BROKERS_ROOT_LONG +
" is not provided");
}
String[] partitions = commandLine.getOptionValue(OPTION_PARTITIONS_LONG).split(",");
String[] leaders = commandLine.getOptionValue(OPTION_LEADERS_LONG).split(",");
if (partitions.length != leaders.length) {
printUsageAndExit(options, OPTION_PARTITIONS_LONG + " and " + OPTION_LEADERS_LONG + " need to be of same size");
}
                    oldKafkaSpoutOffsetQuery = new OldKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG),
                            commandLine.getOptionValue(OPTION_ZK_SERVERS_LONG), commandLine.getOptionValue(OPTION_ZK_COMMITTED_NODE_LONG),
                            commandLine.getOptionValue(OPTION_PARTITIONS_LONG), commandLine.getOptionValue(OPTION_LEADERS_LONG));
}
results = getOffsetLags(oldKafkaSpoutOffsetQuery);
} else {
String securityProtocol = commandLine.getOptionValue(OPTION_SECURITY_PROTOCOL_LONG);
String[] oldSpoutOptions = {OPTION_TOPIC_WILDCARD_LONG, OPTION_PARTITIONS_LONG, OPTION_LEADERS_LONG, OPTION_ZK_SERVERS_LONG,
OPTION_ZK_COMMITTED_NODE_LONG, OPTION_ZK_BROKERS_ROOT_LONG};
for (String oldOption: oldSpoutOptions) {
if (commandLine.hasOption(oldOption)) {
printUsageAndExit(options, oldOption + " is not accepted without " + OPTION_OLD_CONSUMER_LONG);
}
}
if (!commandLine.hasOption(OPTION_GROUP_ID_LONG) || !commandLine.hasOption(OPTION_BOOTSTRAP_BROKERS_LONG)) {
printUsageAndExit(options, OPTION_GROUP_ID_LONG + " and " + OPTION_BOOTSTRAP_BROKERS_LONG + " are required if " + OPTION_OLD_CONSUMER_LONG +
" is not specified");
}
NewKafkaSpoutOffsetQuery newKafkaSpoutOffsetQuery = new NewKafkaSpoutOffsetQuery(commandLine.getOptionValue(OPTION_TOPIC_LONG),
commandLine.getOptionValue(OPTION_BOOTSTRAP_BROKERS_LONG), commandLine.getOptionValue(OPTION_GROUP_ID_LONG), securityProtocol);
results = getOffsetLags(newKafkaSpoutOffsetQuery);
}
Map<String, Map<Integer, KafkaPartitionOffsetLag>> keyedResult = keyByTopicAndPartition(results);
System.out.print(JSONValue.toJSONString(keyedResult));
} catch (Exception ex) {
System.out.print("Unable to get offset lags for kafka. Reason: ");
ex.printStackTrace(System.out);
}
}
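
    /**
     * Regroups the flat per-partition results into a map keyed first by topic and then by partition id,
     * which is the shape serialized to JSON in {@link #main}.
     */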
private static Map<String, Map<Integer, KafkaPartitionOffsetLag>> keyByTopicAndPartition(
List<KafkaOffsetLagResult> results) {
Map<String, Map<Integer, KafkaPartitionOffsetLag>> resultKeyedByTopic = new HashMap<>();
for (KafkaOffsetLagResult result : results) {
String topic = result.getTopic();
Map<Integer, KafkaPartitionOffsetLag> topicResultKeyedByPartition = resultKeyedByTopic.get(topic);
if (topicResultKeyedByPartition == null) {
topicResultKeyedByPartition = new HashMap<>();
resultKeyedByTopic.put(topic, topicResultKeyedByPartition);
}
topicResultKeyedByPartition.put(result.getPartition(),
new KafkaPartitionOffsetLag(result.getConsumerCommittedOffset(), result.getLogHeadOffset()));
}
return resultKeyedByTopic;
}
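
    /**
     * Prints the given error message followed by the tool's help text, then exits the JVM with status 1.
     */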
private static void printUsageAndExit (Options options, String message) {
System.out.println(message);
HelpFormatter formatter = new HelpFormatter();
        formatter.printHelp("storm-kafka-monitor", options);
System.exit(1);
}
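
    /**
     * Builds the command line options accepted by this tool. Which options are required or mutually
     * exclusive depends on whether --old-spout is given; that validation happens in {@link #main}.
     */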
private static Options buildOptions () {
Options options = new Options();
options.addOption(OPTION_TOPIC_SHORT, OPTION_TOPIC_LONG, true, "REQUIRED Topics (comma separated list) for fetching log head and spout committed " +
"offset");
options.addOption(OPTION_OLD_CONSUMER_SHORT, OPTION_OLD_CONSUMER_LONG, false, "Whether request is for old spout");
options.addOption(OPTION_BOOTSTRAP_BROKERS_SHORT, OPTION_BOOTSTRAP_BROKERS_LONG, true, "Comma separated list of bootstrap broker hosts for new " +
"consumer/spout e.g. hostname1:9092,hostname2:9092");
options.addOption(OPTION_GROUP_ID_SHORT, OPTION_GROUP_ID_LONG, true, "Group id of consumer (applicable only for new kafka spout) ");
options.addOption(OPTION_TOPIC_WILDCARD_SHORT, OPTION_TOPIC_WILDCARD_LONG, false, "Whether topic provided is a wildcard as supported by ZkHosts in " +
"old spout");
options.addOption(OPTION_PARTITIONS_SHORT, OPTION_PARTITIONS_LONG, true, "Comma separated list of partitions corresponding to " +
OPTION_LEADERS_LONG + " for old spout with StaticHosts");
options.addOption(OPTION_LEADERS_SHORT, OPTION_LEADERS_LONG, true, "Comma separated list of broker leaders corresponding to " +
OPTION_PARTITIONS_LONG + " for old spout with StaticHosts e.g. hostname1:9092,hostname2:9092");
        options.addOption(OPTION_ZK_SERVERS_SHORT, OPTION_ZK_SERVERS_LONG, true, "Comma separated list of zk servers for fetching spout committed offsets " +
                "and/or topic metadata for ZkHosts e.g. hostname1:2181,hostname2:2181");
options.addOption(OPTION_ZK_COMMITTED_NODE_SHORT, OPTION_ZK_COMMITTED_NODE_LONG, true, "Zk node prefix where old kafka spout stores the committed" +
" offsets without the topic and partition nodes");
options.addOption(OPTION_ZK_BROKERS_ROOT_SHORT, OPTION_ZK_BROKERS_ROOT_LONG, true, "Zk node prefix where kafka stores broker information e.g. " +
"/brokers (applicable only for old kafka spout) ");
options.addOption(OPTION_SECURITY_PROTOCOL_SHORT, OPTION_SECURITY_PROTOCOL_LONG, true, "Security protocol to connect to kafka");
return options;
}

    /**
     * Queries offset lags for the new kafka spout, i.e. the spout based on the kafka-clients consumer API.
     *
     * @param newKafkaSpoutOffsetQuery represents the information needed to query kafka for log head and spout offsets
     * @return log head offset, spout offset and lag for each partition
     */
public static List<KafkaOffsetLagResult> getOffsetLags (NewKafkaSpoutOffsetQuery newKafkaSpoutOffsetQuery) {
KafkaConsumer<String, String> consumer = null;
List<KafkaOffsetLagResult> result = new ArrayList<>();
try {
Properties props = new Properties();
props.put("bootstrap.servers", newKafkaSpoutOffsetQuery.getBootStrapBrokers());
props.put("group.id", newKafkaSpoutOffsetQuery.getConsumerGroupId());
props.put("enable.auto.commit", "false");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
if (newKafkaSpoutOffsetQuery.getSecurityProtocol() != null) {
props.put("security.protocol", newKafkaSpoutOffsetQuery.getSecurityProtocol());
}
List<TopicPartition> topicPartitionList = new ArrayList<>();
consumer = new KafkaConsumer<>(props);
for (String topic: newKafkaSpoutOffsetQuery.getTopics().split(",")) {
List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);
if (partitionInfoList != null) {
for (PartitionInfo partitionInfo : partitionInfoList) {
topicPartitionList.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
}
}
}
consumer.assign(topicPartitionList);
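            // for each assigned partition, read the consumer group's committed offset (-1 if none) and the
            // current log end offset; the reported lag is derived from the difference between the two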
for (TopicPartition topicPartition : topicPartitionList) {
OffsetAndMetadata offsetAndMetadata = consumer.committed(topicPartition);
long committedOffset = offsetAndMetadata != null ? offsetAndMetadata.offset() : -1;
consumer.seekToEnd(toArrayList(topicPartition));
result.add(new KafkaOffsetLagResult(topicPartition.topic(), topicPartition.partition(), committedOffset, consumer.position(topicPartition)));
}
} finally {
if (consumer != null) {
consumer.close();
}
}
return result;
}
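
    /**
     * Wraps a single TopicPartition in a collection, as expected by {@link KafkaConsumer#seekToEnd}.
     */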
    private static Collection<TopicPartition> toArrayList(final TopicPartition tp) {
        List<TopicPartition> list = new ArrayList<>(1);
        list.add(tp);
        return list;
    }

    /**
     * Queries offset lags for the old kafka spout, i.e. the spout whose committed offsets live in zookeeper.
     *
     * @param oldKafkaSpoutOffsetQuery represents the information needed to query kafka for log head and spout offsets
     * @return log head offset, spout offset and lag for each partition
     */
public static List<KafkaOffsetLagResult> getOffsetLags (OldKafkaSpoutOffsetQuery oldKafkaSpoutOffsetQuery) throws Exception {
List<KafkaOffsetLagResult> result = new ArrayList<>();
Map<String, List<TopicPartition>> leaders = getLeadersAndTopicPartitions(oldKafkaSpoutOffsetQuery);
if (leaders != null) {
Map<String, Map<Integer, Long>> logHeadOffsets = getLogHeadOffsets(leaders);
Map<String, List<Integer>> topicPartitions = new HashMap<>();
for (Map.Entry<String, List<TopicPartition>> entry: leaders.entrySet()) {
for (TopicPartition topicPartition: entry.getValue()) {
if (!topicPartitions.containsKey(topicPartition.topic())) {
topicPartitions.put(topicPartition.topic(), new ArrayList<Integer>());
}
topicPartitions.get(topicPartition.topic()).add(topicPartition.partition());
}
}
Map<String, Map<Integer, Long>> oldConsumerOffsets = getOldConsumerOffsetsFromZk(topicPartitions, oldKafkaSpoutOffsetQuery);
for (Map.Entry<String, Map<Integer, Long>> topicOffsets: logHeadOffsets.entrySet()) {
for (Map.Entry<Integer, Long> partitionOffsets: topicOffsets.getValue().entrySet()) {
                    Map<Integer, Long> topicConsumerOffsets = oldConsumerOffsets.get(topicOffsets.getKey());
                    // avoid unboxing a null Long via the ternary's numeric promotion; default to -1 when no
                    // committed offset exists for this topic or partition
                    Long consumerCommittedOffset = topicConsumerOffsets != null ? topicConsumerOffsets.get(partitionOffsets.getKey()) : null;
                    consumerCommittedOffset = (consumerCommittedOffset == null ? -1 : consumerCommittedOffset);
KafkaOffsetLagResult kafkaOffsetLagResult = new KafkaOffsetLagResult(topicOffsets.getKey(), partitionOffsets.getKey(),
consumerCommittedOffset, partitionOffsets.getValue());
result.add(kafkaOffsetLagResult);
}
}
}
return result;
}
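
    /**
     * Maps each leader broker (host:port) to the topic partitions it leads, either from the explicit
     * partition/leader lists (StaticHosts) or by walking broker metadata in zookeeper (ZkHosts).
     */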
private static Map<String, List<TopicPartition>> getLeadersAndTopicPartitions (OldKafkaSpoutOffsetQuery oldKafkaSpoutOffsetQuery) throws Exception {
Map<String, List<TopicPartition>> result = new HashMap<>();
        // partitions were supplied explicitly, i.e. the kafka spout was configured with StaticHosts (fixed leader per partition)
if (oldKafkaSpoutOffsetQuery.getPartitions() != null) {
String[] partitions = oldKafkaSpoutOffsetQuery.getPartitions().split(",");
String[] leaders = oldKafkaSpoutOffsetQuery.getLeaders().split(",");
for (int i = 0; i < leaders.length; ++i) {
if (!result.containsKey(leaders[i])) {
result.put(leaders[i], new ArrayList<TopicPartition>());
}
result.get(leaders[i]).add(new TopicPartition(oldKafkaSpoutOffsetQuery.getTopic(), Integer.parseInt(partitions[i])));
}
} else {
// else use zk nodes to figure out partitions and leaders for topics i.e. ZkHosts
CuratorFramework curatorFramework = null;
try {
String brokersZkNode = oldKafkaSpoutOffsetQuery.getBrokersZkPath();
if (!brokersZkNode.endsWith("/")) {
brokersZkNode += "/";
}
String topicsZkPath = brokersZkNode + "topics";
curatorFramework = CuratorFrameworkFactory.newClient(oldKafkaSpoutOffsetQuery.getZkServers(), 20000, 15000, new RetryOneTime(1000));
curatorFramework.start();
List<String> topics = new ArrayList<>();
if (oldKafkaSpoutOffsetQuery.isWildCardTopic()) {
List<String> children = curatorFramework.getChildren().forPath(topicsZkPath);
for (String child : children) {
if (child.matches(oldKafkaSpoutOffsetQuery.getTopic())) {
topics.add(child);
}
}
} else {
topics.add(oldKafkaSpoutOffsetQuery.getTopic());
}
for (String topic: topics) {
String partitionsPath = topicsZkPath + "/" + topic + "/partitions";
List<String> children = curatorFramework.getChildren().forPath(partitionsPath);
for (int i = 0; i < children.size(); ++i) {
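                        // kafka names partition znodes 0..n-1; each state znode under
                        // <zk-brokers-root>/topics/<topic>/partitions/<i>/state holds the current leader id,
                        // which is resolved to host:port via <zk-brokers-root>/ids/<id>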
byte[] leaderData = curatorFramework.getData().forPath(partitionsPath + "/" + i + "/state");
Map<Object, Object> value = (Map<Object, Object>) JSONValue.parseWithException(new String(leaderData, "UTF-8"));
Integer leader = ((Number) value.get("leader")).intValue();
byte[] brokerData = curatorFramework.getData().forPath(brokersZkNode + "ids/" + leader);
Map<Object, Object> broker = (Map<Object, Object>) JSONValue.parseWithException(new String(brokerData, "UTF-8"));
String host = (String) broker.get("host");
                        Integer port = ((Number) broker.get("port")).intValue();
String leaderBroker = host + ":" + port;
if (!result.containsKey(leaderBroker)) {
result.put(leaderBroker, new ArrayList<TopicPartition>());
}
result.get(leaderBroker).add(new TopicPartition(topic, i));
}
}
} finally {
if (curatorFramework != null) {
curatorFramework.close();
}
}
}
return result;
}
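
    /**
     * Fetches the log head (latest) offset of every topic partition by sending an OffsetRequest to each
     * partition's leader broker through the SimpleConsumer API.
     */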
private static Map<String, Map<Integer, Long>> getLogHeadOffsets (Map<String, List<TopicPartition>> leadersAndTopicPartitions) {
Map<String, Map<Integer, Long>> result = new HashMap<>();
if (leadersAndTopicPartitions != null) {
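            // LatestTime with maxNumOffsets = 1 asks each leader for just the current log end offset per partition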
PartitionOffsetRequestInfo partitionOffsetRequestInfo = new PartitionOffsetRequestInfo(OffsetRequest.LatestTime(), 1);
SimpleConsumer simpleConsumer = null;
for (Map.Entry<String, List<TopicPartition>> leader: leadersAndTopicPartitions.entrySet()) {
try {
                    String[] hostAndPort = leader.getKey().split(":");
                    simpleConsumer = new SimpleConsumer(hostAndPort[0], Integer.parseInt(hostAndPort[1]), 10000, 64 * 1024, "LogHeadOffsetRequest");
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
for (TopicPartition topicPartition : leader.getValue()) {
requestInfo.put(new TopicAndPartition(topicPartition.topic(), topicPartition.partition()), partitionOffsetRequestInfo);
if (!result.containsKey(topicPartition.topic())) {
result.put(topicPartition.topic(), new HashMap<Integer, Long>());
}
}
kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(),
"LogHeadOffsetRequest");
OffsetResponse response = simpleConsumer.getOffsetsBefore(request);
for (TopicPartition topicPartition : leader.getValue()) {
result.get(topicPartition.topic()).put(topicPartition.partition(), response.offsets(topicPartition.topic(), topicPartition.partition())[0]);
}
} finally {
if (simpleConsumer != null) {
simpleConsumer.close();
}
}
}
}
return result;
}
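
    /**
     * Reads the offsets the old spout committed to zookeeper, one "partition_N" znode per partition under
     * the configured zk-node prefix (with an extra topic level when a wildcard topic is used).
     */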
    private static Map<String, Map<Integer, Long>> getOldConsumerOffsetsFromZk (Map<String, List<Integer>> topicPartitions,
            OldKafkaSpoutOffsetQuery oldKafkaSpoutOffsetQuery) throws Exception {
Map<String, Map<Integer, Long>> result = new HashMap<>();
        CuratorFramework curatorFramework = CuratorFrameworkFactory.newClient(oldKafkaSpoutOffsetQuery.getZkServers(), 20000, 15000, new RetryOneTime(1000));
        curatorFramework.start();
        String partitionPrefix = "partition_";
        String zkPath = oldKafkaSpoutOffsetQuery.getZkPath();
        if (zkPath.endsWith("/")) {
            zkPath = zkPath.substring(0, zkPath.length() - 1);
        }
        byte[] zkData;
        try {
            // do the existence check inside try/finally so the curator client is closed even when the path is missing
            if (curatorFramework.checkExists().forPath(zkPath) == null) {
                throw new IllegalArgumentException(OPTION_ZK_COMMITTED_NODE_LONG + " '" + zkPath + "' does not exist.");
            }
            if (topicPartitions != null) {
for (Map.Entry<String, List<Integer>> topicEntry: topicPartitions.entrySet()) {
Map<Integer, Long> partitionOffsets = new HashMap<>();
for (Integer partition: topicEntry.getValue()) {
String path = zkPath + "/" + (oldKafkaSpoutOffsetQuery.isWildCardTopic() ? topicEntry.getKey() + "/" : "") + partitionPrefix + partition;
if (curatorFramework.checkExists().forPath(path) != null) {
zkData = curatorFramework.getData().forPath(path);
Map<Object, Object> offsetData = (Map<Object, Object>) JSONValue.parseWithException(new String(zkData, "UTF-8"));
partitionOffsets.put(partition, (Long) offsetData.get("offset"));
}
}
result.put(topicEntry.getKey(), partitionOffsets);
}
}
} finally {
if (curatorFramework != null) {
curatorFramework.close();
}
}
return result;
}
}