blob: aeafede485f994160da1b2c82a7451c234e15dfb [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.enrichment.bolt;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import com.fasterxml.jackson.databind.JsonMappingException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import org.adrianwalker.multilinestring.Multiline;
import org.apache.metron.common.configuration.enrichment.SensorEnrichmentConfig;
import org.apache.metron.common.configuration.enrichment.threatintel.ThreatTriageConfig;
import org.apache.metron.storm.common.message.MessageGetStrategy;
import org.apache.metron.common.utils.JSONUtils;
import org.apache.metron.enrichment.adapters.maxmind.asn.GeoLiteAsnDatabase;
import org.apache.metron.enrichment.adapters.maxmind.geo.GeoLiteCityDatabase;
import org.apache.metron.test.bolt.BaseEnrichmentBoltTest;
import org.apache.metron.test.utils.UnitTestHelper;
import org.apache.storm.tuple.Tuple;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;
import org.json.simple.parser.ParseException;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class ThreatIntelJoinBoltTest extends BaseEnrichmentBoltTest {
private static final String enrichmentConfigPath = "../" + sampleSensorEnrichmentConfigPath;
/**
* {
* "field1": "value1",
* "enrichedField1": "enrichedValue1",
* "source.type": "test"
* }
*/
@Multiline
private String messageString;
/**
* {
* "field1": "value1",
* "enrichedField1": "enrichedValue1",
* "source.type": "test",
* "threatintels.field.end.ts": "timing"
* }
*/
@Multiline
private String messageWithTimingString;
/**
* {
* "field1": "value1",
* "enrichedField1": "enrichedValue1",
* "source.type": "test",
* "threatintels.field": "threatIntelValue"
* }
*/
@Multiline
private String alertMessageString;
private JSONObject message;
private JSONObject messageWithTiming;
private JSONObject alertMessage;
@Before
public void parseMessages() throws ParseException {
JSONParser parser = new JSONParser();
message = (JSONObject) parser.parse(messageString);
messageWithTiming = (JSONObject) parser.parse(messageWithTimingString);
alertMessage = (JSONObject) parser.parse(alertMessageString);
}
/**
* {
* "riskLevelRules" : [
* {
* "rule" : "enrichedField1 == 'enrichedValue1'",
* "score" : 10
* }
* ],
* "aggregator" : "MAX"
* }
*/
@Multiline
private static String testWithTriageConfig;
@Test
public void testWithTriage() throws IOException {
test(testWithTriageConfig, false);
}
/**
* {
* "riskLevelRules" : [
* {
* "rule" : "enrichedField1 == 'enrichedValue1",
* "score" : 10
* }
* ],
* "aggregator" : "MAX"
* }
*/
@Multiline
private static String testWithBadTriageRuleConfig;
@Test
public void testWithBadTriageRule() throws IOException {
test(testWithBadTriageRuleConfig, true);
}
@Test
public void testWithoutTriage() throws IOException {
test(null, false);
}
/**
* {
* "riskLevelRules": [
* {
* "rule" : "not(IN_SUBNET(ip_dst_addr, '192.168.0.0/24'))",
* "score" : 10
* }
* ],
* "aggregator": "MAX"
* }
*/
@Multiline
private static String testWithStellarFunctionConfig;
@Test
public void testWithStellarFunction() throws IOException {
test(testWithStellarFunctionConfig, false);
}
public void test(String threatTriageConfig, boolean badConfig) throws IOException {
ThreatIntelJoinBolt threatIntelJoinBolt = new ThreatIntelJoinBolt("zookeeperUrl");
threatIntelJoinBolt.setCuratorFramework(client);
threatIntelJoinBolt.setZKCache(cache);
SensorEnrichmentConfig enrichmentConfig = JSONUtils.INSTANCE.load(
new FileInputStream(enrichmentConfigPath), SensorEnrichmentConfig.class);
boolean withThreatTriage = threatTriageConfig != null;
if (withThreatTriage) {
try {
enrichmentConfig.getThreatIntel().setTriageConfig(JSONUtils.INSTANCE.load(threatTriageConfig, ThreatTriageConfig.class));
if (badConfig) {
Assert.fail(threatTriageConfig + "\nThis should not parse!");
}
} catch (JsonMappingException pe) {
if (!badConfig) {
throw pe;
}
}
}
threatIntelJoinBolt.getConfigurations().updateSensorEnrichmentConfig(sensorType, enrichmentConfig);
HashMap<String, Object> globalConfig = new HashMap<>();
String baseDir = UnitTestHelper.findDir(new File("../metron-enrichment-common"), "GeoLite");
File geoHdfsFile = new File(new File(baseDir), "GeoLite2-City.mmdb.gz");
globalConfig.put(GeoLiteCityDatabase.GEO_HDFS_FILE, geoHdfsFile.getAbsolutePath());
File asnHdfsFile = new File(new File(baseDir), "GeoLite2-ASN.tar.gz");
globalConfig.put(GeoLiteAsnDatabase.ASN_HDFS_FILE, asnHdfsFile.getAbsolutePath());
threatIntelJoinBolt.getConfigurations().updateGlobalConfig(globalConfig);
threatIntelJoinBolt.withMaxCacheSize(100);
threatIntelJoinBolt.withMaxTimeRetain(10000);
threatIntelJoinBolt.prepare(new HashMap<>(), topologyContext, outputCollector);
Map<String, Object> fieldMap = threatIntelJoinBolt.getFieldMap("incorrectSourceType");
Assert.assertNull(fieldMap);
fieldMap = threatIntelJoinBolt.getFieldMap(sensorType);
Assert.assertTrue(fieldMap.containsKey("hbaseThreatIntel"));
MessageGetStrategy messageGetStrategy = mock(MessageGetStrategy.class);
Tuple messageTuple = mock(Tuple.class);
when(messageGetStrategy.get(messageTuple)).thenReturn(message);
Map<String, Tuple> streamMessageMap = new HashMap<>();
streamMessageMap.put("message", messageTuple);
JSONObject joinedMessage = threatIntelJoinBolt.joinMessages(streamMessageMap, messageGetStrategy);
assertFalse(joinedMessage.containsKey("is_alert"));
when(messageGetStrategy.get(messageTuple)).thenReturn(messageWithTiming);
joinedMessage = threatIntelJoinBolt.joinMessages(streamMessageMap, messageGetStrategy);
assertFalse(joinedMessage.containsKey("is_alert"));
when(messageGetStrategy.get(messageTuple)).thenReturn(alertMessage);
joinedMessage = threatIntelJoinBolt.joinMessages(streamMessageMap, messageGetStrategy);
assertTrue(joinedMessage.containsKey("is_alert") && "true".equals(joinedMessage.get("is_alert")));
if(withThreatTriage && !badConfig) {
assertTrue(joinedMessage.containsKey("threat.triage.score"));
Double score = (Double) joinedMessage.get("threat.triage.score");
assertTrue(Math.abs(10d - score) < 1e-10);
}
else {
assertFalse(joinedMessage.containsKey("threat.triage.score"));
}
}
}