// blob: dd61279eed3b97f2b8a4d11635fd342f6909761f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.parsers.topology;
import org.adrianwalker.multilinestring.Multiline;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.Parser;
import org.apache.commons.cli.PosixParser;
import org.apache.commons.io.FileUtils;
import org.apache.log4j.Level;
import org.apache.metron.common.configuration.SensorParserConfig;
import org.apache.metron.common.utils.JSONUtils;
import org.apache.metron.parsers.topology.config.ValueSupplier;
import org.apache.metron.test.utils.UnitTestHelper;
import org.apache.storm.Config;
import org.junit.jupiter.api.Test;
import java.io.File;
import java.io.IOException;
import java.util.*;
import java.util.function.Predicate;
import java.util.function.Supplier;
import static org.junit.jupiter.api.Assertions.*;
public class ParserTopologyCLITest {
/**
 * Fluent test helper that gathers {@link ParserTopologyCLI.ParserOptions} values and turns them
 * into a parsed {@link CommandLine}, using either the long option names or the short codes.
 */
public static class CLIBuilder {
  EnumMap<ParserTopologyCLI.ParserOptions, String> map = new EnumMap<>(ParserTopologyCLI.ParserOptions.class);

  /**
   * Records {@code option} together with its argument.
   *
   * @param option the option to place on the command line
   * @param val the argument for the option; {@code null} means no argument is emitted
   * @return this builder
   */
  public CLIBuilder with(ParserTopologyCLI.ParserOptions option, String val) {
    map.put(option, val);
    return this;
  }

  /**
   * Records {@code option} without an argument.
   *
   * @param option the option to place on the command line
   * @return this builder
   */
  public CLIBuilder with(ParserTopologyCLI.ParserOptions option) {
    return with(option, null);
  }

  /**
   * Parses everything recorded so far.
   *
   * @param longOpt {@code true} to render {@code --long-name}, {@code false} to render {@code -shortCode}
   * @return the parsed command line
   * @throws ParseException if the parser rejects the rendered arguments
   */
  public CommandLine build(boolean longOpt) throws ParseException {
    return getCLI(map, longOpt);
  }

