blob: ec90d15942b7bbd7d104e94c2ddcf6e9dc688787 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
* and limitations under the License.
package org.apache.storm.utils;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.storm.generated.ComponentCommon;
import org.apache.storm.generated.SpoutSpec;
import org.apache.storm.generated.StormTopology;
import org.json.simple.JSONValue;
import org.json.simple.parser.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TopologySpoutLag {
// FIXME: This class can be moved to webapp once UI porting is done.
private static final String SPOUT_ID = "spoutId";
private static final String SPOUT_TYPE = "spoutType";
private static final String SPOUT_LAG_RESULT = "spoutLagResult";
private static final String ERROR_INFO = "errorInfo";
private static final String CONFIG_KEY_PREFIX = "config.";
private static final String TOPICS_CONFIG = CONFIG_KEY_PREFIX + "topics";
private static final String GROUPID_CONFIG = CONFIG_KEY_PREFIX + "groupid";
private static final String BOOTSTRAP_CONFIG = CONFIG_KEY_PREFIX + "bootstrap.servers";
private static final String SECURITY_PROTOCOL_CONFIG = CONFIG_KEY_PREFIX + "security.protocol";
private static final Set<String> ALL_CONFIGS = new HashSet<>(Arrays.asList(TOPICS_CONFIG, GROUPID_CONFIG,
private static final Logger LOGGER = LoggerFactory.getLogger(TopologySpoutLag.class);
public static Map<String, Map<String, Object>> lag(StormTopology stormTopology, Map<String, Object> topologyConf) {
Map<String, Map<String, Object>> result = new HashMap<>();
Map<String, SpoutSpec> spouts = stormTopology.get_spouts();
for (Map.Entry<String, SpoutSpec> spout: spouts.entrySet()) {
try {
SpoutSpec spoutSpec = spout.getValue();
addLagResultForKafkaSpout(result, spout.getKey(), spoutSpec);
} catch (Exception e) {
LOGGER.warn("Exception thrown while getting lag for spout id: " + spout.getKey());
LOGGER.warn("Exception message:" + e.getMessage(), e);
return result;
private static List<String> getCommandLineOptionsForNewKafkaSpout(Map<String, Object> jsonConf) {
LOGGER.debug("json configuration: {}", jsonConf);
List<String> commands = new ArrayList<>();
commands.add((String) jsonConf.get(TOPICS_CONFIG));
commands.add((String) jsonConf.get(GROUPID_CONFIG));
commands.add((String) jsonConf.get(BOOTSTRAP_CONFIG));
String securityProtocol = (String) jsonConf.get(SECURITY_PROTOCOL_CONFIG);
if (!Strings.isNullOrEmpty(securityProtocol)) {
return commands;
private static File createExtraPropertiesFile(Map<String, Object> jsonConf) {
File file = null;
Map<String, String> extraProperties = new HashMap<>();
for (Map.Entry<String, Object> conf: jsonConf.entrySet()) {
if (conf.getKey().startsWith(CONFIG_KEY_PREFIX) && !ALL_CONFIGS.contains(conf.getKey())) {
extraProperties.put(conf.getKey().substring(CONFIG_KEY_PREFIX.length()), conf.getValue().toString());
if (!extraProperties.isEmpty()) {
try {
file = File.createTempFile("kafka-consumer-extra", "props");
Properties properties = new Properties();
try (FileOutputStream fos = new FileOutputStream(file)) {, "Kafka consumer extra properties");
} catch (IOException ex) {
// ignore
return file;
private static void addLagResultForKafkaSpout(Map<String, Map<String, Object>> finalResult, String spoutId, SpoutSpec spoutSpec)
throws IOException {
ComponentCommon componentCommon = spoutSpec.get_common();
String json = componentCommon.get_json_conf();
if (!Strings.isNullOrEmpty(json)) {
Map<String, Object> jsonMap = null;
try {
jsonMap = (Map<String, Object>) JSONValue.parseWithException(json);
} catch (ParseException e) {
throw new IOException(e);
if (jsonMap.containsKey(TOPICS_CONFIG)
&& jsonMap.containsKey(GROUPID_CONFIG)
&& jsonMap.containsKey(BOOTSTRAP_CONFIG)) {
finalResult.put(spoutId, getLagResultForNewKafkaSpout(spoutId, spoutSpec));
private static Map<String, Object> getLagResultForKafka(String spoutId, SpoutSpec spoutSpec) throws IOException {
ComponentCommon componentCommon = spoutSpec.get_common();
String json = componentCommon.get_json_conf();
Map<String, Object> result = null;
String errorMsg = new StringBuilder("Make sure Kafka spout version is latest and ")
.append(", ")
.append(" & ")
.append(" are not null for newer versions of Kafka spout.")
if (!Strings.isNullOrEmpty(json)) {
List<String> commands = new ArrayList<>();
String stormHomeDir = System.getenv("STORM_BASE_DIR");
if (stormHomeDir != null && !stormHomeDir.endsWith("/")) {
stormHomeDir += File.separator;
commands.add(stormHomeDir != null ? stormHomeDir + "bin" + File.separator + "storm-kafka-monitor" : "storm-kafka-monitor");
Map<String, Object> jsonMap = null;
try {
jsonMap = (Map<String, Object>) JSONValue.parseWithException(json);
} catch (ParseException e) {
throw new IOException(e);
File extraPropertiesFile = createExtraPropertiesFile(jsonMap);
if (extraPropertiesFile != null) {
LOGGER.debug("Command to run: {}", commands);
// if commands contains one or more null value, spout is compiled with lower version of storm-kafka-client
if (!commands.contains(null)) {
try {
String resultFromMonitor = new ShellCommandRunnerImpl().execCommand(commands.toArray(new String[0]));
try {
result = (Map<String, Object>) JSONValue.parseWithException(resultFromMonitor);
} catch (ParseException e) {
LOGGER.debug("JSON parsing failed, assuming message as error message: {}", resultFromMonitor);
// json parsing fail -> error received
errorMsg = resultFromMonitor;
} finally {
if (extraPropertiesFile != null) {
Map<String, Object> kafkaSpoutLagInfo = new HashMap<>();
kafkaSpoutLagInfo.put(SPOUT_ID, spoutId);
kafkaSpoutLagInfo.put(SPOUT_TYPE, "KAFKA");
if (result != null) {
kafkaSpoutLagInfo.put(SPOUT_LAG_RESULT, result);
} else {
kafkaSpoutLagInfo.put(ERROR_INFO, errorMsg);
return kafkaSpoutLagInfo;
private static Map<String, Object> getLagResultForNewKafkaSpout(String spoutId, SpoutSpec spoutSpec) throws IOException {
return getLagResultForKafka(spoutId, spoutSpec);