blob: 9692be34ae93db2188431d153cdba9ea9616c34a [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.parsers.topology;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.apache.metron.common.Constants;
import org.apache.metron.common.configuration.ParserConfigurations;
import org.apache.metron.common.configuration.SensorParserConfig;
import org.apache.metron.common.writer.BulkMessageWriter;
import org.apache.metron.parsers.bolt.WriterHandler;
import org.apache.metron.writer.NoopWriter;
import org.apache.metron.writer.kafka.KafkaWriter;
import org.json.simple.JSONObject;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
public class ParserTopologyBuilderTest {
private static ParserConfigurations configs;
private static KafkaWriter kafkaWriter;
@BeforeAll
public static void setupAll() {
configs = mock(ParserConfigurations.class);
kafkaWriter = mock(KafkaWriter.class);
}
@Test
public void shouldCreateWriterConfig() {
SensorParserConfig broConfig = new SensorParserConfig();
broConfig.setSensorTopic("bro");
when(configs.getSensorParserConfig("bro")).thenReturn(broConfig);
Map<String, SensorParserConfig> sensorTypeToParserConfig = new HashMap<String, SensorParserConfig>() {{
put("bro", broConfig);
}};
Map<String, WriterHandler> writerConfigs = ParserTopologyBuilder
.createWriterConfigs("zookeeperUrl",
Optional.of("brokerUrl"),
sensorTypeToParserConfig,
Optional.of("securityProtocol"),
configs,
Optional.empty());
assertEquals(1, writerConfigs.size());
// Can't directly verify against mocks because this is all static. However, knowing that we have a KafkaWriter
// and the appropriate topic lets us know we've created the underlying config.
BulkMessageWriter writer = writerConfigs.get("bro").getBulkMessageWriter();
assertTrue(writer instanceof KafkaWriter);
assertEquals(Constants.ENRICHMENT_TOPIC, ((KafkaWriter) writer).getKafkaTopic(new JSONObject()).get());
}
@Test
public void shouldCreateWriterConfigWithSensorParserConfigOutputTopic() {
SensorParserConfig snortConfig = new SensorParserConfig();
snortConfig.setSensorTopic("snort");
snortConfig.setOutputTopic("snort_topic");
when(configs.getSensorParserConfig("snort")).thenReturn(snortConfig);
Map<String, SensorParserConfig> sensorTypeToParserConfig = new HashMap<String, SensorParserConfig>() {{
put("snort", snortConfig);
}};
Map<String, WriterHandler> writerConfigs = ParserTopologyBuilder
.createWriterConfigs("zookeeperUrl",
Optional.of("brokerUrl"),
sensorTypeToParserConfig,
Optional.of("securityProtocol"),
configs,
Optional.empty());
assertEquals(1, writerConfigs.size());
// Can't directly verify against mocks because this is all static. However, knowing that we have a KafkaWriter
// and the appropriate topic lets us know we've created the underlying config.
BulkMessageWriter writer = writerConfigs.get("snort").getBulkMessageWriter();
assertTrue(writer instanceof KafkaWriter);
assertEquals("snort_topic", ((KafkaWriter) writer).getKafkaTopic(new JSONObject()).get());
}
@Test
public void shouldCreateWriterConfigWithSuppliedOutputTopic() {
SensorParserConfig snortConfig = new SensorParserConfig();
snortConfig.setSensorTopic("snort");
when(configs.getSensorParserConfig("snort")).thenReturn(snortConfig);
Map<String, SensorParserConfig> sensorTypeToParserConfig = new HashMap<String, SensorParserConfig>() {{
put("snort", snortConfig);
}};
Map<String, WriterHandler> writerConfigs = ParserTopologyBuilder
.createWriterConfigs("zookeeperUrl",
Optional.of("brokerUrl"),
sensorTypeToParserConfig,
Optional.of("securityProtocol"),
configs,
Optional.of("supplied_topic"));
assertEquals(1, writerConfigs.size());
// Can't directly verify against mocks because this is all static. However, knowing that we have a KafkaWriter
// and the appropriate topic lets us know we've created the underlying config.
BulkMessageWriter writer = writerConfigs.get("snort").getBulkMessageWriter();
assertTrue(writer instanceof KafkaWriter);
assertEquals("supplied_topic", ((KafkaWriter) writer).getKafkaTopic(new JSONObject()).get());
}
@Test
public void shouldCreateWriterConfigWithWriterClassName() {
SensorParserConfig yafConfig = new SensorParserConfig();
yafConfig.setSensorTopic("yaf");
yafConfig.setWriterClassName("org.apache.metron.writer.NoopWriter");
when(configs.getSensorParserConfig("yaf")).thenReturn(yafConfig);
Map<String, SensorParserConfig> sensorTypeToParserConfig = new HashMap<String, SensorParserConfig>() {{
put("yaf", yafConfig);
}};
Map<String, WriterHandler> writerConfigs = ParserTopologyBuilder
.createWriterConfigs("zookeeperUrl",
Optional.of("brokerUrl"),
sensorTypeToParserConfig,
Optional.of("securityProtocol"),
configs,
Optional.empty());
assertEquals(1, writerConfigs.size());
assertTrue(writerConfigs.get("yaf").getBulkMessageWriter() instanceof NoopWriter);
}
}