  /**
   * Renders {@code options} as an argument vector and hands it to a {@link PosixParser}.
   */
  private CommandLine getCLI(EnumMap<ParserTopologyCLI.ParserOptions, String> options, boolean longOpt) throws ParseException {
    List<String> argv = new ArrayList<>();
    for (Map.Entry<ParserTopologyCLI.ParserOptions, String> entry : options.entrySet()) {
      ParserTopologyCLI.ParserOptions key = entry.getKey();
      if (!key.option.hasLongOpt()) {
        // Options without a long name are not emitted in either mode.
        continue;
      }
      argv.add(longOpt ? "--" + key.option.getLongOpt() : "-" + key.shortCode);
      String value = entry.getValue();
      if (key.option.hasArg() && value != null) {
        argv.add(value);
      }
    }
    return ParserTopologyCLI.ParserOptions.parse(new PosixParser(), argv.toArray(new String[0]));
  }
}
/**
 * Walks every parser option and fails if a long name or short code was already claimed by an
 * option seen earlier in the enum.
 */
@Test
public void testNoOverlappingArgs() throws Exception {
  Set<String> seen = new HashSet<>();
  for (ParserTopologyCLI.ParserOptions candidate : ParserTopologyCLI.ParserOptions.values()) {
    String longName = candidate.option.getLongOpt();
    String shortName = candidate.shortCode;
    if (seen.contains(longName)) {
      throw new IllegalStateException("Reused long option: " + longName);
    }
    if (seen.contains(shortName)) {
      throw new IllegalStateException("Reused short option: " + shortName);
    }
    // Both names are registered only after both checks, so an option is never compared to itself.
    seen.addAll(Arrays.asList(longName, shortName));
  }
}
/** Runs the Kafka offset scenario with long option names first, then with short codes. */
@Test
public void testKafkaOffset_happyPath() throws ParseException {
  for (boolean useLongOpt : new boolean[] {true, false}) {
    kafkaOffset(useLongOpt);
  }
}
/**
 * Parses the three mandatory options and checks each value can be read back.
 *
 * @param longOpt {@code true} to use long option names, {@code false} to use short codes
 * @throws ParseException if the command line cannot be parsed
 */
public void kafkaOffset(boolean longOpt) throws ParseException {
  CLIBuilder builder = new CLIBuilder();
  builder.with(ParserTopologyCLI.ParserOptions.BROKER_URL, "mybroker");
  builder.with(ParserTopologyCLI.ParserOptions.ZK_QUORUM, "myzk");
  builder.with(ParserTopologyCLI.ParserOptions.SENSOR_TYPES, "mysensor");
  CommandLine parsed = builder.build(longOpt);

  assertEquals("myzk", ParserTopologyCLI.ParserOptions.ZK_QUORUM.get(parsed));
  assertEquals("mybroker", ParserTopologyCLI.ParserOptions.BROKER_URL.get(parsed));
  assertEquals("mysensor", ParserTopologyCLI.ParserOptions.SENSOR_TYPES.get(parsed));
}
/** Runs the minimal happy-path scenario with long option names first, then with short codes. */
@Test
public void testCLI_happyPath() throws ParseException {
  for (boolean useLongOpt : new boolean[] {true, false}) {
    happyPath(useLongOpt);
  }
}
/**
 * Expects a {@link ParseException} when the command line is built without the sensor types
 * option.
 *
 * <p>The parser's log level is raised to FATAL for the duration of the expected failure, and the
 * level is set back to ERROR in a {@code finally} block so that a failing assertion does not
 * leave the logger at FATAL for whatever runs afterwards.
 */
@Test
public void testCLI_insufficientArg() {
  UnitTestHelper.setLog4jLevel(Parser.class, Level.FATAL);
  try {
    assertThrows(ParseException.class, () ->
        new CLIBuilder().with(ParserTopologyCLI.ParserOptions.BROKER_URL, "mybroker")
                        .with(ParserTopologyCLI.ParserOptions.ZK_QUORUM, "myzk")
                        .build(true));
  } finally {
    UnitTestHelper.setLog4jLevel(Parser.class, Level.ERROR);
  }
}
/**
 * Parses the three mandatory options and checks each value can be read back.
 *
 * @param longOpt {@code true} to use long option names, {@code false} to use short codes
 * @throws ParseException if the command line cannot be parsed
 */
public void happyPath(boolean longOpt) throws ParseException {
  CLIBuilder builder = new CLIBuilder();
  builder.with(ParserTopologyCLI.ParserOptions.BROKER_URL, "mybroker");
  builder.with(ParserTopologyCLI.ParserOptions.ZK_QUORUM, "myzk");
  builder.with(ParserTopologyCLI.ParserOptions.SENSOR_TYPES, "mysensor");
  CommandLine parsed = builder.build(longOpt);

  assertEquals("myzk", ParserTopologyCLI.ParserOptions.ZK_QUORUM.get(parsed));
  assertEquals("mybroker", ParserTopologyCLI.ParserOptions.BROKER_URL.get(parsed));
  assertEquals("mysensor", ParserTopologyCLI.ParserOptions.SENSOR_TYPES.get(parsed));
}
/** Checks the topology settings given on the command line, long names first, then short codes. */
@Test
public void testConfig_noExtra() throws ParseException {
  for (boolean useLongOpt : new boolean[] {true, false}) {
    testConfig_noExtra(useLongOpt);
  }
}
/**
 * Passes worker, acker, max-parallelism and message-timeout values on the command line and
 * checks that each ends up under the matching Storm {@link Config} key.
 *
 * @param longOpt {@code true} to use long option names, {@code false} to use short codes
 * @throws ParseException if the command line cannot be parsed
 */
public void testConfig_noExtra(boolean longOpt) throws ParseException {
  CLIBuilder builder = new CLIBuilder();
  builder.with(ParserTopologyCLI.ParserOptions.BROKER_URL, "mybroker");
  builder.with(ParserTopologyCLI.ParserOptions.ZK_QUORUM, "myzk");
  builder.with(ParserTopologyCLI.ParserOptions.SENSOR_TYPES, "mysensor");
  builder.with(ParserTopologyCLI.ParserOptions.NUM_WORKERS, "1");
  builder.with(ParserTopologyCLI.ParserOptions.NUM_ACKERS, "2");
  builder.with(ParserTopologyCLI.ParserOptions.NUM_MAX_TASK_PARALLELISM, "3");
  builder.with(ParserTopologyCLI.ParserOptions.MESSAGE_TIMEOUT, "4");
  CommandLine parsed = builder.build(longOpt);

  Config stormConfig = ParserTopologyCLI.ParserOptions.getConfig(parsed).get();
  assertEquals(1, stormConfig.get(Config.TOPOLOGY_WORKERS));
  assertEquals(2, stormConfig.get(Config.TOPOLOGY_ACKER_EXECUTORS));
  assertEquals(3, stormConfig.get(Config.TOPOLOGY_MAX_TASK_PARALLELISM));
  assertEquals(4, stormConfig.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS));
}
/** Checks the output topic option with long option names first, then with short codes. */
@Test
public void testOutputTopic() throws Exception {
  for (boolean useLongOpt : new boolean[] {true, false}) {
    testOutputTopic(useLongOpt);
  }
}
/**
 * Supplies an output topic alongside the mandatory options and checks it can be read back.
 *
 * @param longOpt {@code true} to use long option names, {@code false} to use short codes
 * @throws ParseException if the command line cannot be parsed
 */
public void testOutputTopic(boolean longOpt) throws ParseException {
  CLIBuilder builder = new CLIBuilder();
  builder.with(ParserTopologyCLI.ParserOptions.BROKER_URL, "mybroker");
  builder.with(ParserTopologyCLI.ParserOptions.ZK_QUORUM, "myzk");
  builder.with(ParserTopologyCLI.ParserOptions.SENSOR_TYPES, "mysensor");
  builder.with(ParserTopologyCLI.ParserOptions.OUTPUT_TOPIC, "my_topic");
  CommandLine parsed = builder.build(longOpt);

  assertEquals("my_topic", ParserTopologyCLI.ParserOptions.OUTPUT_TOPIC.get(parsed));
}
// JSON written to a temporary file by testConfig_extra(boolean) and passed via EXTRA_OPTIONS.
// NOTE(review): the @Multiline annotation presumably fills this field from the Javadoc text
// below, so that comment is data rather than documentation -- confirm before editing it.
/**
{
"string" : "foo"
,"integer" : 1
}
*/
@Multiline
public static String extraConfig;
/** Checks the extra-options file handling with long option names first, then with short codes. */
@Test
public void testConfig_extra() throws Exception {
  for (boolean useLongOpt : new boolean[] {true, false}) {
    testConfig_extra(useLongOpt);
  }
}
/**
 * Writes {@code extraConfig} to a temporary file, passes that file through the EXTRA_OPTIONS
 * option and checks that both the command-line timeout and the file's entries appear in the
 * resulting Storm {@link Config}.
 *
 * <p>The temporary file is written as UTF-8 rather than the platform default charset, and it is
 * removed as soon as the assertions finish instead of only being scheduled for deletion at JVM
 * exit.
 *
 * @param longOpt {@code true} to use long option names, {@code false} to use short codes
 * @throws IOException if the temporary file cannot be created or written
 * @throws ParseException if the command line cannot be parsed
 */
