| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.metron.parsers.topology; |
| |
| import com.google.common.base.Joiner; |
| import java.io.File; |
| import java.io.IOException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.ListIterator; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.function.Function; |
| import java.util.stream.Collectors; |
| import org.apache.commons.cli.CommandLine; |
| import org.apache.commons.cli.CommandLineParser; |
| import org.apache.commons.cli.HelpFormatter; |
| import org.apache.commons.cli.Option; |
| import org.apache.commons.cli.Options; |
| import org.apache.commons.cli.ParseException; |
| import org.apache.commons.cli.PosixParser; |
| import org.apache.commons.io.FileUtils; |
| import org.apache.kafka.common.protocol.SecurityProtocol; |
| import org.apache.metron.common.Constants; |
| import org.apache.metron.common.configuration.SensorParserConfig; |
| import org.apache.metron.common.utils.JSONUtils; |
| import org.apache.metron.common.utils.KafkaUtils; |
| import org.apache.metron.parsers.topology.config.Arg; |
| import org.apache.metron.parsers.topology.config.ConfigHandlers; |
| import org.apache.metron.parsers.topology.config.ValueSupplier; |
| import org.apache.metron.storm.kafka.flux.SpoutConfiguration; |
| import org.apache.storm.Config; |
| import org.apache.storm.LocalCluster; |
| import org.apache.storm.StormSubmitter; |
| import org.apache.storm.utils.Utils; |
| |
| public class ParserTopologyCLI { |
| |
| private static final String STORM_JOB_SEPARATOR = "__"; |
| |
| public enum ParserOptions { |
| HELP("h", code -> { |
| Option o = new Option(code, "help", false, "This screen"); |
| o.setRequired(false); |
| return o; |
| }), |
| ZK_QUORUM("z", code -> { |
| Option o = new Option(code, "zk", true, "Zookeeper Quorum URL (zk1:2181,zk2:2181,..."); |
| o.setArgName("ZK_QUORUM"); |
| o.setRequired(true); |
| return o; |
| }), |
| BROKER_URL("k", code -> { |
| Option o = new Option(code, "kafka", true, "Kafka Broker URL"); |
| o.setArgName("BROKER_URL"); |
| o.setRequired(false); |
| return o; |
| }), |
| SENSOR_TYPES("s", code -> { |
| Option o = new Option(code, "sensor", true, "Sensor Types as comma-separated list"); |
| o.setArgName("SENSOR_TYPES"); |
| o.setRequired(true); |
| return o; |
| }), |
| SPOUT_PARALLELISM("sp", code -> { |
| Option o = new Option(code, "spout_p", true, "Spout Parallelism Hint. If multiple sensors are specified, this should be a comma separated list in the same order."); |
| o.setArgName("SPOUT_PARALLELISM_HINT"); |
| o.setRequired(false); |
| o.setType(Number.class); |
| return o; |
| }), |
| PARSER_PARALLELISM("pp", code -> { |
| Option o = new Option(code, "parser_p", true, "Parser Parallelism Hint"); |
| o.setArgName("PARALLELISM_HINT"); |
| o.setRequired(false); |
| o.setType(Number.class); |
| return o; |
| }), |
| INVALID_WRITER_PARALLELISM("iwp", code -> { |
| Option o = new Option(code, "invalid_writer_p", true, "Invalid Message Writer Parallelism Hint"); |
| o.setArgName("PARALLELISM_HINT"); |
| o.setRequired(false); |
| o.setType(Number.class); |
| return o; |
| }), |
| ERROR_WRITER_PARALLELISM("ewp", code -> { |
| Option o = new Option(code, "error_writer_p", true, "Error Writer Parallelism Hint"); |
| o.setArgName("PARALLELISM_HINT"); |
| o.setRequired(false); |
| o.setType(Number.class); |
| return o; |
| }), |
| SPOUT_NUM_TASKS("snt", code -> { |
| Option o = new Option(code, "spout_num_tasks", true, "Spout Num Tasks. If multiple sensors are specified, this should be a comma separated list in the same order."); |
| o.setArgName("NUM_TASKS"); |
| o.setRequired(false); |
| o.setType(Number.class); |
| return o; |
| }), |
| PARSER_NUM_TASKS("pnt", code -> { |
| Option o = new Option(code, "parser_num_tasks", true, "Parser Num Tasks"); |
| o.setArgName("NUM_TASKS"); |
| o.setRequired(false); |
| o.setType(Number.class); |
| return o; |
| }), |
| INVALID_WRITER_NUM_TASKS("iwnt", code -> { |
| Option o = new Option(code, "invalid_writer_num_tasks", true, "Invalid Writer Num Tasks"); |
| o.setArgName("NUM_TASKS"); |
| o.setRequired(false); |
| o.setType(Number.class); |
| return o; |
| }), |
| ERROR_WRITER_NUM_TASKS("ewnt", code -> { |
| Option o = new Option(code, "error_writer_num_tasks", true, "Error Writer Num Tasks"); |
| o.setArgName("NUM_TASKS"); |
| o.setRequired(false); |
| o.setType(Number.class); |
| return o; |
| }), |
| NUM_WORKERS("nw", code -> { |
| Option o = new Option(code, "num_workers", true, "Number of Workers"); |
| o.setArgName("NUM_WORKERS"); |
| o.setRequired(false); |
| o.setType(Number.class); |
| return o; |
| }, new ConfigHandlers.SetNumWorkersHandler() |
| ) |
| ,NUM_ACKERS("na", code -> { |
| Option o = new Option(code, "num_ackers", true, "Number of Ackers"); |
| o.setArgName("NUM_ACKERS"); |
| o.setRequired(false); |
| o.setType(Number.class); |
| return o; |
| }, new ConfigHandlers.SetNumAckersHandler() |
| ) |
| ,NUM_MAX_TASK_PARALLELISM("mtp", code -> { |
| Option o = new Option(code, "max_task_parallelism", true, "Max task parallelism"); |
| o.setArgName("MAX_TASK"); |
| o.setRequired(false); |
| o.setType(Number.class); |
| return o; |
| }, new ConfigHandlers.SetMaxTaskParallelismHandler() |
| ) |
| ,MESSAGE_TIMEOUT("mt", code -> { |
| Option o = new Option(code, "message_timeout", true, "Message Timeout in Seconds"); |
| o.setArgName("TIMEOUT_IN_SECS"); |
| o.setRequired(false); |
| o.setType(Number.class); |
| return o; |
| }, new ConfigHandlers.SetMessageTimeoutHandler() |
| ) |
| ,EXTRA_OPTIONS("e", code -> { |
| Option o = new Option(code, "extra_topology_options", true |
| , "Extra options in the form of a JSON file with a map for content." + |
| " Available options are those in the Kafka Consumer Configs at http://kafka.apache.org/0100/documentation.html#newconsumerconfigs" + |
| " and " + Joiner.on(",").join(SpoutConfiguration.allOptions()) |
| ); |
| o.setArgName("JSON_FILE"); |
| o.setRequired(false); |
| o.setType(String.class); |
| return o; |
| }, new ConfigHandlers.LoadJSONHandler() |
| ) |
| ,SPOUT_CONFIG("esc", code -> { |
| Option o = new Option(code |
| , "extra_kafka_spout_config" |
| , true |
| , "Extra spout config options in the form of a JSON file with a map for content." |
| ); |
| o.setArgName("JSON_FILE"); |
| o.setRequired(false); |
| o.setType(String.class); |
| return o; |
| } |
| ) |
| ,SECURITY_PROTOCOL("ksp", code -> { |
| Option o = new Option(code |
| , "kafka_security_protocol" |
| , true |
| , "The kafka security protocol to use (if running with a kerberized cluster). E.g. PLAINTEXTSASL" |
| ); |
| o.setArgName("SECURITY_PROTOCOL"); |
| o.setRequired(false); |
| o.setType(String.class); |
| return o; |
| } |
| ) |
| ,OUTPUT_TOPIC("ot", code -> { |
| Option o = new Option(code |
| , "output_topic" |
| , true |
| , "The output kafka topic for the parser. If unset, the default is " + Constants.ENRICHMENT_TOPIC |
| ); |
| o.setArgName("KAFKA_TOPIC"); |
| o.setRequired(false); |
| o.setType(String.class); |
| return o; |
| } |
| ) |
| ,TEST("t", code -> |
| { |
| Option o = new Option("t", "test", true, "Run in Test Mode"); |
| o.setArgName("TEST"); |
| o.setRequired(false); |
| return o; |
| }) |
| ; |
| Option option; |
| String shortCode; |
| Function<Arg, Config> configHandler; |
| ParserOptions(String shortCode |
| , Function<String, Option> optionHandler |
| ) { |
| this(shortCode, optionHandler, arg -> arg.getConfig()); |
| } |
| ParserOptions(String shortCode |
| , Function<String, Option> optionHandler |
| , Function<Arg, Config> configHandler |
| ) { |
| this.shortCode = shortCode; |
| this.option = optionHandler.apply(shortCode); |
| this.configHandler = configHandler; |
| } |
| |
| public boolean has(CommandLine cli) { |
| return cli.hasOption(shortCode); |
| } |
| |
| public String get(CommandLine cli) { |
| return cli.getOptionValue(shortCode); |
| } |
| public String get(CommandLine cli, String def) { |
| return has(cli)?cli.getOptionValue(shortCode):def; |
| } |
| |
| public static Optional<Config> getConfig(CommandLine cli) { |
| return getConfig(cli, new Config()); |
| } |
| |
| public static Optional<Config> getConfig(CommandLine cli, Config config) { |
| if(EXTRA_OPTIONS.has(cli)) { |
| Map<String, Object> extraOptions = readJSONMapFromFile(new File(EXTRA_OPTIONS.get(cli))); |
| config.putAll(extraOptions); |
| } |
| for(ParserOptions option : ParserOptions.values()) { |
| config = option.configHandler.apply(new Arg(config, option.get(cli))); |
| } |
| return config.isEmpty()?Optional.empty():Optional.of(config); |
| } |
| |
| public static CommandLine parse(CommandLineParser parser, String[] args) throws ParseException { |
| try { |
| CommandLine cli = parser.parse(getOptions(), args); |
| if(HELP.has(cli)) { |
| printHelp(); |
| System.exit(0); |
| } |
| return cli; |
| } catch (ParseException e) { |
| System.err.println("Unable to parse args: " + Joiner.on(' ').join(args)); |
| e.printStackTrace(System.err); |
| printHelp(); |
| throw e; |
| } |
| } |
| |
| public static void printHelp() { |
| HelpFormatter formatter = new HelpFormatter(); |
| formatter.printHelp( "ParserTopologyCLI", getOptions()); |
| } |
| |
| public static Options getOptions() { |
| Options ret = new Options(); |
| for(ParserOptions o : ParserOptions.values()) { |
| ret.addOption(o.option); |
| } |
| return ret; |
| } |
| } |
| |
| private static CommandLine parse(Options options, String[] args) { |
| /* |
| * The general gist is that in order to pass args to storm jar, |
| * we have to disregard options that we don't know about in the CLI. |
| * Storm will ignore our args, we have to do the same. |
| */ |
| CommandLineParser parser = new PosixParser() { |
| @Override |
| protected void processOption(String arg, ListIterator iter) throws ParseException { |
| if(getOptions().hasOption(arg)) { |
| super.processOption(arg, iter); |
| } |
| } |
| }; |
| try { |
| return ParserOptions.parse(parser, args); |
| } catch (ParseException pe) { |
| pe.printStackTrace(); |
| final HelpFormatter usageFormatter = new HelpFormatter(); |
| usageFormatter.printHelp("ParserTopologyCLI", null, options, null, true); |
| System.exit(-1); |
| return null; |
| } |
| } |
| |
| public ParserTopologyBuilder.ParserTopology createParserTopology(final CommandLine cmd) throws Exception { |
| String zookeeperUrl = ParserOptions.ZK_QUORUM.get(cmd); |
| Optional<String> brokerUrl = ParserOptions.BROKER_URL.has(cmd)?Optional.of(ParserOptions.BROKER_URL.get(cmd)):Optional.empty(); |
| String sensorTypeRaw= ParserOptions.SENSOR_TYPES.get(cmd); |
| List<String> sensorTypes = Arrays.stream(sensorTypeRaw.split(",")).map(String::trim).collect( |
| Collectors.toList()); |
| |
| /* |
| * It bears mentioning why we're creating this ValueSupplier indirection here. |
| * As a separation of responsibilities, the CLI class defines the order of precedence |
| * for the various topological and structural properties for creating a parser. This is |
| * desirable because there are now (i.e. integration tests) |
| * and may be in the future (i.e. a REST service to start parsers without using the CLI) |
| * other mechanisms to construct parser topologies. It's sensible to split those concerns.. |
| * |
| * Unfortunately, determining the structural parameters for a parser requires interacting with |
| * external services (e.g. zookeeper) that are set up well within the ParserTopology class. |
| * Rather than pulling the infrastructure to interact with those services out and moving it into the |
| * CLI class and breaking that separation of concerns, we've created a supplier |
| * indirection where are providing the logic as to how to create precedence in the CLI class |
| * without owning the responsibility of constructing the infrastructure where the values are |
| * necessarily supplied. |
| * |
| */ |
| |
| // kafka spout parallelism |
| ValueSupplier<List> spoutParallelism = (parserConfigs, clazz) -> { |
| if(ParserOptions.SPOUT_PARALLELISM.has(cmd)) { |
| // Handle the case where there's only one and we can default reasonably |
| if( parserConfigs.size() == 1) { |
| return Collections.singletonList(Integer.parseInt(ParserOptions.SPOUT_PARALLELISM.get(cmd, "1"))); |
| } |
| |
| // Handle the multiple explicitly passed spout parallelism's case. |
| String parallelismRaw = ParserOptions.SPOUT_PARALLELISM.get(cmd, "1"); |
| List<String> parallelisms = Arrays.stream(parallelismRaw.split(",")).map(String::trim).collect( |
| Collectors.toList()); |
| if (parallelisms.size() != parserConfigs.size()) { |
| throw new IllegalArgumentException("Spout parallelism should match number of sensors 1:1"); |
| } |
| List<Integer> spoutParallelisms = new ArrayList<>(); |
| for (String s : parallelisms) { |
| spoutParallelisms.add(Integer.parseInt(s)); |
| } |
| return spoutParallelisms; |
| } |
| |
| List<Integer> spoutParallelisms = new ArrayList<>(); |
| for (SensorParserConfig parserConfig : parserConfigs) { |
| spoutParallelisms.add(parserConfig.getSpoutParallelism()); |
| } |
| return spoutParallelisms; |
| }; |
| |
| // kafka spout number of tasks |
| ValueSupplier<List> spoutNumTasks = (parserConfigs, clazz) -> { |
| if(ParserOptions.SPOUT_NUM_TASKS.has(cmd)) { |
| // Handle the case where there's only one and we can default reasonably |
| if( parserConfigs.size() == 1) { |
| return Collections.singletonList(Integer.parseInt(ParserOptions.SPOUT_NUM_TASKS.get(cmd, "1"))); |
| } |
| |
| // Handle the multiple explicitly passed spout parallelism's case. |
| String numTasksRaw = ParserOptions.SPOUT_NUM_TASKS.get(cmd, "1"); |
| List<String> numTasks = Arrays.stream(numTasksRaw.split(",")).map(String::trim).collect( |
| Collectors.toList()); |
| if (numTasks.size() != parserConfigs.size()) { |
| throw new IllegalArgumentException("Spout num tasks should match number of sensors 1:1"); |
| } |
| List<Integer> spoutTasksList = new ArrayList<>(); |
| for (String s : numTasks) { |
| spoutTasksList.add(Integer.parseInt(s)); |
| } |
| return spoutTasksList; |
| } |
| |
| List<Integer> numTasks = new ArrayList<>(); |
| for (SensorParserConfig parserConfig : parserConfigs) { |
| numTasks.add(parserConfig.getSpoutNumTasks()); |
| } |
| return numTasks; |
| }; |
| |
| // parser bolt parallelism |
| ValueSupplier<Integer> parserParallelism = (parserConfigs, clazz) -> { |
| if(ParserOptions.PARSER_PARALLELISM.has(cmd)) { |
| return Integer.parseInt(ParserOptions.PARSER_PARALLELISM.get(cmd, "1")); |
| } |
| int retValue = 1; |
| for (SensorParserConfig config : parserConfigs) { |
| Integer configValue = config.getParserParallelism(); |
| retValue = configValue == null ? retValue : configValue; |
| } |
| return retValue; |
| }; |
| |
| // parser bolt number of tasks |
| ValueSupplier<Integer> parserNumTasks = (parserConfigs, clazz) -> { |
| if(ParserOptions.PARSER_NUM_TASKS.has(cmd)) { |
| return Integer.parseInt(ParserOptions.PARSER_NUM_TASKS.get(cmd, "1")); |
| } |
| int retValue = 1; |
| for (SensorParserConfig config : parserConfigs) { |
| Integer configValue = config.getParserNumTasks(); |
| retValue = configValue == null ? retValue : configValue; |
| } |
| return retValue; |
| }; |
| |
| // error bolt parallelism |
| ValueSupplier<Integer> errorParallelism = (parserConfigs, clazz) -> { |
| if(ParserOptions.ERROR_WRITER_PARALLELISM.has(cmd)) { |
| return Integer.parseInt(ParserOptions.ERROR_WRITER_PARALLELISM.get(cmd, "1")); |
| } |
| int retValue = 1; |
| for (SensorParserConfig config : parserConfigs) { |
| Integer configValue = config.getErrorWriterParallelism(); |
| retValue = configValue == null ? retValue : configValue; |
| } |
| return retValue; |
| }; |
| |
| // error bolt number of tasks |
| ValueSupplier<Integer> errorNumTasks = (parserConfigs, clazz) -> { |
| if(ParserOptions.ERROR_WRITER_NUM_TASKS.has(cmd)) { |
| return Integer.parseInt(ParserOptions.ERROR_WRITER_NUM_TASKS.get(cmd, "1")); |
| } |
| int retValue = 1; |
| for (SensorParserConfig config : parserConfigs) { |
| Integer configValue = config.getErrorWriterNumTasks(); |
| retValue = configValue == null ? retValue : configValue; |
| } |
| return retValue; |
| }; |
| |
| // kafka spout config |
| ValueSupplier<List> spoutConfig = (parserConfigs, clazz) -> { |
| if(ParserOptions.SPOUT_CONFIG.has(cmd)) { |
| return Collections.singletonList(readJSONMapFromFile(new File(ParserOptions.SPOUT_CONFIG.get(cmd)))); |
| } |
| List<Map<String, Object>> retValue = new ArrayList<>(); |
| for (SensorParserConfig config : parserConfigs) { |
| retValue.add(config.getSpoutConfig()); |
| } |
| return retValue; |
| }; |
| |
| // security protocol |
| ValueSupplier<String> securityProtocol = (parserConfigs, clazz) -> { |
| Optional<String> sp = Optional.empty(); |
| if (ParserOptions.SECURITY_PROTOCOL.has(cmd)) { |
| sp = Optional.of(ParserOptions.SECURITY_PROTOCOL.get(cmd)); |
| } |
| // Need to adjust to handle list of spoutConfigs. Any non-plaintext wins |
| if (!sp.isPresent()) { |
| sp = getSecurityProtocol(sp, spoutConfig.get(parserConfigs, List.class)); |
| } |
| // Need to look through parserConfigs for any non-plaintext |
| String parserConfigSp = SecurityProtocol.PLAINTEXT.name; |
| for (SensorParserConfig config : parserConfigs) { |
| String configSp = config.getSecurityProtocol(); |
| if (!SecurityProtocol.PLAINTEXT.name.equals(configSp)) { |
| // We have a winner |
| parserConfigSp = configSp; |
| } |
| } |
| |
| return sp.orElse(Optional.ofNullable(parserConfigSp).orElse(null)); |
| }; |
| |
| // storm configuration |
| ValueSupplier<Config> stormConf = (parserConfigs, clazz) -> { |
| // Last one wins |
| Config finalConfig = new Config(); |
| for (SensorParserConfig parserConfig : parserConfigs) { |
| Map<String, Object> c = parserConfig.getStormConfig(); |
| if (c != null && !c.isEmpty()) { |
| finalConfig.putAll(c); |
| } |
| if (parserConfig.getNumAckers() != null) { |
| Config.setNumAckers(finalConfig, parserConfig.getNumAckers()); |
| } |
| if (parserConfig.getNumWorkers() != null) { |
| Config.setNumWorkers(finalConfig, parserConfig.getNumWorkers()); |
| } |
| } |
| return ParserOptions.getConfig(cmd, finalConfig).orElse(finalConfig); |
| }; |
| |
| // output topic |
| ValueSupplier<String> outputTopic = (parserConfigs, clazz) -> { |
| String topic = null; |
| |
| if(ParserOptions.OUTPUT_TOPIC.has(cmd)) { |
| topic = ParserOptions.OUTPUT_TOPIC.get(cmd); |
| } |
| |
| return topic; |
| }; |
| |
| // Error topic will throw an exception if the topics aren't all the same. |
| ValueSupplier<String> errorTopic = (parserConfigs, clazz) -> { |
| // topic will to set to the 'parser.error.topic' setting in globals when the error bolt is created |
| String topic = null; |
| for (SensorParserConfig parserConfig : parserConfigs) { |
| String currentTopic = parserConfig.getErrorTopic(); |
| if(topic != null && !topic.equals(currentTopic)) { |
| throw new IllegalArgumentException( |
| "Parser Aggregation specified with differing error topics"); |
| } |
| topic = currentTopic; |
| } |
| |
| return topic; |
| }; |
| |
| return getParserTopology( |
| zookeeperUrl, |
| brokerUrl, |
| sensorTypes, |
| spoutParallelism, |
| spoutNumTasks, |
| parserParallelism, |
| parserNumTasks, |
| errorParallelism, |
| errorNumTasks, |
| spoutConfig, |
| securityProtocol, |
| stormConf, |
| outputTopic, |
| errorTopic); |
| } |
| |
| protected ParserTopologyBuilder.ParserTopology getParserTopology( String zookeeperUrl, |
| Optional<String> brokerUrl, |
| List<String> sensorTypes, |
| ValueSupplier<List> spoutParallelism, |
| ValueSupplier<List> spoutNumTasks, |
| ValueSupplier<Integer> parserParallelism, |
| ValueSupplier<Integer> parserNumTasks, |
| ValueSupplier<Integer> errorParallelism, |
| ValueSupplier<Integer> errorNumTasks, |
| ValueSupplier<List> spoutConfig, |
| ValueSupplier<String> securityProtocol, |
| ValueSupplier<Config> stormConf, |
| ValueSupplier<String> outputTopic, |
| ValueSupplier<String> errorTopic) throws Exception { |
| return ParserTopologyBuilder.build( |
| zookeeperUrl, |
| brokerUrl, |
| sensorTypes, |
| spoutParallelism, |
| spoutNumTasks, |
| parserParallelism, |
| parserNumTasks, |
| errorParallelism, |
| errorNumTasks, |
| spoutConfig, |
| securityProtocol, |
| outputTopic, |
| errorTopic, |
| stormConf |
| ); |
| } |
| |
| |
| public static void main(String[] args) { |
| |
| try { |
| Options options = new Options(); |
| final CommandLine cmd = parse(options, args); |
| if (cmd.hasOption("h")) { |
| final HelpFormatter usageFormatter = new HelpFormatter(); |
| usageFormatter.printHelp("ParserTopologyCLI", null, options, null, true); |
| System.exit(0); |
| } |
| ParserTopologyCLI cli = new ParserTopologyCLI(); |
| ParserTopologyBuilder.ParserTopology topology = cli.createParserTopology(cmd); |
| String sensorTypes = ParserOptions.SENSOR_TYPES.get(cmd); |
| if (ParserOptions.TEST.has(cmd)) { |
| topology.getTopologyConfig().put(Config.TOPOLOGY_DEBUG, true); |
| LocalCluster cluster = new LocalCluster(); |
| cluster.submitTopology(sensorTypes.replaceAll(",", STORM_JOB_SEPARATOR), topology.getTopologyConfig(), topology.getBuilder().createTopology()); |
| Utils.sleep(300000); |
| cluster.shutdown(); |
| } else { |
| StormSubmitter.submitTopology(sensorTypes.replaceAll(",", STORM_JOB_SEPARATOR), topology.getTopologyConfig(), topology.getBuilder().createTopology()); |
| } |
| } catch (Exception e) { |
| e.printStackTrace(); |
| System.exit(-1); |
| } |
| } |
| |
| private static Optional<String> getSecurityProtocol(Optional<String> protocol, List<Map<String, Object>> spoutConfig) { |
| Optional<String> ret = protocol; |
| if(ret.isPresent() && protocol.get().equalsIgnoreCase(SecurityProtocol.PLAINTEXT.name)) { |
| ret = Optional.empty(); |
| } |
| if(!ret.isPresent()) { |
| // Need to look through spoutConfig for any non-plaintext |
| String spoutConfigSp = null; |
| for (Map<String, Object> config: spoutConfig) { |
| String configSp = (String) config.get(KafkaUtils.SECURITY_PROTOCOL); |
| if (configSp != null && !SecurityProtocol.PLAINTEXT.name.equals(configSp)) { |
| // We have a winner |
| spoutConfigSp = configSp; |
| } else if (configSp != null) { |
| // Use something explicitly defined. |
| spoutConfigSp = configSp; |
| } |
| } |
| ret = Optional.ofNullable(spoutConfigSp); |
| } |
| if(ret.isPresent() && ret.get().equalsIgnoreCase(SecurityProtocol.PLAINTEXT.name)) { |
| ret = Optional.empty(); |
| } |
| return ret; |
| } |
| |
| private static Map<String, Object> readJSONMapFromFile(File inputFile) { |
| String json = null; |
| if (inputFile.exists()) { |
| try { |
| json = FileUtils.readFileToString(inputFile); |
| } catch (IOException e) { |
| throw new IllegalStateException("Unable to process JSON file " + inputFile, e); |
| } |
| } |
| else { |
| throw new IllegalArgumentException("Unable to load JSON file at " + inputFile.getAbsolutePath()); |
| } |
| try { |
| return JSONUtils.INSTANCE.load(json, JSONUtils.MAP_SUPPLIER); |
| } catch (IOException e) { |
| throw new IllegalStateException("Unable to process JSON.", e); |
| } |
| } |
| } |