| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.metron.integration; |
| |
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.*;
import com.google.common.collect.Iterables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.metron.TestConstants;
import org.apache.metron.common.Constants;
import org.apache.metron.common.configuration.Configurations;
import org.apache.metron.common.configuration.EnrichmentConfigurations;
import org.apache.metron.common.interfaces.FieldNameConverter;
import org.apache.metron.common.utils.JSONUtils;
import org.apache.metron.enrichment.converter.EnrichmentHelper;
import org.apache.metron.enrichment.converter.EnrichmentKey;
import org.apache.metron.enrichment.converter.EnrichmentValue;
import org.apache.metron.enrichment.lookup.LookupKV;
import org.apache.metron.hbase.TableProvider;
import org.apache.metron.integration.components.ConfigUploadComponent;
import org.apache.metron.integration.components.FluxTopologyComponent;
import org.apache.metron.integration.components.KafkaWithZKComponent;
import org.apache.metron.integration.mock.MockGeoAdapter;
import org.apache.metron.integration.utils.SampleUtil;
import org.apache.metron.integration.utils.TestUtils;
import org.apache.metron.test.mock.MockHTable;
import org.apache.metron.test.utils.UnitTestHelper;
import org.junit.Assert;
import org.junit.Test;

import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.Stack;
| |
| public abstract class EnrichmentIntegrationTest extends BaseIntegrationTest { |
| |
| private static final String SRC_IP = "ip_src_addr"; |
| private static final String DST_IP = "ip_dst_addr"; |
| private static final String MALICIOUS_IP_TYPE = "malicious_ip"; |
| private static final String PLAYFUL_CLASSIFICATION_TYPE = "playful_classification"; |
| private static final Map<String, String> PLAYFUL_ENRICHMENT = new HashMap<String, String>() {{ |
| put("orientation", "north"); |
| }}; |
| protected String testSensorType = "test"; |
| protected String hdfsDir = "target/enrichmentIntegrationTest/hdfs"; |
| protected String fluxPath = "../metron-enrichment/src/main/flux/enrichment/test.yaml"; |
| protected String sampleParsedPath = TestConstants.SAMPLE_DATA_PARSED_PATH + "TestExampleParsed"; |
| private String sampleIndexedPath = TestConstants.SAMPLE_DATA_INDEXED_PATH + "TestIndexed"; |
| |
| public static class Provider implements TableProvider, Serializable { |
| MockHTable.Provider provider = new MockHTable.Provider(); |
| @Override |
| public HTableInterface getTable(Configuration config, String tableName) throws IOException { |
| return provider.getTable(config, tableName); |
| } |
| } |
| |
| public static void cleanHdfsDir(String hdfsDirStr) { |
| File hdfsDir = new File(hdfsDirStr); |
| Stack<File> fs = new Stack<>(); |
| if(hdfsDir.exists()) { |
| fs.push(hdfsDir); |
| while(!fs.empty()) { |
| File f = fs.pop(); |
| if (f.isDirectory()) { |
| for(File child : f.listFiles()) { |
| fs.push(child); |
| } |
| } |
| else { |
| if (f.getName().startsWith("enrichment") || f.getName().endsWith(".json")) { |
| f.delete(); |
| } |
| } |
| } |
| } |
| } |
| |
| public static List<Map<String, Object> > readDocsFromDisk(String hdfsDirStr) throws IOException { |
| List<Map<String, Object>> ret = new ArrayList<>(); |
| File hdfsDir = new File(hdfsDirStr); |
| Stack<File> fs = new Stack<>(); |
| if(hdfsDir.exists()) { |
| fs.push(hdfsDir); |
| while(!fs.empty()) { |
| File f = fs.pop(); |
| if(f.isDirectory()) { |
| for (File child : f.listFiles()) { |
| fs.push(child); |
| } |
| } |
| else { |
| System.out.println("Processed " + f); |
| if (f.getName().startsWith("enrichment") || f.getName().endsWith(".json")) { |
| List<byte[]> data = TestUtils.readSampleData(f.getPath()); |
| Iterables.addAll(ret, Iterables.transform(data, new Function<byte[], Map<String, Object>>() { |
| @Nullable |
| @Override |
| public Map<String, Object> apply(@Nullable byte[] bytes) { |
| String s = new String(bytes); |
| try { |
| return JSONUtils.INSTANCE.load(s, new TypeReference<Map<String, Object>>() { |
| }); |
| } catch (IOException e) { |
| throw new RuntimeException(e); |
| } |
| } |
| })); |
| } |
| } |
| } |
| } |
| return ret; |
| } |
| |
| |
| @Test |
| public void test() throws Exception { |
| cleanHdfsDir(hdfsDir); |
| final EnrichmentConfigurations configurations = SampleUtil.getSampleEnrichmentConfigs(); |
| final String dateFormat = "yyyy.MM.dd.HH"; |
| final List<byte[]> inputMessages = TestUtils.readSampleData(sampleParsedPath); |
| final String cf = "cf"; |
| final String trackerHBaseTableName = "tracker"; |
| final String threatIntelTableName = "threat_intel"; |
| final String enrichmentsTableName = "enrichments"; |
| final Properties topologyProperties = new Properties() {{ |
| setProperty("org.apache.metron.enrichment.host.known_hosts", "[{\"ip\":\"10.1.128.236\", \"local\":\"YES\", \"type\":\"webserver\", \"asset_value\" : \"important\"},\n" + |
| "{\"ip\":\"10.1.128.237\", \"local\":\"UNKNOWN\", \"type\":\"unknown\", \"asset_value\" : \"important\"},\n" + |
| "{\"ip\":\"10.60.10.254\", \"local\":\"YES\", \"type\":\"printer\", \"asset_value\" : \"important\"},\n" + |
| "{\"ip\":\"10.0.2.15\", \"local\":\"YES\", \"type\":\"printer\", \"asset_value\" : \"important\"}]"); |
| setProperty("hbase.provider.impl","" + Provider.class.getName()); |
| setProperty("threat.intel.tracker.table", trackerHBaseTableName); |
| setProperty("threat.intel.tracker.cf", cf); |
| setProperty("threat.intel.simple.hbase.table", threatIntelTableName); |
| setProperty("threat.intel.simple.hbase.cf", cf); |
| setProperty("enrichment.simple.hbase.table", enrichmentsTableName); |
| setProperty("enrichment.simple.hbase.cf", cf); |
| setProperty("es.clustername", "metron"); |
| setProperty("es.port", "9300"); |
| setProperty("es.ip", "localhost"); |
| setProperty("index.date.format", dateFormat); |
| setProperty("index.hdfs.output", hdfsDir); |
| }}; |
| setAdditionalProperties(topologyProperties); |
| final KafkaWithZKComponent kafkaComponent = getKafkaComponent(topologyProperties, new ArrayList<KafkaWithZKComponent.Topic>() {{ |
| add(new KafkaWithZKComponent.Topic(Constants.ENRICHMENT_TOPIC, 1)); |
| }}); |
| |
| ConfigUploadComponent configUploadComponent = new ConfigUploadComponent() |
| .withTopologyProperties(topologyProperties) |
| .withGlobalConfigsPath(TestConstants.SAMPLE_CONFIG_PATH) |
| .withEnrichmentConfigsPath(TestConstants.SAMPLE_CONFIG_PATH); |
| |
| //create MockHBaseTables |
| final MockHTable trackerTable = (MockHTable)MockHTable.Provider.addToCache(trackerHBaseTableName, cf); |
| final MockHTable threatIntelTable = (MockHTable)MockHTable.Provider.addToCache(threatIntelTableName, cf); |
| EnrichmentHelper.INSTANCE.load(threatIntelTable, cf, new ArrayList<LookupKV<EnrichmentKey, EnrichmentValue>>(){{ |
| add(new LookupKV<>(new EnrichmentKey(MALICIOUS_IP_TYPE, "10.0.2.3"), new EnrichmentValue(new HashMap<String, String>()))); |
| }}); |
| final MockHTable enrichmentTable = (MockHTable)MockHTable.Provider.addToCache(enrichmentsTableName, cf); |
| EnrichmentHelper.INSTANCE.load(enrichmentTable, cf, new ArrayList<LookupKV<EnrichmentKey, EnrichmentValue>>(){{ |
| add(new LookupKV<>(new EnrichmentKey(PLAYFUL_CLASSIFICATION_TYPE, "10.0.2.3") |
| , new EnrichmentValue(PLAYFUL_ENRICHMENT ) |
| ) |
| ); |
| }}); |
| FluxTopologyComponent fluxComponent = new FluxTopologyComponent.Builder() |
| .withTopologyLocation(new File(fluxPath)) |
| .withTopologyName("test") |
| .withTopologyProperties(topologyProperties) |
| .build(); |
| |
| InMemoryComponent searchComponent = getSearchComponent(topologyProperties); |
| |
| UnitTestHelper.verboseLogging(); |
| ComponentRunner runner = new ComponentRunner.Builder() |
| .withComponent("kafka", kafkaComponent) |
| .withComponent("config", configUploadComponent) |
| .withComponent("search", searchComponent) |
| .withComponent("storm", fluxComponent) |
| .withMillisecondsBetweenAttempts(10000) |
| .withNumRetries(10) |
| .build(); |
| runner.start(); |
| |
| try { |
| fluxComponent.submitTopology(); |
| |
| kafkaComponent.writeMessages(Constants.ENRICHMENT_TOPIC, inputMessages); |
| List<Map<String, Object>> docs = runner.process(getProcessor(inputMessages)); |
| Assert.assertEquals(inputMessages.size(), docs.size()); |
| List<Map<String, Object>> cleanedDocs = cleanDocs(docs); |
| validateAll(cleanedDocs, getFieldNameConverter()); |
| |
| |
| List<Map<String, Object>> docsFromDisk = readDocsFromDisk(hdfsDir); |
| Assert.assertEquals(docsFromDisk.size(), docs.size()) ; |
| Assert.assertEquals(new File(hdfsDir).list().length, 1); |
| Assert.assertEquals(new File(hdfsDir).list()[0], testSensorType); |
| //we want the identity transformation when dealing with docs on disk. |
| validateAll(docsFromDisk, fieldName -> fieldName); |
| } |
| finally { |
| cleanHdfsDir(hdfsDir); |
| runner.stop(); |
| } |
| } |
| |
| public List<Map<String, Object>> cleanDocs(List<Map<String, Object>> docs) { |
| List<Map<String, Object>> cleanedDocs = new ArrayList<>(); |
| for(Map<String, Object> doc: docs) { |
| Map<String, Object> cleanedFields = new HashMap<>(); |
| for(String field: doc.keySet()) { |
| cleanedFields.put(cleanField(field), doc.get(field)); |
| } |
| cleanedDocs.add(cleanedFields); |
| } |
| return cleanedDocs; |
| } |
| |
| public static void validateAll(List<Map<String, Object>> docs, FieldNameConverter fnc) { |
| |
| for (Map<String, Object> doc : docs) { |
| baseValidation(doc, fnc); |
| hostEnrichmentValidation(doc, fnc); |
| geoEnrichmentValidation(doc, fnc); |
| threatIntelValidation(doc, fnc); |
| simpleEnrichmentValidation(doc, fnc); |
| } |
| } |
| |
| public static void baseValidation(Map<String, Object> jsonDoc, FieldNameConverter fnc) { |
| assertEnrichmentsExists("threatintels.", setOf("hbaseThreatIntel"), jsonDoc.keySet()); |
| assertEnrichmentsExists("enrichments.", setOf("geo", "host", "hbaseEnrichment" ), jsonDoc.keySet()); |
| for(Map.Entry<String, Object> kv : jsonDoc.entrySet()) { |
| //ensure no values are empty. |
| Assert.assertTrue(kv.getValue().toString().length() > 0); |
| } |
| //ensure we always have a source ip and destination ip |
| Assert.assertNotNull(jsonDoc.get(SRC_IP)); |
| Assert.assertNotNull(jsonDoc.get(DST_IP)); |
| } |
| |
| private static class EvaluationPayload { |
| Map<String, Object> indexedDoc; |
| String key; |
| FieldNameConverter fnc; |
| public EvaluationPayload(Map<String, Object> indexedDoc, String key, FieldNameConverter fnc) { |
| this.indexedDoc = indexedDoc; |
| this.key = key; |
| this.fnc = fnc; |
| } |
| } |
| |
| private static enum HostEnrichments implements Predicate<EvaluationPayload>{ |
| |
| LOCAL_LOCATION(new Predicate<EvaluationPayload>() { |
| |
| @Override |
| public boolean apply(@Nullable EvaluationPayload evaluationPayload) { |
| |
| return evaluationPayload.indexedDoc.getOrDefault(evaluationPayload.fnc.convert("enrichments.host." + evaluationPayload.key + ".known_info.local"),"").equals("YES"); |
| |
| } |
| }) |
| |
| ,UNKNOWN_LOCATION(new Predicate<EvaluationPayload>() { |
| |
| @Override |
| public boolean apply(@Nullable EvaluationPayload evaluationPayload) { |
| return evaluationPayload.indexedDoc.getOrDefault(evaluationPayload.fnc.convert("enrichments.host." + evaluationPayload.key + ".known_info.local"),"").equals("UNKNOWN"); |
| } |
| }) |
| ,IMPORTANT(new Predicate<EvaluationPayload>() { |
| @Override |
| public boolean apply(@Nullable EvaluationPayload evaluationPayload) { |
| return evaluationPayload.indexedDoc.getOrDefault(evaluationPayload.fnc.convert("enrichments.host." + evaluationPayload.key + ".known_info.asset_value"),"").equals("important"); |
| } |
| }) |
| ,PRINTER_TYPE(new Predicate<EvaluationPayload>() { |
| @Override |
| public boolean apply(@Nullable EvaluationPayload evaluationPayload) { |
| return evaluationPayload.indexedDoc.getOrDefault(evaluationPayload.fnc.convert("enrichments.host." + evaluationPayload.key + ".known_info.type"),"").equals("printer"); |
| } |
| }) |
| ,WEBSERVER_TYPE(new Predicate<EvaluationPayload>() { |
| @Override |
| public boolean apply(@Nullable EvaluationPayload evaluationPayload) { |
| return evaluationPayload.indexedDoc.getOrDefault(evaluationPayload.fnc.convert("enrichments.host." + evaluationPayload.key + ".known_info.type"),"").equals("webserver"); |
| } |
| }) |
| ,UNKNOWN_TYPE(new Predicate<EvaluationPayload>() { |
| @Override |
| public boolean apply(@Nullable EvaluationPayload evaluationPayload) { |
| return evaluationPayload.indexedDoc.getOrDefault(evaluationPayload.fnc.convert("enrichments.host." + evaluationPayload.key + ".known_info.type"),"").equals("unknown"); |
| } |
| }) |
| ; |
| |
| Predicate<EvaluationPayload> _predicate; |
| HostEnrichments(Predicate<EvaluationPayload> predicate) { |
| this._predicate = predicate; |
| } |
| |
| public boolean apply(EvaluationPayload payload) { |
| return _predicate.apply(payload); |
| } |
| |
| } |
| |
| private static void assertEnrichmentsExists(String topLevel, Set<String> expectedEnrichments, Set<String> keys) { |
| for(String key : keys) { |
| if(key.startsWith(topLevel)) { |
| String secondLevel = Iterables.get(Splitter.on(".").split(key), 1); |
| String message = "Found an enrichment/threat intel (" + secondLevel + ") that I didn't expect (expected enrichments :" |
| + Joiner.on(",").join(expectedEnrichments) + "), but it was not there. If you've created a new" |
| + " enrichment, then please add a validation method to this unit test. Otherwise, it's a solid error" |
| + " and should be investigated."; |
| Assert.assertTrue( message, expectedEnrichments.contains(secondLevel)); |
| } |
| } |
| } |
| private static void simpleEnrichmentValidation(Map<String, Object> indexedDoc, FieldNameConverter fnc) { |
| if(indexedDoc.getOrDefault(fnc.convert(SRC_IP),"").equals("10.0.2.3") |
| || indexedDoc.getOrDefault(fnc.convert(DST_IP),"").equals("10.0.2.3") |
| ) { |
| Assert.assertTrue(keyPatternExists(fnc.convert("enrichments.hbaseEnrichment"), indexedDoc)); |
| if(indexedDoc.getOrDefault(fnc.convert(SRC_IP),"").equals("10.0.2.3")) { |
| Assert.assertEquals(indexedDoc.get(fnc.convert("enrichments.hbaseEnrichment." + SRC_IP + "." + PLAYFUL_CLASSIFICATION_TYPE+ ".orientation")) |
| , PLAYFUL_ENRICHMENT.get("orientation") |
| ); |
| } |
| else if(indexedDoc.getOrDefault(fnc.convert(DST_IP),"").equals("10.0.2.3")) { |
| Assert.assertEquals( indexedDoc.get(fnc.convert("enrichments.hbaseEnrichment." + DST_IP + "." + PLAYFUL_CLASSIFICATION_TYPE + ".orientation")) |
| , PLAYFUL_ENRICHMENT.get("orientation") |
| ); |
| } |
| } |
| |
| } |
| private static void threatIntelValidation(Map<String, Object> indexedDoc, FieldNameConverter fnc) { |
| if(indexedDoc.getOrDefault(fnc.convert(SRC_IP),"").equals("10.0.2.3") |
| || indexedDoc.getOrDefault(fnc.convert(DST_IP),"").equals("10.0.2.3") |
| ) { |
| //if we have any threat intel messages, we want to tag is_alert to true |
| Assert.assertTrue(keyPatternExists(fnc.convert("threatintels."), indexedDoc)); |
| Assert.assertTrue(indexedDoc.containsKey(fnc.convert("threat.triage.level"))); |
| Assert.assertEquals(indexedDoc.getOrDefault(fnc.convert("is_alert"),""), "true"); |
| Assert.assertEquals((double)indexedDoc.get(fnc.convert("threat.triage.level")), 10d, 1e-7); |
| } |
| else { |
| //For YAF this is the case, but if we do snort later on, this will be invalid. |
| Assert.assertNull(indexedDoc.get(fnc.convert("is_alert"))); |
| Assert.assertFalse(keyPatternExists(fnc.convert("threatintels."), indexedDoc)); |
| } |
| //ip threat intels |
| if(keyPatternExists(fnc.convert("threatintels.hbaseThreatIntel."), indexedDoc)) { |
| if(indexedDoc.getOrDefault(fnc.convert(SRC_IP),"").equals("10.0.2.3")) { |
| Assert.assertEquals(indexedDoc.get(fnc.convert("threatintels.hbaseThreatIntel." + SRC_IP + "." + MALICIOUS_IP_TYPE)), "alert"); |
| } |
| else if(indexedDoc.getOrDefault(fnc.convert(DST_IP),"").equals("10.0.2.3")) { |
| Assert.assertEquals(indexedDoc.get(fnc.convert("threatintels.hbaseThreatIntel." + DST_IP + "." + MALICIOUS_IP_TYPE)), "alert"); |
| } |
| else { |
| Assert.fail("There was a threat intels that I did not expect: " + indexedDoc); |
| } |
| } |
| |
| } |
| |
| private static void geoEnrichmentValidation(Map<String, Object> indexedDoc, FieldNameConverter fnc) { |
| //should have geo enrichment on every message due to mock geo adapter |
| Assert.assertEquals(indexedDoc.get(fnc.convert("enrichments.geo." + DST_IP + ".location_point")), MockGeoAdapter.DEFAULT_LOCATION_POINT); |
| Assert.assertEquals(indexedDoc.get(fnc.convert("enrichments.geo." + SRC_IP +".location_point")), MockGeoAdapter.DEFAULT_LOCATION_POINT); |
| Assert.assertEquals(indexedDoc.get(fnc.convert("enrichments.geo." + DST_IP + ".longitude")), MockGeoAdapter.DEFAULT_LONGITUDE); |
| Assert.assertEquals(indexedDoc.get(fnc.convert("enrichments.geo." + SRC_IP + ".longitude")), MockGeoAdapter.DEFAULT_LONGITUDE); |
| Assert.assertEquals(indexedDoc.get(fnc.convert("enrichments.geo." + DST_IP + ".city")), MockGeoAdapter.DEFAULT_CITY); |
| Assert.assertEquals(indexedDoc.get(fnc.convert("enrichments.geo." + SRC_IP + ".city")), MockGeoAdapter.DEFAULT_CITY); |
| Assert.assertEquals(indexedDoc.get(fnc.convert("enrichments.geo." + DST_IP + ".latitude")), MockGeoAdapter.DEFAULT_LATITUDE); |
| Assert.assertEquals(indexedDoc.get(fnc.convert("enrichments.geo." + SRC_IP + ".latitude")), MockGeoAdapter.DEFAULT_LATITUDE); |
| Assert.assertEquals(indexedDoc.get(fnc.convert("enrichments.geo." + DST_IP + ".country")), MockGeoAdapter.DEFAULT_COUNTRY); |
| Assert.assertEquals(indexedDoc.get(fnc.convert("enrichments.geo." + SRC_IP + ".country")), MockGeoAdapter.DEFAULT_COUNTRY); |
| Assert.assertEquals(indexedDoc.get(fnc.convert("enrichments.geo." + DST_IP + ".dmaCode")), MockGeoAdapter.DEFAULT_DMACODE); |
| Assert.assertEquals(indexedDoc.get(fnc.convert("enrichments.geo." + SRC_IP + ".dmaCode")), MockGeoAdapter.DEFAULT_DMACODE); |
| Assert.assertEquals(indexedDoc.get(fnc.convert("enrichments.geo." + DST_IP + ".postalCode")), MockGeoAdapter.DEFAULT_POSTAL_CODE); |
| Assert.assertEquals(indexedDoc.get(fnc.convert("enrichments.geo." + SRC_IP + ".postalCode")), MockGeoAdapter.DEFAULT_POSTAL_CODE); |
| } |
| |
| private static void hostEnrichmentValidation(Map<String, Object> indexedDoc, FieldNameConverter fnc) { |
| boolean enriched = false; |
| //important local printers |
| { |
| Set<String> ips = setOf("10.0.2.15", "10.60.10.254"); |
| if (ips.contains(indexedDoc.get(fnc.convert(SRC_IP)))) { |
| //this is a local, important, printer |
| Assert.assertTrue(Predicates.and(HostEnrichments.LOCAL_LOCATION |
| ,HostEnrichments.IMPORTANT |
| ,HostEnrichments.PRINTER_TYPE |
| ).apply(new EvaluationPayload(indexedDoc, SRC_IP, fnc)) |
| ); |
| enriched = true; |
| } |
| if (ips.contains(indexedDoc.get(fnc.convert(DST_IP)))) { |
| Assert.assertTrue(Predicates.and(HostEnrichments.LOCAL_LOCATION |
| ,HostEnrichments.IMPORTANT |
| ,HostEnrichments.PRINTER_TYPE |
| ).apply(new EvaluationPayload(indexedDoc, DST_IP, fnc)) |
| ); |
| enriched = true; |
| } |
| } |
| //important local webservers |
| { |
| Set<String> ips = setOf("10.1.128.236"); |
| if (ips.contains(indexedDoc.get(fnc.convert(SRC_IP)))) { |
| //this is a local, important, printer |
| Assert.assertTrue(Predicates.and(HostEnrichments.LOCAL_LOCATION |
| ,HostEnrichments.IMPORTANT |
| ,HostEnrichments.WEBSERVER_TYPE |
| ).apply(new EvaluationPayload(indexedDoc, SRC_IP, fnc)) |
| ); |
| enriched = true; |
| } |
| if (ips.contains(indexedDoc.get(fnc.convert(DST_IP)))) { |
| Assert.assertTrue(Predicates.and(HostEnrichments.LOCAL_LOCATION |
| ,HostEnrichments.IMPORTANT |
| ,HostEnrichments.WEBSERVER_TYPE |
| ).apply(new EvaluationPayload(indexedDoc, DST_IP, fnc)) |
| ); |
| enriched = true; |
| } |
| } |
| if(!enriched) { |
| Assert.assertFalse(keyPatternExists("enrichments.host", indexedDoc)); |
| } |
| } |
| |
| |
| private static boolean keyPatternExists(String pattern, Map<String, Object> indexedObj) { |
| for(String k : indexedObj.keySet()) { |
| if(k.startsWith(pattern)) { |
| return true; |
| } |
| } |
| return false; |
| } |
| private static Set<String> setOf(String... items) { |
| Set<String> ret = new HashSet<>(); |
| for(String item : items) { |
| ret.add(item); |
| } |
| return ret; |
| } |
| |
| abstract public FieldNameConverter getFieldNameConverter(); |
| abstract public InMemoryComponent getSearchComponent(Properties topologyProperties) throws Exception; |
| abstract public Processor<List<Map<String, Object>>> getProcessor(List<byte[]> inputMessages); |
| abstract public void setAdditionalProperties(Properties topologyProperties); |
| abstract public String cleanField(String field); |
| |
| } |