public void testConfig_extra(boolean longOpt) throws IOException, ParseException {
  File extraFile = File.createTempFile("extra", "json");
  try {
    FileUtils.write(extraFile, extraConfig, "UTF-8");
    CommandLine cli = new CLIBuilder().with(ParserTopologyCLI.ParserOptions.BROKER_URL, "mybroker")
                                      .with(ParserTopologyCLI.ParserOptions.ZK_QUORUM, "myzk")
                                      .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPES, "mysensor")
                                      .with(ParserTopologyCLI.ParserOptions.MESSAGE_TIMEOUT, "4")
                                      .with(ParserTopologyCLI.ParserOptions.EXTRA_OPTIONS, extraFile.getAbsolutePath())
                                      .build(longOpt);
    Optional<Config> configOptional = ParserTopologyCLI.ParserOptions.getConfig(cli);
    Config config = configOptional.get();
    assertEquals(4, config.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS));
    assertEquals("foo", config.get("string"));
    assertEquals(1, config.get("integer"));
  } finally {
    // The Config has already been built by now, so the file is no longer needed.
    FileUtils.deleteQuietly(extraFile);
  }
}
/**
 * Immutable snapshot of the values that the {@link ValueSupplier}s handed to
 * {@code getParserTopology} produce for a given list of sensor parser configs.
 *
 * <p>Every supplier is evaluated once, in the constructor, and the results are exposed through
 * plain getters so tests can assert on them.
 */
private static class ParserInput {
  private final List<Integer> spoutParallelism;
  private final List<Integer> spoutNumTasks;
  private final Integer parserParallelism;
  private final Integer parserNumTasks;
  private final Integer errorParallelism;
  private final Integer errorNumTasks;
  private final List<Map<String, Object>> spoutConfig;
  private final String securityProtocol;
  private final Config stormConf;
  private final String outputTopic;
  private final String errorTopic;

  /**
   * Evaluates each supplier against {@code configs} and stores the result.
   *
   * @param spoutParallelism supplier of the per-spout parallelism values
   * @param spoutNumTasks supplier of the per-spout task counts
   * @param parserParallelism supplier of the parser parallelism
   * @param parserNumTasks supplier of the parser task count
   * @param errorParallelism supplier of the error writer parallelism
   * @param errorNumTasks supplier of the error writer task count
   * @param spoutConfig supplier of the per-spout configuration maps
   * @param securityProtocol supplier of the security protocol
   * @param stormConf supplier of the Storm configuration
   * @param outputTopic supplier of the output topic
   * @param errorTopic supplier of the error topic
   * @param configs the sensor parser configs each supplier is evaluated against
   */
  // The suppliers are declared with raw List by the method this class mirrors, so the element
  // types of the three list fields cannot be checked at compile time.
  @SuppressWarnings({"unchecked", "rawtypes"})
  public ParserInput(ValueSupplier<List> spoutParallelism,
                     ValueSupplier<List> spoutNumTasks,
                     ValueSupplier<Integer> parserParallelism,
                     ValueSupplier<Integer> parserNumTasks,
                     ValueSupplier<Integer> errorParallelism,
                     ValueSupplier<Integer> errorNumTasks,
                     ValueSupplier<List> spoutConfig,
                     ValueSupplier<String> securityProtocol,
                     ValueSupplier<Config> stormConf,
                     ValueSupplier<String> outputTopic,
                     ValueSupplier<String> errorTopic,
                     List<SensorParserConfig> configs
  )
  {
    this.spoutParallelism = spoutParallelism.get(configs, List.class);
    this.spoutNumTasks = spoutNumTasks.get(configs, List.class);
    this.parserParallelism = parserParallelism.get(configs, Integer.class);
    this.parserNumTasks = parserNumTasks.get(configs, Integer.class);
    this.errorParallelism = errorParallelism.get(configs, Integer.class);
    this.errorNumTasks = errorNumTasks.get(configs, Integer.class);
    this.spoutConfig = spoutConfig.get(configs, List.class);
    this.securityProtocol = securityProtocol.get(configs, String.class);
    this.stormConf = stormConf.get(configs, Config.class);
    this.outputTopic = outputTopic.get(configs, String.class);
    this.errorTopic = errorTopic.get(configs, String.class);
  }

  /** @return the value produced by the spout parallelism supplier */
  public List<Integer> getSpoutParallelism() {
    return spoutParallelism;
  }

  /** @return the value produced by the spout task-count supplier */
  public List<Integer> getSpoutNumTasks() {
    return spoutNumTasks;
  }

  /** @return the value produced by the parser parallelism supplier */
  public Integer getParserParallelism() {
    return parserParallelism;
  }

