One way to find anomalous behavior in a network is to inspect user login behavior. In particular, if a user is logging in from vastly differing geographic locations in a short period of time, this may be evidence of malicious activity.
More formally, we can characterize this potentially malicious event in terms of how far a login is from the geographic centroid of the user's historic logins, compared against the same measure for all users. For instance, suppose that across all users the median distance from the centroid of their logins over the last 2 hours is 3 km with a standard deviation of 1 km. If we then see a user logging in 1700 km from the centroid of their own logins over the last 2 hours, they MAY be exhibiting a deviation we want to monitor, since it would be hard to travel that distance in 2 hours. On the other hand, the user may simply be using a VPN or proxy. Ultimately, this sort of analytic should be treated as only one piece of evidence among many before we raise an alert.
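As a rough sketch of that reasoning (with hypothetical numbers, not Metron code), the outlier test compares a login's distance from the user's centroid against the population median and standard deviation:

```python
import statistics

def is_geo_outlier(distance_km, all_distances_km, num_sd=5):
    """Flag a login whose distance from the user's login centroid deviates
    from the population median by more than num_sd standard deviations."""
    median = statistics.median(all_distances_km)
    sd = statistics.stdev(all_distances_km)
    return abs(distance_km - median) >= num_sd * sd

# Typical logins cluster a few km from their centroid...
typical = [2.0, 2.5, 3.0, 3.5, 4.0]
# ...so a 1700 km jump stands out, while a 4 km one does not.
print(is_geo_outlier(1700, typical))  # True
print(is_geo_outlier(4, typical))     # False
```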
For the purposes of demonstration, we will construct synthetic data in which 2 users log into a system rather quickly (once per second) from various hosts. Each user's login IPs share the same first 2 octets, with the last 2 chosen randomly. We will then inject a data point indicating that user1 is logging in from a Russian IP address.
We assume that the following environment variables are set:

- `METRON_HOME` - the home directory for Metron
- `ZOOKEEPER` - the ZooKeeper quorum (comma separated with port specified, e.g. node1:2181 for full-dev)
- `BROKERLIST` - the Kafka broker list (comma separated with port specified, e.g. node1:6667 for full-dev)
- `ES_HOST` - the Elasticsearch master (and port), e.g. node1:9200 for full-dev

Also, this does not assume that you are using a kerberized cluster. If you are, then the parser start command will adjust slightly to include the security protocol.
Before editing configurations, be sure to pull the configs from ZooKeeper locally via:

```
$METRON_HOME/bin/zk_load_configs.sh --mode PULL -z $ZOOKEEPER -o $METRON_HOME/config/zookeeper/ -f
```
First, we'll configure the profiler to emit a profile every 1 minute via the Profiler config section. Edit `$METRON_HOME/config/zookeeper/global.json` to adjust the capture duration:

```
"profiler.client.period.duration" : "1",
"profiler.client.period.duration.units" : "MINUTES"
```
We want to create a new sensor for our synthetic data called `auth`. To feed it, we need a synthetic data generator. In particular, we want a process that emits authentication events once per second for a set of users, where the IPs are randomly chosen but each user's login IP addresses share the same first 2 octets.
Edit `~/gen_data.py` and paste the following into it:

```
#!/usr/bin/python
import random
import sys
import time

domains = { 'user1' : '173.90', 'user2' : '156.33' }

def get_ip(base):
    return base + '.' + str(random.randint(1, 255)) + '.' + str(random.randint(1, 255))

def main():
    freq_s = 1
    while True:
        user = 'user' + str(random.randint(1, len(domains)))
        epoch_time = int(time.time())
        ip = get_ip(domains[user])
        print user + ',' + ip + ',' + str(epoch_time)
        sys.stdout.flush()
        time.sleep(freq_s)

if __name__ == '__main__':
    main()
```
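Note that the script above uses a Python 2 `print` statement, so run it with a Python 2 interpreter. As a quick sanity check of the record format it emits (a Python 3 illustration, with `make_record` being a hypothetical helper, not part of the script):

```python
import random
import time

def make_record(user, base):
    """Build one synthetic auth record: 'user,ip,epoch'.
    The last two octets are random; the first two identify the user."""
    ip = '{}.{}.{}'.format(base, random.randint(1, 255), random.randint(1, 255))
    return '{},{},{}'.format(user, ip, int(time.time()))

rec = make_record('user1', '173.90')
user, ip, ts = rec.split(',')
print(user, ip.startswith('173.90.'), ts.isdigit())  # user1 True True
```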
The `auth` Parser

The message format for our simple synthetic data is a CSV with the following fields:

- user - the user name
- ip - the IP address the user logged in from
- timestamp - the login time in epoch seconds

We will need to parse this via our CSVParser and add the geohash of the login IP address.
Edit `$METRON_HOME/config/zookeeper/parsers/auth.json` and paste the following:

```
{
  "parserClassName" : "org.apache.metron.parsers.csv.CSVParser",
  "sensorTopic" : "auth",
  "parserConfig" : {
    "columns" : {
      "user" : 0,
      "ip" : 1,
      "timestamp" : 2
    }
  },
  "fieldTransformations" : [
    {
      "transformation" : "STELLAR",
      "output" : [ "hash" ],
      "config" : {
        "hash" : "GEOHASH_FROM_LOC(GEO_GET(ip))"
      }
    }
  ]
}
```
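In effect, the parser maps CSV positions to named fields. A minimal Python 3 sketch of that mapping (illustrative only; Metron's CSVParser does this inside the topology):

```python
def parse_auth(line):
    """Split one CSV auth record into named fields,
    mirroring the 'columns' block of the parser config."""
    columns = {'user': 0, 'ip': 1, 'timestamp': 2}
    parts = line.strip().split(',')
    msg = {name: parts[idx] for name, idx in columns.items()}
    msg['timestamp'] = int(msg['timestamp'])  # epoch seconds
    return msg

print(parse_auth('user1,173.90.14.7,1504816200'))
# {'user': 'user1', 'ip': '173.90.14.7', 'timestamp': 1504816200}
```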
Create the `auth` Kafka topic:

```
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper $ZOOKEEPER --create --topic auth --partitions 1 --replication-factor 1
```
We will need to track 2 profiles to accomplish this task:

- locations_by_user - the geohashes of the locations the user has logged in from. This is a multiset of geohashes per user. Note that the multiset in this case is effectively a map of geohashes to occurrence counts.
- geo_distribution_from_centroid - the statistical distribution of the distance between a login location and the geographic centroid of the user's previous logins from the last 2 minutes. Note that in a real installation this would be a larger temporal lookback.

We can represent these in `$METRON_HOME/config/zookeeper/profiler.json` via the following:
```
{
  "profiles": [
    {
      "profile": "geo_distribution_from_centroid",
      "foreach": "'global'",
      "onlyif": "geo_distance != null",
      "init": { "s": "STATS_INIT()" },
      "update": { "s": "STATS_ADD(s, geo_distance)" },
      "result": "s"
    },
    {
      "profile": "locations_by_user",
      "foreach": "user",
      "onlyif": "hash != null && LENGTH(hash) > 0",
      "init": { "s": "MULTISET_INIT()" },
      "update": { "s": "MULTISET_ADD(s, hash)" },
      "result": "s"
    }
  ]
}
```
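Conceptually, each profile just accumulates state per period. A hedged Python 3 sketch of the two accumulations (with `Counter` standing in for Stellar's multiset and the `statistics` module for the `STATS_*` functions; this is not how the Profiler is implemented):

```python
from collections import Counter
import statistics

# locations_by_user: a multiset of geohashes per user per period.
locations = {}
def add_location(user, geohash):
    locations.setdefault(user, Counter())[geohash] += 1

# geo_distribution_from_centroid: distances pooled across all users,
# skipping nulls just as the "onlyif" clause does.
distances = []
def add_distance(d):
    if d is not None:
        distances.append(d)

add_location('user1', '9q8yy')
add_location('user1', '9q8yy')
add_location('user1', 'dr5ru')
for d in [3.0, 2.5, None, 3.5]:
    add_distance(d)

print(locations['user1']['9q8yy'])   # 2
print(statistics.median(distances))  # 3.0
```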
We will need to enrich the authentication records in a couple of ways to use in the threat triage section as well as the profiles:

- geo_distance : the distance between the current geohash and the geographic centroid of the user's logins for the last 2 minutes
- geo_centroid : the geographic centroid of the user's logins for the last 2 minutes

Beyond that, we will need to determine whether the authentication event is a geographic outlier by computing the following fields:

- dist_median : the median distance between a user's login location and the geographic centroid for the last 2 minutes (essentially the median of the geo_distance values across all users)
- dist_sd : the standard deviation of the distance between a user's login location and the geographic centroid for the last 2 minutes (essentially the standard deviation of the geo_distance values across all users)
- geo_outlier : whether geo_distance is more than 5 standard deviations from the median across all users

We also want to set up a triage rule that associates a score and sets an alert if geo_outlier is true. In reality this would be more complex, as this metric is at best circumstantial and would need supporting evidence, but for simplicity we will tolerate the false positives here.
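Inside Metron, `GEOHASH_CENTROID` and `GEOHASH_DIST` compute these values over geohashes. As a rough illustration of the same idea (not Metron code, using made-up coordinates), centroid and distance can be sketched over plain lat/lon pairs:

```python
import math

def centroid(points):
    """Naive centroid of (lat, lon) pairs; adequate for clustered points."""
    lat = sum(p[0] for p in points) / len(points)
    lon = sum(p[1] for p in points) / len(points)
    return (lat, lon)

def haversine_km(a, b):
    """Great-circle distance in km between two (lat, lon) points."""
    r = 6371.0
    lat1, lon1, lat2, lon2 = map(math.radians, (*a, *b))
    h = (math.sin((lat2 - lat1) / 2) ** 2
         + math.cos(lat1) * math.cos(lat2) * math.sin((lon2 - lon1) / 2) ** 2)
    return 2 * r * math.asin(math.sqrt(h))

# Hypothetical recent logins clustered in the midwestern US,
# versus an injected login geolocating to Moscow.
recent = [(39.96, -83.00), (39.99, -82.98), (40.01, -83.02)]
c = centroid(recent)
print(round(haversine_km(c, (55.75, 37.62))))  # thousands of km away
```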
Edit `$METRON_HOME/config/zookeeper/enrichments/auth.json` and paste the following:

```
{
  "enrichment": {
    "fieldMap": {
      "stellar": {
        "config": [
          "geo_locations := MULTISET_MERGE(PROFILE_GET('locations_by_user', user, PROFILE_FIXED(2, 'MINUTES')))",
          "geo_centroid := GEOHASH_CENTROID(geo_locations)",
          "geo_distance := TO_INTEGER(GEOHASH_DIST(geo_centroid, hash))",
          "geo_locations := null"
        ]
      }
    },
    "fieldToTypeMap": { }
  },
  "threatIntel": {
    "fieldMap": {
      "stellar": {
        "config": [
          "geo_distance_distr := STATS_MERGE(PROFILE_GET('geo_distribution_from_centroid', 'global', PROFILE_FIXED(2, 'MINUTES')))",
          "dist_median := STATS_PERCENTILE(geo_distance_distr, 50.0)",
          "dist_sd := STATS_SD(geo_distance_distr)",
          "geo_outlier := ABS(dist_median - geo_distance) >= 5*dist_sd",
          "is_alert := is_alert || (geo_outlier != null && geo_outlier == true)",
          "geo_distance_distr := null"
        ]
      }
    },
    "fieldToTypeMap": { },
    "triageConfig": {
      "riskLevelRules": [
        {
          "name": "Geographic Outlier",
          "comment": "Determine if the user's geographic distance from the centroid of the historic logins is an outlier as compared to all users.",
          "rule": "geo_outlier != null && geo_outlier",
          "score": 10,
          "reason": "FORMAT('user %s has a distance (%d) from the centroid of their last login is 5 std deviations (%f) from the median (%f)', user, geo_distance, dist_sd, dist_median)"
        }
      ],
      "aggregator": "MAX"
    }
  }
}
```
From here, we've set up our configuration and can push the configs:

```
$METRON_HOME/bin/zk_load_configs.sh --mode PUSH -z $ZOOKEEPER -i $METRON_HOME/config/zookeeper/
```

Then start the auth parser topology:

```
$METRON_HOME/bin/start_parser_topology.sh -k $BROKERLIST -z $ZOOKEEPER -s auth
```
Send the synthetic data into the `auth` topic via:

```
python ~/gen_data.py | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $BROKERLIST --topic auth
```
Wait for at least 5 minutes, then kill the previous command and send a message indicating that user1 has logged in from a Russian IP (109.252.227.173):

```
echo -e "import time\nprint 'user1,109.252.227.173,'+str(int(time.time()))" | python | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list $BROKERLIST --topic auth
```
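Note that the inline snippet piped through `python` above uses a Python 2 `print` statement. If only Python 3 is available, the equivalent record can be produced with:

```python
# Python 3 equivalent of the inline snippet: emit one auth record for
# user1 from the Russian IP, suitable for piping into the Kafka producer.
import time

record = 'user1,109.252.227.173,' + str(int(time.time()))
print(record)
```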
We can now query Elasticsearch for the triaged alerts:

```
curl -XPOST "http://$ES_HOST/auth*/_search?pretty" -d '
{
  "_source": [ "is_alert", "threat:triage:rules:0:reason", "user", "ip", "geo_distance" ],
  "query": {
    "exists": { "field": "threat:triage:rules:0:reason" }
  }
}'
```
You should see, among a few other false positive results, something like the following:
```
{
  "_index" : "auth_index_2017.09.07.20",
  "_type" : "auth_doc",
  "_id" : "f5bdbf76-9d78-48cc-b21d-bc434c96e62e",
  "_score" : 1.0,
  "_source" : {
    "geo_distance" : 7879,
    "threat:triage:rules:0:reason" : "user user1 has a distance (7879) from the centroid of their last login is 5 std deviations (334.814719) from the median (128.000000)",
    "ip" : "109.252.227.173",
    "is_alert" : "true",
    "user" : "user1"
  }
}
```
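If you want to pull just the alert fields out of the search response programmatically, Elasticsearch wraps each matching document in `hits.hits[]._source`. A small sketch against a trimmed stand-in for the response above (the reason string is elided here):

```python
import json

raw = '''
{"hits": {"hits": [{"_source": {
  "geo_distance": 7879,
  "threat:triage:rules:0:reason": "user user1 has a distance (7879) ...",
  "ip": "109.252.227.173",
  "is_alert": "true",
  "user": "user1"}}]}}
'''
alerts = [hit['_source'] for hit in json.loads(raw)['hits']['hits']]
for a in alerts:
    print(a['user'], a['ip'], a['geo_distance'])  # user1 109.252.227.173 7879
```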