/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.storm.utils;
import org.apache.storm.Config;
import org.apache.storm.generated.ComponentCommon;
import org.apache.storm.generated.ComponentObject;
import org.apache.storm.generated.SpoutSpec;
import org.apache.storm.generated.StormTopology;
import org.json.simple.JSONValue;
import org.json.simple.parser.ParseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
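
/**
 * Computes offset lag information for the Kafka spouts in a topology by shelling out to the
 * {@code storm-kafka-monitor} tool and returning its output keyed by spout id.
 */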
public class TopologySpoutLag {
private static final String SPOUT_ID = "spoutId";
    private static final String SPOUT_TYPE = "spoutType";
private static final String SPOUT_LAG_RESULT = "spoutLagResult";
private static final String ERROR_INFO = "errorInfo";
    private static final Logger logger = LoggerFactory.getLogger(TopologySpoutLag.class);
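
    /**
     * Returns a map from spout id to lag information for every spout in the topology whose class
     * is recognized as a Kafka spout (storm-kafka or storm-kafka-client). Other spout types are
     * skipped, and a failure for an individual spout is logged without affecting the others.
     */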
public static Map<String, Map<String, Object>> lag (StormTopology stormTopology, Map topologyConf) {
Map<String, Map<String, Object>> result = new HashMap<>();
Map<String, SpoutSpec> spouts = stormTopology.get_spouts();
String className = null;
for (Map.Entry<String, SpoutSpec> spout: spouts.entrySet()) {
try {
SpoutSpec spoutSpec = spout.getValue();
ComponentObject componentObject = spoutSpec.get_spout_object();
                // FIXME: this relies on a trick (see getClassNameFromComponentObject); a cleaner way to resolve the class name would be better.
className = getClassNameFromComponentObject(componentObject);
logger.debug("spout classname: {}", className);
if (className.endsWith("storm.kafka.spout.KafkaSpout")) {
result.put(spout.getKey(), getLagResultForNewKafkaSpout(spout.getKey(), spoutSpec, topologyConf));
} else if (className.endsWith("storm.kafka.KafkaSpout")) {
result.put(spout.getKey(), getLagResultForOldKafkaSpout(spout.getKey(), spoutSpec, topologyConf));
}
} catch (Exception e) {
logger.warn("Exception thrown while getting lag for spout id: " + spout.getKey() + " and spout class: " + className);
logger.warn("Exception message:" + e.getMessage(), e);
}
}
return result;
}
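
    /**
     * Resolves the spout's class name from its serialized component object, falling back to the
     * class name carried in a ClassNotFoundException when the class cannot be deserialized locally.
     */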
private static String getClassNameFromComponentObject(ComponentObject componentObject) {
try {
Object object = Utils.getSetComponentObject(componentObject);
return object.getClass().getCanonicalName();
} catch (RuntimeException e) {
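            // The spout class is not on the local classpath; the wrapped ClassNotFoundException's
            // message is the fully qualified class name, which is all that is needed here.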
if (e.getCause() instanceof ClassNotFoundException) {
return e.getCause().getMessage().trim();
}
throw e;
}
}
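
    /**
     * Builds the storm-kafka-monitor command line options for the storm-kafka-client based
     * KafkaSpout from the spout's JSON configuration: topics, consumer group id, bootstrap
     * servers, and the security protocol when one is configured.
     */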
private static List<String> getCommandLineOptionsForNewKafkaSpout (Map<String, Object> jsonConf) {
logger.debug("json configuration: {}", jsonConf);
List<String> commands = new ArrayList<>();
String configKeyPrefix = "config.";
commands.add("-t");
commands.add((String)jsonConf.get(configKeyPrefix + "topics"));
commands.add("-g");
commands.add((String)jsonConf.get(configKeyPrefix + "groupid"));
commands.add("-b");
commands.add((String)jsonConf.get(configKeyPrefix + "bootstrap.servers"));
String securityProtocol = (String)jsonConf.get(configKeyPrefix + "security.protocol");
if (securityProtocol != null && !securityProtocol.isEmpty()) {
commands.add("-s");
commands.add(securityProtocol);
}
return commands;
}
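
    /**
     * Builds the storm-kafka-monitor command line options for the older storm-kafka KafkaSpout:
     * topics, ZooKeeper root and servers (falling back to the topology's ZooKeeper settings when
     * the spout configuration does not list them), and either explicit partitions and leaders or
     * the ZooKeeper broker node, optionally with wildcard topic matching.
     */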
private static List<String> getCommandLineOptionsForOldKafkaSpout (Map<String, Object> jsonConf, Map topologyConf) {
logger.debug("json configuration: {}", jsonConf);
List<String> commands = new ArrayList<>();
String configKeyPrefix = "config.";
commands.add("-o");
commands.add("-t");
commands.add((String)jsonConf.get(configKeyPrefix + "topics"));
commands.add("-n");
commands.add((String)jsonConf.get(configKeyPrefix + "zkRoot"));
String zkServers = (String)jsonConf.get(configKeyPrefix + "zkServers");
if (zkServers == null || zkServers.isEmpty()) {
StringBuilder zkServersBuilder = new StringBuilder();
Integer zkPort = ((Number) topologyConf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
for (String zkServer: (List<String>) topologyConf.get(Config.STORM_ZOOKEEPER_SERVERS)) {
                zkServersBuilder.append(zkServer).append(":").append(zkPort).append(",");
}
zkServers = zkServersBuilder.toString();
}
commands.add("-z");
commands.add(zkServers);
if (jsonConf.get(configKeyPrefix + "leaders") != null) {
commands.add("-p");
commands.add((String)jsonConf.get(configKeyPrefix + "partitions"));
commands.add("-l");
commands.add((String)jsonConf.get(configKeyPrefix + "leaders"));
} else {
commands.add("-r");
commands.add((String)jsonConf.get(configKeyPrefix + "zkNodeBrokers"));
Boolean isWildCard = (Boolean) topologyConf.get("kafka.topic.wildcard.match");
if (isWildCard != null && isWildCard.booleanValue()) {
commands.add("-w");
}
}
return commands;
}
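
    /**
     * Runs the storm-kafka-monitor tool (located via the STORM_BASE_DIR environment variable when
     * set) with options derived from the spout's JSON configuration and parses its JSON output.
     * If a required option is missing or the output cannot be parsed, an error message is
     * returned in place of a lag result.
     */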
private static Map<String, Object> getLagResultForKafka (String spoutId, SpoutSpec spoutSpec, Map topologyConf, boolean old) throws IOException {
ComponentCommon componentCommon = spoutSpec.get_common();
String json = componentCommon.get_json_conf();
Map<String, Object> result = null;
        String errorMsg = "Offset lag for Kafka is not supported for older spout versions. Please update the Kafka spout to the latest version.";
if (json != null && !json.isEmpty()) {
List<String> commands = new ArrayList<>();
String stormHomeDir = System.getenv("STORM_BASE_DIR");
            if (stormHomeDir != null && !stormHomeDir.endsWith("/") && !stormHomeDir.endsWith(File.separator)) {
stormHomeDir += File.separator;
}
commands.add(stormHomeDir != null ? stormHomeDir + "bin" + File.separator + "storm-kafka-monitor" : "storm-kafka-monitor");
Map<String, Object> jsonMap = null;
try {
jsonMap = (Map<String, Object>) JSONValue.parseWithException(json);
} catch (ParseException e) {
throw new IOException(e);
}
commands.addAll(old ? getCommandLineOptionsForOldKafkaSpout(jsonMap, topologyConf) : getCommandLineOptionsForNewKafkaSpout(jsonMap));
logger.debug("Command to run: {}", commands);
            // if commands contains one or more null values, the spout was compiled against an older version of storm-kafka / storm-kafka-client
if (!commands.contains(null)) {
String resultFromMonitor = ShellUtils.execCommand(commands.toArray(new String[0]));
try {
result = (Map<String, Object>) JSONValue.parseWithException(resultFromMonitor);
} catch (ParseException e) {
logger.debug("JSON parsing failed, assuming message as error message: {}", resultFromMonitor);
// json parsing fail -> error received
errorMsg = resultFromMonitor;
}
}
}
Map<String, Object> kafkaSpoutLagInfo = new HashMap<>();
kafkaSpoutLagInfo.put(SPOUT_ID, spoutId);
kafkaSpoutLagInfo.put(SPOUT_TYPE, "KAFKA");
if (result != null) {
kafkaSpoutLagInfo.put(SPOUT_LAG_RESULT, result);
} else {
kafkaSpoutLagInfo.put(ERROR_INFO, errorMsg);
}
return kafkaSpoutLagInfo;
}
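
    /** Lag result for the storm-kafka-client based KafkaSpout. */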
private static Map<String, Object> getLagResultForNewKafkaSpout (String spoutId, SpoutSpec spoutSpec, Map topologyConf) throws IOException {
return getLagResultForKafka(spoutId, spoutSpec, topologyConf, false);
}
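
    /** Lag result for the older storm-kafka based KafkaSpout. */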
private static Map<String, Object> getLagResultForOldKafkaSpout (String spoutId, SpoutSpec spoutSpec, Map topologyConf) throws IOException {
return getLagResultForKafka(spoutId, spoutSpec, topologyConf, true);
}
}