  /** @return the value produced by the parser task-count supplier */
  public Integer getParserNumTasks() {
    return parserNumTasks;
  }

  /** @return the value produced by the error writer parallelism supplier */
  public Integer getErrorParallelism() {
    return errorParallelism;
  }

  /** @return the value produced by the error writer task-count supplier */
  public Integer getErrorNumTasks() {
    return errorNumTasks;
  }

  /** @return the value produced by the spout configuration supplier */
  public List<Map<String, Object>> getSpoutConfig() {
    return spoutConfig;
  }

  /** @return the value produced by the security protocol supplier */
  public String getSecurityProtocol() {
    return securityProtocol;
  }

  /** @return the value produced by the Storm configuration supplier */
  public Config getStormConf() {
    return stormConf;
  }

  /** @return the value produced by the output topic supplier */
  public String getOutputTopic() {
    return outputTopic;
  }

  /** @return the value produced by the error topic supplier */
  public String getErrorTopic() {
    return errorTopic;
  }
}
// JSON template that getBaseConfig() deserializes into a fresh SensorParserConfig.
// NOTE(review): the @Multiline annotation presumably fills this field from the Javadoc text
// below, so that comment is data rather than documentation -- confirm before editing it.
/**
* {
* "parserClassName": "org.apache.metron.parsers.GrokParser",
* "sensorTopic": "squid",
* "parserConfig": {
* "grokPath": "/patterns/squid",
* "patternLabel": "SQUID_DELIMITED",
* "timestampField": "timestamp"
* },
* "fieldTransformations" : [
* {
* "transformation" : "STELLAR",
* "output" : [
* "full_hostname",
* "domain_without_subdomains"
* ],
* "config" : {
* "full_hostname" : "URL_TO_HOST(url)",
* "domain_without_subdomains" : "DOMAIN_REMOVE_SUBDOMAINS(full_hostname)"
* }
* }
* ]
* }
*/
@Multiline
public static String baseConfig;
/**
 * Deserializes {@code baseConfig} into a new {@link SensorParserConfig}.
 *
 * @return a freshly parsed config instance
 * @throws IllegalStateException wrapping the {@link IOException} if the JSON cannot be loaded
 */
private static SensorParserConfig getBaseConfig() {
  final SensorParserConfig parsed;
  try {
    parsed = JSONUtils.INSTANCE.load(baseConfig, SensorParserConfig.class);
  } catch (IOException ioe) {
    throw new IllegalStateException(ioe.getMessage(), ioe);
  }
  return parsed;
}
/**
 * Spout parallelism: expects 10 when given on the command line, otherwise the 20 set on the
 * sensor config.
 */
@Test
public void testSpoutParallelism() throws Exception {
  Predicate<ParserInput> cliValueUsed =
      input -> input.getSpoutParallelism().equals(Collections.singletonList(10));
  Predicate<ParserInput> configValueUsed =
      input -> input.getSpoutParallelism().equals(Collections.singletonList(20));
  Supplier<List<SensorParserConfig>> sensorConfigs = () -> {
    SensorParserConfig config = getBaseConfig();
    config.setSpoutParallelism(20);
    return Collections.singletonList(config);
  };

  testConfigOption(ParserTopologyCLI.ParserOptions.SPOUT_PARALLELISM,
      "10", cliValueUsed, sensorConfigs, configValueUsed);
}
/**
 * Spout parallelism with two sensor configs: expects [10, 12] from the command line value
 * "10,12", otherwise [20, 30] from the two sensor configs.
 */
@Test
public void testSpoutParallelismMultiple() throws Exception {
  // Each spout uses its own value, so one entry per spout is expected.
  List<Integer> expectedFromCli = Arrays.asList(10, 12);
  List<Integer> expectedFromConfig = Arrays.asList(20, 30);

  Supplier<List<SensorParserConfig>> sensorConfigs = () -> {
    List<SensorParserConfig> configs = new ArrayList<>();
    for (int parallelism : new int[] {20, 30}) {
      SensorParserConfig config = getBaseConfig();
      config.setSpoutParallelism(parallelism);
      configs.add(config);
    }
    return configs;
  };

  testConfigOption(ParserTopologyCLI.ParserOptions.SPOUT_PARALLELISM,
      "10,12",
      input -> input.getSpoutParallelism().equals(expectedFromCli),
      sensorConfigs,
      input -> input.getSpoutParallelism().equals(expectedFromConfig));
}
/**
 * Spout task count: expects 10 when given on the command line, otherwise the 20 set on the
 * sensor config.
 */
@Test
public void testSpoutNumTasks() throws Exception {
  Predicate<ParserInput> cliValueUsed =
      input -> input.getSpoutNumTasks().equals(Collections.singletonList(10));
  Predicate<ParserInput> configValueUsed =
      input -> input.getSpoutNumTasks().equals(Collections.singletonList(20));
  Supplier<List<SensorParserConfig>> sensorConfigs = () -> {
    SensorParserConfig config = getBaseConfig();
    config.setSpoutNumTasks(20);
    return Collections.singletonList(config);
  };

  testConfigOption(ParserTopologyCLI.ParserOptions.SPOUT_NUM_TASKS,
      "10", cliValueUsed, sensorConfigs, configValueUsed);
}
/**
 * Spout task count with two sensor configs: expects [10, 12] from the command line value
 * "10,12", otherwise [20, 30] from the two sensor configs.
 */
