blob: 0aba22f94ef9c6e9d8428c558822d10ae4f21d6e [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.metron.integration.pcap;
import com.google.common.base.Function;
import com.google.common.base.Joiner;
import com.google.common.base.Splitter;
import com.google.common.collect.Iterables;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.metron.integration.util.UnitTestHelper;
import org.apache.metron.integration.util.integration.ComponentRunner;
import org.apache.metron.integration.util.integration.Processor;
import org.apache.metron.integration.util.integration.ReadinessState;
import org.apache.metron.integration.util.integration.components.ElasticSearchComponent;
import org.apache.metron.integration.util.integration.components.FluxTopologyComponent;
import org.apache.metron.integration.util.mock.MockHTable;
import org.apache.metron.integration.util.threatintel.ThreatIntelHelper;
import org.apache.metron.parsing.parsers.PcapParser;
import org.apache.metron.test.converters.HexStringConverter;
import org.apache.metron.threatintel.ThreatIntelKey;
import org.apache.metron.threatintel.ThreatIntelResults;
import org.json.simple.JSONObject;
import org.junit.Assert;
import org.junit.Test;
import javax.annotation.Nullable;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.*;
public class PcapIntegrationTest {
private String topologiesDir = "src/main/resources/Metron_Configs/topologies";
private String targetDir = "target";
@Test
public void testTopology() throws Exception {
if(!new File(topologiesDir).exists()) {
topologiesDir = UnitTestHelper.findDir("topologies");
}
if(!new File(targetDir).exists()) {
targetDir = UnitTestHelper.findDir("target");
}
Assert.assertNotNull(topologiesDir);
Assert.assertNotNull(targetDir);
final List<String> expectedPcapIds= getExpectedPcap(new File(topologiesDir + "/../../SampleInput/PCAPExampleOutput"));
Assert.assertTrue("Expected non-zero number of PCAP Ids from the sample data", expectedPcapIds.size() > 0);
System.out.println("Using topologies directory: " + topologiesDir);
ElasticSearchComponent esComponent = new ElasticSearchComponent.Builder()
.withHttpPort(9211)
.withIndexDir(new File(targetDir + "/elasticsearch"))
.build();
final String cf = "cf";
final String trackerHBaseTable = "tracker";
final String ipThreatIntelTable = "ip_threat_intel";
Properties topologyProperties = new Properties() {{
setProperty("input.path", "src/main/resources/");
setProperty("es.port", "9300");
setProperty("es.ip", "localhost");
setProperty("es.clustername", "metron");
setProperty("mysql.ip", "node1");
setProperty("mysql.port", "3306");
setProperty("mysql.username", "root");
setProperty("mysql.password", "P@ssw0rd");
setProperty("pcap.binary.converter", "FROM_HEX_STRING");
setProperty("testing.repeating", "false");
setProperty("org.apache.metron.metrics.reporter.graphite", "false");
setProperty("org.apache.metron.metrics.reporter.console", "false");
setProperty("org.apache.metron.metrics.reporter.jmx", "false");
setProperty("org.apache.metron.metrics.TelemetryParserBolt.acks","true");
setProperty("org.apache.metron.metrics.TelemetryParserBolt.emits", "true");
setProperty("org.apache.metron.metrics.TelemetryParserBolt.fails","true");
setProperty("org.apache.metron.metrics.GenericEnrichmentBolt.acks","true");
setProperty("org.apache.metron.metrics.GenericEnrichmentBolt.emits","true");
setProperty("org.apache.metron.metrics.GenericEnrichmentBolt.fails","true");
setProperty("org.apache.metron.metrics.TelemetryIndexingBolt.acks", "true");
setProperty("org.apache.metron.metrics.TelemetryIndexingBolt.emits","true");
setProperty("org.apache.metron.metrics.TelemetryIndexingBolt.fails","true");
setProperty("kafka.zk", "localhost:2000,localhost:2000");
setProperty("bolt.hbase.table.name", "pcap_test");
setProperty("bolt.hbase.table.fields", "t:value");
setProperty("bolt.hbase.table.key.tuple.field.name", "key");
setProperty("bolt.hbase.table.timestamp.tuple.field.name", "timestamp");
setProperty("bolt.hbase.enable.batching", "false");
setProperty("bolt.hbase.write.buffer.size.in.bytes", "2000000");
setProperty("bolt.hbase.durability", "SKIP_WAL");
setProperty("bolt.hbase.partitioner.region.info.refresh.interval.mins","60");
setProperty("hbase.provider.impl","" + MockHTable.Provider.class.getName());
setProperty("threat.intel.tracker.table", trackerHBaseTable);
setProperty("threat.intel.tracker.cf", cf);
setProperty("threat.intel.ip.table", ipThreatIntelTable);
setProperty("threat.intel.ip.cf", cf);
setProperty("org.apache.metron.enrichment.host.known_hosts", "[{\"ip\":\"10.1.128.236\", \"local\":\"YES\", \"type\":\"webserver\", \"asset_value\" : \"important\"}," +
"{\"ip\":\"10.1.128.237\", \"local\":\"UNKNOWN\", \"type\":\"unknown\", \"asset_value\" : \"important\"}," +
"{\"ip\":\"10.60.10.254\", \"local\":\"YES\", \"type\":\"printer\", \"asset_value\" : \"important\"}," +
"{\"ip\":\"10.0.2.15\", \"local\":\"YES\", " +
"\"type\":\"printer\", \"asset_value\" : \"important\"}]");
}};
//create MockHBaseTables
final MockHTable trackerTable = (MockHTable)MockHTable.Provider.addToCache(trackerHBaseTable, cf);
final MockHTable ipTable = (MockHTable)MockHTable.Provider.addToCache(ipThreatIntelTable, cf);
ThreatIntelHelper.INSTANCE.load(ipTable, cf, new ArrayList<ThreatIntelResults>(){{
add(new ThreatIntelResults(new ThreatIntelKey("10.0.2.3"), new HashMap<String, String>()));
}}, 0L);
final MockHTable pcapTable = (MockHTable) MockHTable.Provider.addToCache("pcap_test", "t");
FluxTopologyComponent fluxComponent = new FluxTopologyComponent.Builder()
.withTopologyLocation(new File(topologiesDir + "/pcap/local.yaml"))
.withTopologyName("pcap")
.withTopologyProperties(topologyProperties)
.build();
//UnitTestHelper.verboseLogging();
ComponentRunner runner = new ComponentRunner.Builder()
.withComponent("elasticsearch", esComponent)
.withComponent("storm", fluxComponent)
.build();
final String index = getIndex();
System.out.println("Index of the run: " + index);
runner.start();
fluxComponent.submitTopology();
List<Map<String, Object>> docs =
runner.process(new Processor<List<Map<String, Object>>> () {
List<Map<String, Object>> docs = null;
public ReadinessState process(ComponentRunner runner){
ElasticSearchComponent elasticSearchComponent = runner.getComponent("elasticsearch", ElasticSearchComponent.class);
if(elasticSearchComponent.hasIndex(index)) {
try {
docs = elasticSearchComponent.getAllIndexedDocs(index);
} catch (IOException e) {
throw new IllegalStateException("Unable to retrieve indexed documents.", e);
}
if(docs.size() < expectedPcapIds.size() && pcapTable.getPutLog().size() < expectedPcapIds.size()) {
return ReadinessState.NOT_READY;
}
else {
return ReadinessState.READY;
}
}
else {
return ReadinessState.NOT_READY;
}
}
public List<Map<String, Object>> getResult() {
return docs;
}
});
Assert.assertEquals(expectedPcapIds.size(), pcapTable.getPutLog().size());
UnitTestHelper.assertSetEqual("PCap IDs from Index"
, new HashSet<>(expectedPcapIds)
, convertToSet(Iterables.transform(docs, DOC_TO_PCAP_ID))
);
UnitTestHelper.assertSetEqual("PCap IDs from HBase"
, new HashSet<>(expectedPcapIds)
, convertToSet(Iterables.transform(pcapTable.getPutLog(), RK_TO_PCAP_ID))
);
Iterable<JSONObject> packetsFromHBase = Iterables.transform(pcapTable.getPutLog(), PUT_TO_PCAP);
Assert.assertEquals(expectedPcapIds.size(), Iterables.size(packetsFromHBase));
List<Map<String, Object>> allDocs= runner.getComponent("elasticsearch", ElasticSearchComponent.class).getAllIndexedDocs(index, null);
boolean hasThreat = false;
for(Map<String, Object> d : allDocs) {
Map<String, Object> message = (Map<String, Object>) d.get("message");
Set<String> ips = new HashSet<>(Arrays.asList((String)message.get("ip_dst_addr"), (String)message.get("ip_src_addr")));
if(ips.contains("10.0.2.3")) {
hasThreat = true;
Map<String, Object> alerts = (Map<String, Object>) ((Map<String, Object>) d.get("alerts")).get("ip");
Assert.assertTrue( ((Map<String,Object>)alerts.get("ip_dst_addr")).size() > 0
|| ((Map<String,Object>)alerts.get("ip_src_addr")).size() > 0
);
}
}
Assert.assertTrue(hasThreat);
MockHTable.Provider.clear();
runner.stop();
}
public static Set<String> convertToSet(Iterable<String> strings) {
Set<String> ret = new HashSet<String>();
Iterables.addAll(ret, strings);
return ret;
}
public static final Function<Put, String> RK_TO_PCAP_ID = new Function<Put, String>() {
@Nullable
public String apply(@Nullable Put put) {
String rk =new String(put.getRow());
return Joiner.on("-").join(Iterables.limit(Splitter.on('-').split(rk), 5));
}
};
public static final Function<Map<String, Object>, String> DOC_TO_PCAP_ID = new Function<Map<String, Object>, String>() {
@Nullable
public String apply(@Nullable Map<String, Object> doc) {
return (String)doc.get("pcap_id");
}
};
public static final Function<Put, JSONObject> PUT_TO_PCAP = new
Function<Put, JSONObject>() {
@Nullable
public JSONObject apply(@Nullable Put put) {
try {
return putToPcap(put);
} catch (IOException e) {
throw new RuntimeException("Unable to convert put to PCAP: " + put);
}
}
};
private static List<String> getExpectedPcap(File rawFile) throws IOException {
List<String> ret = new ArrayList<String>();
PcapParser parser = new PcapParser();
parser.withTsPrecision("MICRO");
parser.init();
BufferedReader br = new BufferedReader(new FileReader(rawFile));
for(String line = null; (line = br.readLine()) != null;) {
byte[] pcapBytes = new HexStringConverter().convert(line);
List<JSONObject> list = parser.parse(pcapBytes);
for(JSONObject message : list) {
ret.add((String) message.get("pcap_id"));
}
}
return ret;
}
private static String getIndex() {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy.MM.dd.hh");
Date d = new Date();
return "pcap_index_" + sdf.format(d);
}
private static JSONObject putToPcap(Put p) throws IOException {
PcapParser parser = new PcapParser();
parser.init();
List<Cell> cells = p.get(Bytes.toBytes("t"), Bytes.toBytes("value"));
Assert.assertEquals(1, cells.size());
List<JSONObject> messages = parser.parse(cells.get(0).getValueArray());
Assert.assertEquals(1, messages.size());
return messages.get(0);
}
}