| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.metron.enrichment.integration; |
| |
| import com.google.common.base.*; |
| import com.google.common.collect.Iterables; |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.metron.TestConstants; |
| import org.apache.metron.common.Constants; |
| import org.apache.metron.common.utils.JSONUtils; |
| import org.apache.metron.enrichment.adapters.maxmind.asn.GeoLiteAsnDatabase; |
| import org.apache.metron.enrichment.adapters.maxmind.geo.GeoLiteCityDatabase; |
| import org.apache.metron.enrichment.converter.EnrichmentHelper; |
| import org.apache.metron.enrichment.converter.EnrichmentKey; |
| import org.apache.metron.enrichment.converter.EnrichmentValue; |
| import org.apache.metron.enrichment.lookup.LookupKV; |
| import org.apache.metron.enrichment.lookup.accesstracker.PersistentBloomTrackerCreator; |
| import org.apache.metron.enrichment.stellar.SimpleHBaseEnrichmentFunctions; |
| import org.apache.metron.enrichment.utils.ThreatIntelUtils; |
| import org.apache.metron.hbase.mock.MockHBaseTableProvider; |
| import org.apache.metron.hbase.mock.MockHTable; |
| import org.apache.metron.integration.BaseIntegrationTest; |
| import org.apache.metron.integration.ComponentRunner; |
| import org.apache.metron.integration.ProcessorResult; |
| import org.apache.metron.integration.components.ConfigUploadComponent; |
| import org.apache.metron.integration.components.FluxTopologyComponent; |
| import org.apache.metron.integration.components.KafkaComponent; |
| import org.apache.metron.integration.components.ZKServerComponent; |
| import org.apache.metron.integration.processors.KafkaMessageSet; |
| import org.apache.metron.integration.processors.KafkaProcessor; |
| import org.apache.metron.integration.utils.TestUtils; |
| import org.apache.metron.test.utils.UnitTestHelper; |
| import org.json.simple.parser.ParseException; |
| import org.junit.jupiter.api.BeforeAll; |
| import org.junit.jupiter.api.Test; |
| |
| import javax.annotation.Nullable; |
| import java.io.File; |
| import java.io.IOException; |
| import java.nio.charset.StandardCharsets; |
| import java.util.Objects; |
| import java.util.*; |
| import java.util.stream.Stream; |
| |
| import static org.junit.jupiter.api.Assertions.*; |
| |
| /** |
| * Integration test for the enrichment topology. |
| */ |
| public class EnrichmentIntegrationTest extends BaseIntegrationTest { |
| |
| public static final String ERROR_TOPIC = "enrichment_error"; |
| public static final String SRC_IP = "ip_src_addr"; |
| public static final String DST_IP = "ip_dst_addr"; |
| public static final String MALICIOUS_IP_TYPE = "malicious_ip"; |
| public static final String PLAYFUL_CLASSIFICATION_TYPE = "playful_classification"; |
| public static final Map<String, Object> PLAYFUL_ENRICHMENT = new HashMap<String, Object>() {{ |
| put("orientation", "north"); |
| }}; |
| public static final String DEFAULT_COUNTRY = "test country"; |
| public static final String DEFAULT_CITY = "test city"; |
| public static final String DEFAULT_POSTAL_CODE = "test postalCode"; |
| public static final String DEFAULT_LATITUDE = "test latitude"; |
| public static final String DEFAULT_LONGITUDE = "test longitude"; |
| public static final String DEFAULT_DMACODE= "test dmaCode"; |
| public static final String DEFAULT_LOCATION_POINT= Joiner.on(',').join(DEFAULT_LATITUDE,DEFAULT_LONGITUDE); |
| public static final String cf = "cf"; |
| public static final String trackerHBaseTableName = "tracker"; |
| public static final String threatIntelTableName = "threat_intel"; |
| public static final String enrichmentsTableName = "enrichments"; |
| |
| protected String enrichmentConfigPath = "../" + TestConstants.SAMPLE_CONFIG_PATH; |
| protected String sampleParsedPath = "../" + TestConstants.SAMPLE_DATA_PARSED_PATH + "TestExampleParsed"; |
| private final List<byte[]> inputMessages = getInputMessages(sampleParsedPath); |
| |
| private static File geoHdfsFile; |
| private static File asnHdfsFile; |
| |
| private static List<byte[]> getInputMessages(String path){ |
| try { |
| List<byte[]> ret = TestUtils.readSampleData(path); |
| { |
| //we want one of the fields without a destination IP to ensure that enrichments can function |
| Map<String, Object> sansDestinationIp = JSONUtils.INSTANCE.load(new String(ret.get(ret.size() -1), |
| StandardCharsets.UTF_8) |
| , JSONUtils.MAP_SUPPLIER); |
| sansDestinationIp.remove(Constants.Fields.DST_ADDR.getName()); |
| ret.add(JSONUtils.INSTANCE.toJSONPretty(sansDestinationIp)); |
| } |
| return ret; |
| } catch(IOException ioe){ |
| return null; |
| } |
| } |
| |
| @BeforeAll |
| public static void setupOnce() throws ParseException { |
| String baseDir = UnitTestHelper.findDir(new File("../metron-enrichment-common"), "GeoLite"); |
| geoHdfsFile = new File(new File(baseDir), "GeoLite2-City.mmdb.gz"); |
| asnHdfsFile = new File(new File(baseDir), "GeoLite2-ASN.tar.gz"); |
| } |
| |
| /** |
| * @return The path to the topology properties template. |
| */ |
| public String getTemplatePath() { |
| return "src/main/config/enrichment.properties.j2"; |
| } |
| |
| /** |
| * @return The path to the flux file defining the topology. |
| */ |
| public String fluxPath() { |
| return "src/main/flux/enrichment/remote.yaml"; |
| } |
| |
| /** |
| * @return The topology properties. |
| */ |
| public Properties getTopologyProperties() { |
| return new Properties() {{ |
| |
| // storm |
| setProperty("enrichment_workers", "1"); |
| setProperty("enrichment_acker_executors", "0"); |
| setProperty("enrichment_topology_worker_childopts", ""); |
| setProperty("topology_auto_credentials", "[]"); |
| setProperty("enrichment_topology_max_spout_pending", "500"); |
| |
| // kafka - zookeeper_quorum, kafka_brokers set elsewhere |
| setProperty("kafka_security_protocol", "PLAINTEXT"); |
| setProperty("enrichment_kafka_start", "UNCOMMITTED_EARLIEST"); |
| setProperty("enrichment_input_topic", Constants.ENRICHMENT_TOPIC); |
| setProperty("enrichment_output_topic", Constants.INDEXING_TOPIC); |
| setProperty("enrichment_error_topic", ERROR_TOPIC); |
| setProperty("threatintel_error_topic", ERROR_TOPIC); |
| |
| // enrichment |
| setProperty("enrichment_hbase_provider_impl", "" + MockHBaseTableProvider.class.getName()); |
| setProperty("enrichment_hbase_table", enrichmentsTableName); |
| setProperty("enrichment_hbase_cf", cf); |
| setProperty("enrichment_host_known_hosts", "[{\"ip\":\"10.1.128.236\", \"local\":\"YES\", \"type\":\"webserver\", \"asset_value\" : \"important\"}," + |
| "{\"ip\":\"10.1.128.237\", \"local\":\"UNKNOWN\", \"type\":\"unknown\", \"asset_value\" : \"important\"}," + |
| "{\"ip\":\"10.60.10.254\", \"local\":\"YES\", \"type\":\"printer\", \"asset_value\" : \"important\"}," + |
| "{\"ip\":\"10.0.2.15\", \"local\":\"YES\", \"type\":\"printer\", \"asset_value\" : \"important\"}]"); |
| |
| // threat intel |
| setProperty("threatintel_hbase_table", threatIntelTableName); |
| setProperty("threatintel_hbase_cf", cf); |
| |
| // parallelism |
| setProperty("unified_kafka_spout_parallelism", "1"); |
| setProperty("unified_enrichment_parallelism", "1"); |
| setProperty("unified_threat_intel_parallelism", "1"); |
| setProperty("unified_kafka_writer_parallelism", "1"); |
| |
| // caches |
| setProperty("unified_enrichment_cache_size", "1000"); |
| setProperty("unified_threat_intel_cache_size", "1000"); |
| |
| // threads |
| setProperty("unified_enrichment_threadpool_size", "1"); |
| setProperty("unified_enrichment_threadpool_type", "FIXED"); |
| }}; |
| } |
| |
| |
  /**
   * End-to-end test of the enrichment topology: stands up ZooKeeper, Kafka, config upload,
   * and the flux topology; seeds mock HBase tables; writes the sample messages to the
   * enrichment topic; and validates both the enriched output and the error messages.
   */
  @Test
  public void test() throws Exception {

    final Properties topologyProperties = getTopologyProperties();
    final ZKServerComponent zkServerComponent = getZKServerComponent(topologyProperties);
    // Topics for input, successful output, and errors; single partition each.
    final KafkaComponent kafkaComponent = getKafkaComponent(topologyProperties, new ArrayList<KafkaComponent.Topic>() {{
      add(new KafkaComponent.Topic(Constants.ENRICHMENT_TOPIC, 1));
      add(new KafkaComponent.Topic(Constants.INDEXING_TOPIC, 1));
      add(new KafkaComponent.Topic(ERROR_TOPIC, 1));
    }});
    String globalConfigStr = null;
    {
      // Augment the sample global config so the topology uses the mock HBase provider,
      // the persistent bloom access tracker, and the local GeoLite test databases.
      File globalConfig = new File(enrichmentConfigPath, "global.json");
      Map<String, Object> config = JSONUtils.INSTANCE.load(globalConfig, JSONUtils.MAP_SUPPLIER);
      config.put(SimpleHBaseEnrichmentFunctions.TABLE_PROVIDER_TYPE_CONF, MockHBaseTableProvider.class.getName());
      config.put(SimpleHBaseEnrichmentFunctions.ACCESS_TRACKER_TYPE_CONF, "PERSISTENT_BLOOM");
      config.put(PersistentBloomTrackerCreator.Config.PERSISTENT_BLOOM_TABLE, trackerHBaseTableName);
      config.put(PersistentBloomTrackerCreator.Config.PERSISTENT_BLOOM_CF, cf);
      config.put(GeoLiteCityDatabase.GEO_HDFS_FILE, geoHdfsFile.getAbsolutePath());
      config.put(GeoLiteAsnDatabase.ASN_HDFS_FILE, asnHdfsFile.getAbsolutePath());
      globalConfigStr = JSONUtils.INSTANCE.toJSON(config, true);
    }
    ConfigUploadComponent configUploadComponent = new ConfigUploadComponent()
            .withTopologyProperties(topologyProperties)
            .withGlobalConfig(globalConfigStr)
            .withEnrichmentConfigsPath(enrichmentConfigPath);

    //create MockHBaseTables
    final MockHTable trackerTable = (MockHTable) MockHBaseTableProvider.addToCache(trackerHBaseTableName, cf);
    final MockHTable threatIntelTable = (MockHTable) MockHBaseTableProvider.addToCache(threatIntelTableName, cf);
    // Seed the threat intel table so 10.0.2.3 is flagged malicious.
    EnrichmentHelper.INSTANCE.load(threatIntelTable, cf, new ArrayList<LookupKV<EnrichmentKey, EnrichmentValue>>() {{
      add(new LookupKV<>(new EnrichmentKey(MALICIOUS_IP_TYPE, "10.0.2.3"), new EnrichmentValue(new HashMap<>())));
    }});
    final MockHTable enrichmentTable = (MockHTable) MockHBaseTableProvider.addToCache(enrichmentsTableName, cf);
    // Seed the enrichment table so 10.0.2.3 picks up the playful classification.
    EnrichmentHelper.INSTANCE.load(enrichmentTable, cf, new ArrayList<LookupKV<EnrichmentKey, EnrichmentValue>>() {{
      add(new LookupKV<>(new EnrichmentKey(PLAYFUL_CLASSIFICATION_TYPE, "10.0.2.3")
              , new EnrichmentValue(PLAYFUL_ENRICHMENT)
              )
      );
    }});

    FluxTopologyComponent fluxComponent = new FluxTopologyComponent.Builder()
            .withTopologyLocation(new File(fluxPath()))
            .withTopologyName("test")
            .withTemplateLocation(new File(getTemplatePath()))
            .withTopologyProperties(topologyProperties)
            .build();


    //UnitTestHelper.verboseLogging();
    // Shutdown order is the reverse of the dependency order: topology first, zookeeper last.
    ComponentRunner runner = new ComponentRunner.Builder()
            .withComponent("zk",zkServerComponent)
            .withComponent("kafka", kafkaComponent)
            .withComponent("config", configUploadComponent)
            .withComponent("storm", fluxComponent)
            .withMillisecondsBetweenAttempts(15000)
            .withCustomShutdownOrder(new String[]{"storm","config","kafka","zk"})
            .withNumRetries(10)
            .build();

    try {
      runner.start();
      fluxComponent.submitTopology();

      kafkaComponent.writeMessages(Constants.ENRICHMENT_TOPIC, inputMessages);
      // Each input message is expected to produce exactly one indexed doc AND one error
      // (the sample config contains an enrichment that divides by zero).
      ProcessorResult<Map<String, List<Map<String, Object>>>> result = runner.process(getProcessor());
      Map<String,List<Map<String, Object>>> outputMessages = result.getResult();
      List<Map<String, Object>> docs = outputMessages.get(Constants.INDEXING_TOPIC);
      assertEquals(inputMessages.size(), docs.size());
      validateAll(docs);
      List<Map<String, Object>> errors = outputMessages.get(ERROR_TOPIC);
      assertEquals(inputMessages.size(), errors.size());
      validateErrors(errors);
    } finally {
      runner.stop();
    }
  }
| |
| public void dumpParsedMessages(List<Map<String,Object>> outputMessages, StringBuffer buffer) { |
| for (Map<String,Object> map : outputMessages) { |
| for( String json : map.keySet()) { |
| buffer.append(json).append("\n"); |
| } |
| } |
| } |
| |
| public static void validateAll(List<Map<String, Object>> docs) { |
| |
| for (Map<String, Object> doc : docs) { |
| baseValidation(doc); |
| hostEnrichmentValidation(doc); |
| geoEnrichmentValidation(doc); |
| threatIntelValidation(doc); |
| simpleEnrichmentValidation(doc); |
| } |
| } |
| |
| protected void validateErrors(List<Map<String, Object>> errors) { |
| for(Map<String, Object> error : errors) { |
| assertTrue(error.get(Constants.ErrorFields.MESSAGE.getName()).toString().contains("/ by zero"), error.get(Constants.ErrorFields.MESSAGE.getName()).toString()); |
| assertTrue(error.get(Constants.ErrorFields.EXCEPTION.getName()).toString().contains("/ by zero")); |
| assertEquals(Constants.ErrorType.ENRICHMENT_ERROR.getType(), error.get(Constants.ErrorFields.ERROR_TYPE.getName())); |
| assertEquals("{\"error_test\":{},\"source.type\":\"test\"}", error.get(Constants.ErrorFields.RAW_MESSAGE.getName())); |
| } |
| } |
| |
| public static void baseValidation(Map<String, Object> jsonDoc) { |
| assertEnrichmentsExists("threatintels.", setOf("hbaseThreatIntel"), jsonDoc.keySet()); |
| assertEnrichmentsExists("enrichments.", setOf("geo", "host", "hbaseEnrichment" ), jsonDoc.keySet()); |
| |
| //ensure no values are empty |
| for(Map.Entry<String, Object> kv : jsonDoc.entrySet()) { |
| String actual = Objects.toString(kv.getValue(), ""); |
| assertTrue(StringUtils.isNotEmpty(actual), String.format("Value of '%s' is empty: '%s'", kv.getKey(), actual)); |
| } |
| |
| //ensure we always have a source ip and destination ip |
| assertNotNull(jsonDoc.get(SRC_IP)); |
| assertNotNull(jsonDoc.get("ALL_CAPS")); |
| assertNotNull(jsonDoc.get("map.blah")); |
| assertNull(jsonDoc.get("map")); |
| assertNotNull(jsonDoc.get("one")); |
| assertEquals(1, jsonDoc.get("one")); |
| assertEquals(1, jsonDoc.get("map.blah")); |
| assertNotNull(jsonDoc.get("foo")); |
| assertNotNull(jsonDoc.get("alt_src_type")); |
| assertEquals("test", jsonDoc.get("alt_src_type")); |
| assertEquals("TEST", jsonDoc.get("ALL_CAPS")); |
| assertNotNull(jsonDoc.get("bar")); |
| assertEquals("TEST", jsonDoc.get("bar")); |
| } |
| |
| private static class EvaluationPayload { |
| Map<String, Object> indexedDoc; |
| String key; |
| public EvaluationPayload(Map<String, Object> indexedDoc, String key) { |
| this.indexedDoc = indexedDoc; |
| this.key = key; |
| } |
| } |
| |
| private static enum HostEnrichments implements Predicate<EvaluationPayload>{ |
| |
| LOCAL_LOCATION(new Predicate<EvaluationPayload>() { |
| |
| @Override |
| public boolean apply(@Nullable EvaluationPayload evaluationPayload) { |
| |
| return evaluationPayload.indexedDoc.getOrDefault("enrichments.host." + evaluationPayload.key + ".known_info.local","").equals("YES"); |
| |
| } |
| }) |
| |
| ,UNKNOWN_LOCATION(new Predicate<EvaluationPayload>() { |
| |
| @Override |
| public boolean apply(@Nullable EvaluationPayload evaluationPayload) { |
| return evaluationPayload.indexedDoc.getOrDefault("enrichments.host." + evaluationPayload.key + ".known_info.local","").equals("UNKNOWN"); |
| } |
| }) |
| ,IMPORTANT(new Predicate<EvaluationPayload>() { |
| @Override |
| public boolean apply(@Nullable EvaluationPayload evaluationPayload) { |
| return evaluationPayload.indexedDoc.getOrDefault("enrichments.host." + evaluationPayload.key + ".known_info.asset_value","").equals("important"); |
| } |
| }) |
| ,PRINTER_TYPE(new Predicate<EvaluationPayload>() { |
| @Override |
| public boolean apply(@Nullable EvaluationPayload evaluationPayload) { |
| return evaluationPayload.indexedDoc.getOrDefault("enrichments.host." + evaluationPayload.key + ".known_info.type","").equals("printer"); |
| } |
| }) |
| ,WEBSERVER_TYPE(new Predicate<EvaluationPayload>() { |
| @Override |
| public boolean apply(@Nullable EvaluationPayload evaluationPayload) { |
| return evaluationPayload.indexedDoc.getOrDefault("enrichments.host." + evaluationPayload.key + ".known_info.type","").equals("webserver"); |
| } |
| }) |
| ,UNKNOWN_TYPE(new Predicate<EvaluationPayload>() { |
| @Override |
| public boolean apply(@Nullable EvaluationPayload evaluationPayload) { |
| return evaluationPayload.indexedDoc.getOrDefault("enrichments.host." + evaluationPayload.key + ".known_info.type","").equals("unknown"); |
| } |
| }) |
| ; |
| |
| Predicate<EvaluationPayload> _predicate; |
| HostEnrichments(Predicate<EvaluationPayload> predicate) { |
| this._predicate = predicate; |
| } |
| |
| @Override |
| public boolean apply(EvaluationPayload payload) { |
| return _predicate.apply(payload); |
| } |
| |
| } |
| |
| private static void assertEnrichmentsExists(String topLevel, Set<String> expectedEnrichments, Set<String> keys) { |
| for(String key : keys) { |
| if(key.startsWith(topLevel)) { |
| String secondLevel = Iterables.get(Splitter.on(".").split(key), 1); |
| String message = "Found an enrichment/threat intel (" + secondLevel + ") that I didn't expect (expected enrichments :" |
| + Joiner.on(",").join(expectedEnrichments) + "), but it was not there. If you've created a new" |
| + " enrichment, then please add a validation method to this unit test. Otherwise, it's a solid error" |
| + " and should be investigated."; |
| assertTrue(expectedEnrichments.contains(secondLevel), message); |
| } |
| } |
| } |
  /**
   * Validates the HBase-backed simple enrichment (the "playful classification" seeded for
   * 10.0.2.3) and the derived is_src_malicious flag.
   *
   * <p>Branch logic: when either the source or destination IP is 10.0.2.3 the hbaseEnrichment
   * keys must exist; the src/dst-specific classification fields are then checked depending on
   * which side matched. is_src_malicious is true only when the SOURCE IP is 10.0.2.3.
   *
   * @param indexedDoc A single enriched document.
   */
  private static void simpleEnrichmentValidation(Map<String, Object> indexedDoc) {
    if(indexedDoc.getOrDefault(SRC_IP,"").equals("10.0.2.3")
            || indexedDoc.getOrDefault(DST_IP,"").equals("10.0.2.3")
            ) {
      assertTrue(keyPatternExists("enrichments.hbaseEnrichment", indexedDoc));
      if(indexedDoc.getOrDefault(SRC_IP,"").equals("10.0.2.3")) {
        // source matched: expect src classification fields and the malicious flag set
        assertEquals(indexedDoc.get("enrichments.hbaseEnrichment." + SRC_IP + "." + PLAYFUL_CLASSIFICATION_TYPE+ ".orientation")
                , PLAYFUL_ENRICHMENT.get("orientation")
        );
        assertEquals(indexedDoc.get("src_classification.orientation")
                , PLAYFUL_ENRICHMENT.get("orientation"));
        assertEquals(indexedDoc.get("is_src_malicious")
                , true);
      }
      else if(indexedDoc.getOrDefault(DST_IP,"").equals("10.0.2.3")) {
        // only destination matched: expect dst classification fields
        assertEquals( indexedDoc.get("enrichments.hbaseEnrichment." + DST_IP + "." + PLAYFUL_CLASSIFICATION_TYPE + ".orientation")
                , PLAYFUL_ENRICHMENT.get("orientation")
        );
        assertEquals(indexedDoc.get("dst_classification.orientation")
                , PLAYFUL_ENRICHMENT.get("orientation"));

      }
      // regardless of which side matched above: if the SOURCE IP is not malicious,
      // is_src_malicious must be false (covers the dst-only-match case)
      if(!indexedDoc.getOrDefault(SRC_IP,"").equals("10.0.2.3")) {
        assertEquals(indexedDoc.get("is_src_malicious")
                , false);
      }
    }
    else {
      // neither side is the malicious IP
      assertEquals(indexedDoc.get("is_src_malicious")
              , false);
    }
  }
| private static void threatIntelValidation(Map<String, Object> indexedDoc) { |
| if(indexedDoc.getOrDefault(SRC_IP,"").equals("10.0.2.3") || |
| indexedDoc.getOrDefault(DST_IP,"").equals("10.0.2.3")) { |
| |
| //if we have any threat intel messages, we want to tag is_alert to true |
| assertTrue(keyPatternExists("threatintels.", indexedDoc)); |
| assertEquals(indexedDoc.getOrDefault("is_alert",""), "true"); |
| |
| // validate threat triage score |
| assertTrue(indexedDoc.containsKey(ThreatIntelUtils.THREAT_TRIAGE_SCORE_KEY)); |
| Double score = (Double) indexedDoc.get(ThreatIntelUtils.THREAT_TRIAGE_SCORE_KEY); |
| assertEquals(score, 10d, 1e-7); |
| |
| // validate threat triage rules |
| Joiner joiner = Joiner.on("."); |
| Stream.of( |
| joiner.join(ThreatIntelUtils.THREAT_TRIAGE_RULES_KEY, 0, ThreatIntelUtils.THREAT_TRIAGE_RULE_NAME), |
| joiner.join(ThreatIntelUtils.THREAT_TRIAGE_RULES_KEY, 0, ThreatIntelUtils.THREAT_TRIAGE_RULE_COMMENT), |
| joiner.join(ThreatIntelUtils.THREAT_TRIAGE_RULES_KEY, 0, ThreatIntelUtils.THREAT_TRIAGE_RULE_REASON), |
| joiner.join(ThreatIntelUtils.THREAT_TRIAGE_RULES_KEY, 0, ThreatIntelUtils.THREAT_TRIAGE_RULE_SCORE)) |
| .forEach(key -> |
| assertTrue(indexedDoc.containsKey(key), String.format("Missing expected key: '%s'", key))); |
| } |
| else { |
| //For YAF this is the case, but if we do snort later on, this will be invalid. |
| assertNull(indexedDoc.get("is_alert")); |
| assertFalse(keyPatternExists("threatintels.", indexedDoc)); |
| } |
| |
| //ip threat intels |
| if(keyPatternExists("threatintels.hbaseThreatIntel.", indexedDoc)) { |
| if(indexedDoc.getOrDefault(SRC_IP,"").equals("10.0.2.3")) { |
| assertEquals(indexedDoc.get("threatintels.hbaseThreatIntel." + SRC_IP + "." + MALICIOUS_IP_TYPE), "alert"); |
| } |
| else if(indexedDoc.getOrDefault(DST_IP,"").equals("10.0.2.3")) { |
| assertEquals(indexedDoc.get("threatintels.hbaseThreatIntel." + DST_IP + "." + MALICIOUS_IP_TYPE), "alert"); |
| } |
| else { |
| fail("There was a threat intels that I did not expect: " + indexedDoc); |
| } |
| } |
| |
| } |
| |
| private static void geoEnrichmentValidation(Map<String, Object> indexedDoc) { |
| // Need to check both separately. Local IPs will have no Geo entries |
| if(indexedDoc.containsKey("enrichments.geo." + DST_IP + ".location_point")) { |
| assertEquals(DEFAULT_LOCATION_POINT, indexedDoc.get("enrichments.geo." + DST_IP + ".location_point")); |
| assertEquals(DEFAULT_LONGITUDE, indexedDoc.get("enrichments.geo." + DST_IP + ".longitude")); |
| assertEquals(DEFAULT_CITY, indexedDoc.get("enrichments.geo." + DST_IP + ".city")); |
| assertEquals(DEFAULT_LATITUDE, indexedDoc.get("enrichments.geo." + DST_IP + ".latitude")); |
| assertEquals(DEFAULT_COUNTRY, indexedDoc.get("enrichments.geo." + DST_IP + ".country")); |
| assertEquals(DEFAULT_DMACODE, indexedDoc.get("enrichments.geo." + DST_IP + ".dmaCode")); |
| assertEquals(DEFAULT_POSTAL_CODE, indexedDoc.get("enrichments.geo." + DST_IP + ".postalCode")); |
| } |
| if(indexedDoc.containsKey("enrichments.geo." + SRC_IP + ".location_point")) { |
| assertEquals(DEFAULT_LOCATION_POINT, indexedDoc.get("enrichments.geo." + SRC_IP + ".location_point")); |
| assertEquals(DEFAULT_LONGITUDE, indexedDoc.get("enrichments.geo." + SRC_IP + ".longitude")); |
| assertEquals(DEFAULT_CITY, indexedDoc.get("enrichments.geo." + SRC_IP + ".city")); |
| assertEquals(DEFAULT_LATITUDE, indexedDoc.get("enrichments.geo." + SRC_IP + ".latitude")); |
| assertEquals(DEFAULT_COUNTRY, indexedDoc.get("enrichments.geo." + SRC_IP + ".country")); |
| assertEquals(DEFAULT_DMACODE, indexedDoc.get("enrichments.geo." + SRC_IP + ".dmaCode")); |
| assertEquals(DEFAULT_POSTAL_CODE, indexedDoc.get("enrichments.geo." + SRC_IP + ".postalCode")); |
| } |
| } |
| |
| private static void hostEnrichmentValidation(Map<String, Object> indexedDoc) { |
| boolean enriched = false; |
| //important local printers |
| { |
| Set<String> ips = setOf("10.0.2.15", "10.60.10.254"); |
| if (ips.contains(indexedDoc.get(SRC_IP))) { |
| //this is a local, important, printer |
| assertTrue(Predicates.and(HostEnrichments.LOCAL_LOCATION |
| ,HostEnrichments.IMPORTANT |
| ,HostEnrichments.PRINTER_TYPE |
| ).apply(new EvaluationPayload(indexedDoc, SRC_IP)) |
| ); |
| enriched = true; |
| } |
| if (ips.contains(indexedDoc.get(DST_IP))) { |
| boolean isEnriched = Predicates.and(HostEnrichments.LOCAL_LOCATION |
| ,HostEnrichments.IMPORTANT |
| ,HostEnrichments.PRINTER_TYPE |
| ).apply(new EvaluationPayload(indexedDoc, DST_IP)); |
| assertTrue(isEnriched); |
| enriched = true; |
| } |
| } |
| //important local webservers |
| { |
| Set<String> ips = setOf("10.1.128.236"); |
| if (ips.contains(indexedDoc.get(SRC_IP))) { |
| //this is a local, important, printer |
| assertTrue(Predicates.and(HostEnrichments.LOCAL_LOCATION |
| ,HostEnrichments.IMPORTANT |
| ,HostEnrichments.WEBSERVER_TYPE |
| ).apply(new EvaluationPayload(indexedDoc, SRC_IP)) |
| ); |
| enriched = true; |
| } |
| if (ips.contains(indexedDoc.get(DST_IP))) { |
| boolean isEnriched = Predicates.and(HostEnrichments.LOCAL_LOCATION |
| ,HostEnrichments.IMPORTANT |
| ,HostEnrichments.WEBSERVER_TYPE |
| ).apply(new EvaluationPayload(indexedDoc, DST_IP)); |
| assertTrue(isEnriched); |
| enriched = true; |
| } |
| } |
| if(!enriched) { |
| assertFalse(keyPatternExists("enrichments.host", indexedDoc)); |
| } |
| } |
| |
| |
| private static boolean keyPatternExists(String pattern, Map<String, Object> indexedObj) { |
| for(String k : indexedObj.keySet()) { |
| if(k.startsWith(pattern)) { |
| return true; |
| } |
| } |
| return false; |
| } |
| private static Set<String> setOf(String... items) { |
| Set<String> ret = new HashSet<>(); |
| for(String item : items) { |
| ret.add(item); |
| } |
| return ret; |
| } |
| |
| private static List<Map<String, Object>> loadMessages(List<byte[]> outputMessages) { |
| List<Map<String, Object>> tmp = new ArrayList<>(); |
| Iterables.addAll(tmp |
| , Iterables.transform(outputMessages |
| , message -> { |
| try { |
| return new HashMap<>(JSONUtils.INSTANCE.load(new String(message, |
| StandardCharsets.UTF_8) |
| , JSONUtils.MAP_SUPPLIER |
| ) |
| ); |
| } catch (Exception ex) { |
| throw new IllegalStateException(ex); |
| } |
| } |
| ) |
| ); |
| return tmp; |
| } |
| @SuppressWarnings("unchecked") |
| private KafkaProcessor<Map<String,List<Map<String, Object>>>> getProcessor(){ |
| |
| return new KafkaProcessor<>() |
| .withKafkaComponentName("kafka") |
| .withReadTopic(Constants.INDEXING_TOPIC) |
| .withErrorTopic(ERROR_TOPIC) |
| .withValidateReadMessages(new Function<KafkaMessageSet, Boolean>() { |
| @Nullable |
| @Override |
| public Boolean apply(@Nullable KafkaMessageSet messageSet) { |
| return (messageSet.getMessages().size() == inputMessages.size()) && (messageSet.getErrors().size() == inputMessages.size()); |
| } |
| }) |
| .withProvideResult(new Function<KafkaMessageSet,Map<String,List<Map<String, Object>>>>(){ |
| @Nullable |
| @Override |
| public Map<String,List<Map<String, Object>>> apply(@Nullable KafkaMessageSet messageSet) { |
| return new HashMap<String, List<Map<String, Object>>>() {{ |
| put(Constants.INDEXING_TOPIC, loadMessages(messageSet.getMessages())); |
| put(ERROR_TOPIC, loadMessages(messageSet.getErrors())); |
| }}; |
| } |
| }); |
| } |
| } |