@Test
public void testSpoutNumTasksMultiple() throws Exception {
  // One entry per spout is expected.
  List<Integer> expectedFromCli = Arrays.asList(10, 12);
  List<Integer> expectedFromConfig = Arrays.asList(20, 30);

  Supplier<List<SensorParserConfig>> sensorConfigs = () -> {
    List<SensorParserConfig> configs = new ArrayList<>();
    for (int numTasks : new int[] {20, 30}) {
      SensorParserConfig config = getBaseConfig();
      config.setSpoutNumTasks(numTasks);
      configs.add(config);
    }
    return configs;
  };

  testConfigOption(ParserTopologyCLI.ParserOptions.SPOUT_NUM_TASKS,
      "10,12",
      input -> input.getSpoutNumTasks().equals(expectedFromCli),
      sensorConfigs,
      input -> input.getSpoutNumTasks().equals(expectedFromConfig));
}
/**
 * Parser parallelism: expects 10 when given on the command line, otherwise the 20 set on the
 * sensor config.
 */
@Test
public void testParserParallelism() throws Exception {
  Predicate<ParserInput> cliValueUsed = input -> input.getParserParallelism().equals(10);
  Predicate<ParserInput> configValueUsed = input -> input.getParserParallelism().equals(20);
  Supplier<List<SensorParserConfig>> sensorConfigs = () -> {
    SensorParserConfig config = getBaseConfig();
    config.setParserParallelism(20);
    return Collections.singletonList(config);
  };

  testConfigOption(ParserTopologyCLI.ParserOptions.PARSER_PARALLELISM,
      "10", cliValueUsed, sensorConfigs, configValueUsed);
}
/**
 * Parser parallelism with two sensor configs (20 then 30): expects 10 from the command line,
 * otherwise 30.
 */
@Test
public void testParserParallelismMultiple() throws Exception {
  // Last one wins
  Supplier<List<SensorParserConfig>> sensorConfigs = () -> {
    List<SensorParserConfig> configs = new ArrayList<>();
    for (int parallelism : new int[] {20, 30}) {
      SensorParserConfig config = getBaseConfig();
      config.setParserParallelism(parallelism);
      configs.add(config);
    }
    return configs;
  };

  testConfigOption(ParserTopologyCLI.ParserOptions.PARSER_PARALLELISM,
      "10",
      input -> input.getParserParallelism().equals(10),
      sensorConfigs,
      input -> input.getParserParallelism().equals(30));
}
/**
 * Parser task count with a single sensor config: expects 10 from the command line, otherwise
 * the 20 set on the sensor config.
 *
 * <p>This body previously lived under {@code testParserNumTasksMultiple}; the two tests had
 * their bodies swapped relative to their names.
 */
@Test
public void testParserNumTasks() throws Exception {
  testConfigOption(ParserTopologyCLI.ParserOptions.PARSER_NUM_TASKS
                  , "10"
                  , input -> input.getParserNumTasks().equals(10)
                  , () -> {
                      SensorParserConfig config = getBaseConfig();
                      config.setParserNumTasks(20);
                      return Collections.singletonList(config);
                    }
                  , input -> input.getParserNumTasks().equals(20)
                  );
}

/**
 * Parser task count with two sensor configs (20 then 30): expects 10 from the command line,
 * otherwise 30.
 */
@Test
public void testParserNumTasksMultiple() throws Exception {
  // Last one wins
  testConfigOption(ParserTopologyCLI.ParserOptions.PARSER_NUM_TASKS
                  , "10"
                  , input -> input.getParserNumTasks().equals(10)
                  , () -> {
                      SensorParserConfig config = getBaseConfig();
                      config.setParserNumTasks(20);
                      SensorParserConfig config2 = getBaseConfig();
                      config2.setParserNumTasks(30);
                      List<SensorParserConfig> configs = new ArrayList<>();
                      configs.add(config);
                      configs.add(config2);
                      return configs;
                    }
                  , input -> input.getParserNumTasks().equals(30)
                  );
}
/**
 * Error writer parallelism: expects 10 when given on the command line, otherwise the 20 set on
 * the sensor config.
 */
@Test
public void testErrorParallelism() throws Exception {
  Predicate<ParserInput> cliValueUsed = input -> input.getErrorParallelism().equals(10);
  Predicate<ParserInput> configValueUsed = input -> input.getErrorParallelism().equals(20);
  Supplier<List<SensorParserConfig>> sensorConfigs = () -> {
    SensorParserConfig config = getBaseConfig();
    config.setErrorWriterParallelism(20);
    return Collections.singletonList(config);
  };

  testConfigOption(ParserTopologyCLI.ParserOptions.ERROR_WRITER_PARALLELISM,
      "10", cliValueUsed, sensorConfigs, configValueUsed);
}
/**
 * Error writer task count: expects 10 when given on the command line, otherwise the 20 set on
 * the sensor config.
 */
@Test
public void testErrorNumTasks() throws Exception {
  Predicate<ParserInput> cliValueUsed = input -> input.getErrorNumTasks().equals(10);
  Predicate<ParserInput> configValueUsed = input -> input.getErrorNumTasks().equals(20);
  Supplier<List<SensorParserConfig>> sensorConfigs = () -> {
    SensorParserConfig config = getBaseConfig();
    config.setErrorWriterNumTasks(20);
    return Collections.singletonList(config);
  };

  testConfigOption(ParserTopologyCLI.ParserOptions.ERROR_WRITER_NUM_TASKS,
      "10", cliValueUsed, sensorConfigs, configValueUsed);
}
/**
 * Security protocol: expects "PLAINTEXT" when given on the command line, otherwise the
 * "KERBEROS" set on the sensor config.
 */
@Test
public void testSecurityProtocol_fromCLI() throws Exception {
  Predicate<ParserInput> cliValueUsed = input -> input.getSecurityProtocol().equals("PLAINTEXT");
  Predicate<ParserInput> configValueUsed = input -> input.getSecurityProtocol().equals("KERBEROS");
  Supplier<List<SensorParserConfig>> sensorConfigs = () -> {
    SensorParserConfig config = getBaseConfig();
    config.setSecurityProtocol("KERBEROS");
    return Collections.singletonList(config);
  };

  testConfigOption(ParserTopologyCLI.ParserOptions.SECURITY_PROTOCOL,
      "PLAINTEXT", cliValueUsed, sensorConfigs, configValueUsed);
}
/**
 * Security protocol with two sensor configs that both say "PLAINTEXT": expects "PLAINTEXT"
 * with and without the command line option.
 */
@Test
public void testSecurityProtocol_fromCLIMultipleUniform() throws Exception {
  Predicate<ParserInput> plaintextUsed = input -> input.getSecurityProtocol().equals("PLAINTEXT");
  Supplier<List<SensorParserConfig>> sensorConfigs = () -> {
    List<SensorParserConfig> configs = new ArrayList<>();
    for (String protocol : new String[] {"PLAINTEXT", "PLAINTEXT"}) {
      SensorParserConfig config = getBaseConfig();
      config.setSecurityProtocol(protocol);
      configs.add(config);
    }
    return configs;
  };

  testConfigOption(ParserTopologyCLI.ParserOptions.SECURITY_PROTOCOL,
      "PLAINTEXT", plaintextUsed, sensorConfigs, plaintextUsed);
}
/**
 * Security protocol with three sensor configs (PLAINTEXT, KERBEROS, PLAINTEXT): expects
 * "PLAINTEXT" from the command line, otherwise "KERBEROS".
 */
@Test
public void testSecurityProtocol_fromCLIMultipleMixed() throws Exception {
  // Non plaintext wins
  Supplier<List<SensorParserConfig>> sensorConfigs = () -> {
    List<SensorParserConfig> configs = new ArrayList<>();
    for (String protocol : new String[] {"PLAINTEXT", "KERBEROS", "PLAINTEXT"}) {
      SensorParserConfig config = getBaseConfig();
      config.setSecurityProtocol(protocol);
      configs.add(config);
    }
    return configs;
  };

  testConfigOption(ParserTopologyCLI.ParserOptions.SECURITY_PROTOCOL,
      "PLAINTEXT",
      input -> input.getSecurityProtocol().equals("PLAINTEXT"),
      sensorConfigs,
      input -> input.getSecurityProtocol().equals("KERBEROS"));
}
/**
 * Exercises the security protocol precedence using a spout config file that sets
 * {@code security.protocol} to "PLAINTEXTSASL" and a sensor config that sets
 * "PLAINTEXTSASL_FROM_ZK".
 */
@Test
public void testSecurityProtocol_fromSpout() throws Exception {
  //Ultimately the order of precedence is CLI > spout config > parser config
  File spoutConfigFile = File.createTempFile("spoutConfig", "json");
  spoutConfigFile.deleteOnExit();
  Map<String, Object> spoutSettings = new HashMap<>();
  spoutSettings.put("security.protocol", "PLAINTEXTSASL");
  writeMap(spoutConfigFile, spoutSettings);

  Supplier<List<SensorParserConfig>> sensorConfigs = () -> {
    SensorParserConfig config = getBaseConfig();
    config.setSecurityProtocol("PLAINTEXTSASL_FROM_ZK");
    return Collections.singletonList(config);
  };
  Predicate<ParserInput> sensorConfigUsed =
      input -> input.getSecurityProtocol().equals("PLAINTEXTSASL_FROM_ZK");

  //Ensure that the CLI spout config takes precedence
  EnumMap<ParserTopologyCLI.ParserOptions, String> spoutFileAndProtocol =
      new EnumMap<>(ParserTopologyCLI.ParserOptions.class);
  spoutFileAndProtocol.put(ParserTopologyCLI.ParserOptions.SPOUT_CONFIG, spoutConfigFile.getAbsolutePath());
  spoutFileAndProtocol.put(ParserTopologyCLI.ParserOptions.SECURITY_PROTOCOL, "PLAINTEXT");
  testConfigOption(spoutFileAndProtocol,
      input -> input.getSecurityProtocol().equals("PLAINTEXT"),
      sensorConfigs,
      sensorConfigUsed);

  //Ensure that the spout config takes precedence
  EnumMap<ParserTopologyCLI.ParserOptions, String> spoutFileOnly =
      new EnumMap<>(ParserTopologyCLI.ParserOptions.class);
  spoutFileOnly.put(ParserTopologyCLI.ParserOptions.SPOUT_CONFIG, spoutConfigFile.getAbsolutePath());
  testConfigOption(spoutFileOnly,
      input -> input.getSecurityProtocol().equals("PLAINTEXTSASL"),
      sensorConfigs,
      sensorConfigUsed);
}
/**
 * Worker and acker counts: expects 10/20 when given on the command line, otherwise the 100/200
 * set through the sensor config's dedicated setters.
 */
@Test
public void testTopologyConfig_fromConfigExplicitly() throws Exception {
  EnumMap<ParserTopologyCLI.ParserOptions, String> cliOptions =
      new EnumMap<>(ParserTopologyCLI.ParserOptions.class);
  cliOptions.put(ParserTopologyCLI.ParserOptions.NUM_WORKERS, "10");
  cliOptions.put(ParserTopologyCLI.ParserOptions.NUM_ACKERS, "20");

  Predicate<ParserInput> cliValuesUsed = input -> {
    Config stormConf = input.getStormConf();
    return (int) stormConf.get(Config.TOPOLOGY_WORKERS) == 10
        && (int) stormConf.get(Config.TOPOLOGY_ACKER_EXECUTORS) == 20;
  };
  Predicate<ParserInput> configValuesUsed = input -> {
    Config stormConf = input.getStormConf();
    return (int) stormConf.get(Config.TOPOLOGY_WORKERS) == 100
        && (int) stormConf.get(Config.TOPOLOGY_ACKER_EXECUTORS) == 200;
  };
  Supplier<List<SensorParserConfig>> sensorConfigs = () -> {
    SensorParserConfig config = getBaseConfig();
    config.setNumWorkers(100);
    config.setNumAckers(200);
    return Collections.singletonList(config);
  };

  testConfigOption(cliOptions, cliValuesUsed, sensorConfigs, configValuesUsed);
}
/**
 * Storm configuration: with command line options, expects workers 10, ackers 20 and the debug
 * flag from the extra-options file; without them, expects the 100/200 from the sensor config's
 * storm config map and no debug key.
 */
@Test
public void testTopologyConfig() throws Exception {
  File topologyConfigFile = File.createTempFile("topologyConfig", "json");
  topologyConfigFile.deleteOnExit();
  Map<String, Object> fileSettings = new HashMap<>();
  fileSettings.put(Config.TOPOLOGY_DEBUG, true);
  writeMap(topologyConfigFile, fileSettings);

  EnumMap<ParserTopologyCLI.ParserOptions, String> cliOptions =
      new EnumMap<>(ParserTopologyCLI.ParserOptions.class);
  cliOptions.put(ParserTopologyCLI.ParserOptions.NUM_WORKERS, "10");
  cliOptions.put(ParserTopologyCLI.ParserOptions.NUM_ACKERS, "20");
  cliOptions.put(ParserTopologyCLI.ParserOptions.EXTRA_OPTIONS, topologyConfigFile.getAbsolutePath());

  Predicate<ParserInput> cliValuesUsed = input -> {
    Config stormConf = input.getStormConf();
    return (int) stormConf.get(Config.TOPOLOGY_WORKERS) == 10
        && (int) stormConf.get(Config.TOPOLOGY_ACKER_EXECUTORS) == 20
        && (boolean) stormConf.get(Config.TOPOLOGY_DEBUG);
  };
  Predicate<ParserInput> configValuesUsed = input -> {
    Config stormConf = input.getStormConf();
    return (int) stormConf.get(Config.TOPOLOGY_WORKERS) == 100
        && (int) stormConf.get(Config.TOPOLOGY_ACKER_EXECUTORS) == 200
        && !stormConf.containsKey(Config.TOPOLOGY_DEBUG);
  };
  Supplier<List<SensorParserConfig>> sensorConfigs = () -> {
    Map<String, Object> stormSettings = new HashMap<>();
    stormSettings.put(Config.TOPOLOGY_WORKERS, 100);
    stormSettings.put(Config.TOPOLOGY_ACKER_EXECUTORS, 200);
    SensorParserConfig config = getBaseConfig();
    config.setStormConfig(stormSettings);
    return Collections.singletonList(config);
  };

  testConfigOption(cliOptions, cliValuesUsed, sensorConfigs, configValuesUsed);
}
/**
 * Spout configuration: expects "from_file" for {@code extra_config} when a spout config file is
 * given on the command line, otherwise the "from_zk" set on the sensor config.
 */
@Test
public void testSpoutConfig() throws Exception {
  File spoutConfigFile = File.createTempFile("spoutConfig", "json");
  spoutConfigFile.deleteOnExit();
  Map<String, Object> fileSettings = new HashMap<>();
  fileSettings.put("extra_config", "from_file");
  writeMap(spoutConfigFile, fileSettings);

  EnumMap<ParserTopologyCLI.ParserOptions, String> cliOptions =
      new EnumMap<>(ParserTopologyCLI.ParserOptions.class);
  cliOptions.put(ParserTopologyCLI.ParserOptions.SPOUT_CONFIG, spoutConfigFile.getAbsolutePath());

  Predicate<ParserInput> cliOverrideExpected =
      input -> input.getSpoutConfig().get(0).get("extra_config").equals("from_file");
  Predicate<ParserInput> configOverrideExpected =
      input -> input.getSpoutConfig().get(0).get("extra_config").equals("from_zk");

  Supplier<List<SensorParserConfig>> configSupplier = () -> {
    Map<String, Object> zkSettings = new HashMap<>();
    zkSettings.put("extra_config", "from_zk");
    SensorParserConfig config = getBaseConfig();
    config.setSpoutConfig(zkSettings);
    return Collections.singletonList(config);
  };

  testConfigOption(cliOptions, cliOverrideExpected, configSupplier, configOverrideExpected);
}
/**
 * Serializes {@code config} to JSON and writes it to {@code outFile}.
 *
 * <p>The file is written as UTF-8 explicitly so that its bytes do not depend on the platform
 * default charset.
 *
 * @param outFile the file to write
 * @param config the map to serialize
 * @throws IOException if serialization or the write fails
 */
private void writeMap(File outFile, Map<String, Object> config) throws IOException {
  FileUtils.write(outFile, JSONUtils.INSTANCE.toJSON(config, true), "UTF-8");
}
/**
 * Single-option convenience form: wraps {@code option} and {@code cliOverride} in an option map
 * and delegates to the map-based overload.
 *
 * @param option the command line option under test
 * @param cliOverride the value passed for {@code option} on the command line
 * @param cliOverrideCondition must hold when the option is present on the command line
 * @param configSupplier produces the sensor parser configs
 * @param configOverrideCondition must hold when the option is absent from the command line
 */
private void testConfigOption( ParserTopologyCLI.ParserOptions option
                             , String cliOverride
                             , Predicate<ParserInput> cliOverrideCondition
                             , Supplier<List<SensorParserConfig>> configSupplier
                             , Predicate<ParserInput> configOverrideCondition
                             ) throws Exception {
  EnumMap<ParserTopologyCLI.ParserOptions, String> singleOption =
      new EnumMap<>(ParserTopologyCLI.ParserOptions.class);
  singleOption.put(option, cliOverride);
  testConfigOption(singleOption, cliOverrideCondition, configSupplier, configOverrideCondition);
}
/**
 * Evaluates the topology inputs twice against the same sensor configs: first with the mandatory
 * options plus {@code options} on the command line, then with the mandatory options alone.
 *
 * @param options the extra command line options for the first pass
 * @param cliOverrideCondition asserted on the first pass
 * @param configSupplier produces the sensor parser configs; invoked once
 * @param configOverrideCondition asserted on the second pass
 */
private void testConfigOption( EnumMap<ParserTopologyCLI.ParserOptions, String> options
                             , Predicate<ParserInput> cliOverrideCondition
                             , Supplier<List<SensorParserConfig>> configSupplier
                             , Predicate<ParserInput> configOverrideCondition
                             ) throws Exception {
  List<SensorParserConfig> sensorConfigs = configSupplier.get();
  // Every pass starts from a fresh builder holding only the three mandatory options.
  Supplier<CLIBuilder> mandatoryOnly = () -> new CLIBuilder()
      .with(ParserTopologyCLI.ParserOptions.BROKER_URL, "mybroker")
      .with(ParserTopologyCLI.ParserOptions.ZK_QUORUM, "myzk")
      .with(ParserTopologyCLI.ParserOptions.SENSOR_TYPES, "mysensor");

  //CLI Override
  CLIBuilder withOverrides = mandatoryOnly.get();
  options.forEach((option, value) -> withOverrides.with(option, value));
  ParserInput overriddenInput = getInput(withOverrides.build(true), sensorConfigs);
  assertTrue(cliOverrideCondition.test(overriddenInput));

  // Config Override
  ParserInput configInput = getInput(mandatoryOnly.get().build(true), sensorConfigs);
  assertTrue(configOverrideCondition.test(configInput));
}
/**
 * Captures the suppliers that {@link ParserTopologyCLI} passes to {@code getParserTopology} for
 * {@code cmd} and evaluates them against {@code configs}.
 *
 * <p>An anonymous subclass overrides {@code getParserTopology} so that it records a
 * {@link ParserInput} and returns {@code null}. If {@code createParserTopology} never calls
 * the override, this method returns {@code null}.
 *
 * @param cmd the parsed command line
 * @param configs the sensor parser configs the captured suppliers are evaluated against
 * @return the captured input, or {@code null} if the override was not invoked
 */
private static ParserInput getInput(CommandLine cmd, List<SensorParserConfig> configs ) throws Exception {
  // One-element array so the anonymous class below can write the result out.
  final ParserInput[] captured = new ParserInput[1];
  ParserTopologyCLI recordingCli = new ParserTopologyCLI() {
    @Override
    protected ParserTopologyBuilder.ParserTopology getParserTopology(
            String zookeeperUrl,
            Optional<String> brokerUrl,
            List<String> sensorType,
            ValueSupplier<List> spoutParallelism,
            ValueSupplier<List> spoutNumTasks,
            ValueSupplier<Integer> parserParallelism,
            ValueSupplier<Integer> parserNumTasks,
            ValueSupplier<Integer> errorParallelism,
            ValueSupplier<Integer> errorNumTasks,
            ValueSupplier<List> spoutConfig,
            ValueSupplier<String> securityProtocol,
            ValueSupplier<Config> stormConf,
            ValueSupplier<String> outputTopic,
            ValueSupplier<String> errorTopic) throws Exception {
      captured[0] = new ParserInput(
              spoutParallelism,
              spoutNumTasks,
              parserParallelism,
              parserNumTasks,
              errorParallelism,
              errorNumTasks,
              spoutConfig,
              securityProtocol,
              stormConf,
              outputTopic,
              errorTopic,
              configs
      );
      return null;
    }
  };
  recordingCli.createParserTopology(cmd);
  return captured[0];
}
}