A variety of non-trivial and advanced analytics make use of statistics and advanced mathematical functions. In particular, capturing statistical snapshots in a scalable way can open the door to more advanced analytics, such as outlier analysis. This project therefore aims to provide a robust set of statistical functions and statistics-based algorithms in the form of Stellar functions. These functions can be used anywhere Stellar is used.
* `HLLP_ADD`
* `HLLP_CARDINALITY`
* `HLLP_INIT`
* `HLLP_MERGE`
* `ABS`
* `BIN`
* `SAMPLE_ADD`
* `SAMPLE_GET`
* `SAMPLE_INIT`
* `SAMPLE_MERGE`
* `STATS_ADD`
* `STATS_BIN`
* `STATS_COUNT`
* `STATS_GEOMETRIC_MEAN`
* `STATS_INIT`
* `STATS_KURTOSIS`
* `STATS_MAX`
* `STATS_MEAN`
* `STATS_MERGE`
* `STATS_MIN`
* `STATS_PERCENTILE`
* `STATS_POPULATION_VARIANCE`
* `STATS_QUADRATIC_MEAN`
* `STATS_SD`
* `STATS_SKEWNESS`
* `STATS_SUM`
* `STATS_SUM_LOGS`
* `STATS_SUM_SQUARES`
* `STATS_VARIANCE`
* `IT_ENTROPY`
* `OUTLIER_MAD_STATE_MERGE`
* `OUTLIER_MAD_ADD`
* `OUTLIER_MAD_SCORE`
A common desire is to find anomalies in numerical data. To that end, we have some simple statistical anomaly detectors.
Much has been written about the Median Absolute Deviation (MAD), a robust estimator. See the first page of http://web.ipac.caltech.edu/staff/fmasci/home/astro_refs/BetterThanMAD.pdf for good coverage of the strengths and weaknesses of MAD. The usage, however, is fairly straightforward:
There are a couple of issues that make MAD a bit hard to compute. First, the statistical state requires the median, which can be expensive to compute exactly. To get around this, we use the OnlineStatisticalProvider to compute a sketch rather than the exact median. Second, the statistical state for seasonal data should be limited to a fixed, trailing window. We do this by ensuring that the MAD state is mergeable and can be queried from within the Profiler.
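To make the arithmetic concrete, here is a plain-Python sketch of the modified z-score that a MAD-based detector computes. It uses exact medians rather than the sketch-based approximation described above, and `mad_score` is a hypothetical name for illustration, not Metron's API:

```python
import statistics

def mad_score(window, x):
    # Modified z-score of x against a window of values: a plain-Python
    # sketch of the quantity a MAD-based detector computes.  Metron's
    # implementation approximates the medians with a sketch instead of
    # computing them exactly.
    med = statistics.median(window)
    # Median absolute deviation: the median distance from the median.
    mad = statistics.median(abs(v - med) for v in window)
    if mad == 0:
        raise ValueError("MAD is zero; the score is undefined for this window")
    # 0.6745 rescales MAD so that, for gaussian data, the score is
    # comparable to a number of standard deviations.
    return 0.6745 * abs(x - med) / mad

window = [9.0, 10.0, 10.0, 10.0, 11.0, 10.5, 9.5]
print(mad_score(window, 10.0))  # a typical value scores near 0
print(mad_score(window, 25.0))  # an obvious outlier scores far above 3.5
```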
We will create a dummy data stream of gaussian noise to illustrate how to use the MAD functionality along with the profiler to tag messages as outliers or not. To do this, we will create a simple python script that generates a stream of gaussian noise at a frequency of one message per second. Save the following at `~/rand_gen.py`:
```
#!/usr/bin/python
import random
import sys
import time

def main():
  mu = float(sys.argv[1])
  sigma = float(sys.argv[2])
  freq_s = int(sys.argv[3])
  while True:
    print str(random.gauss(mu, sigma))
    sys.stdout.flush()
    time.sleep(freq_s)

if __name__ == '__main__':
  main()
```
This script will take the following as arguments:

* The mean of the gaussian distribution
* The standard deviation of the distribution
* The frequency (in seconds) at which to emit messages
If, however, you'd like to test a longer-tailed distribution, like the Student's t-distribution, and have numpy installed, you can use the following as `~/rand_gen.py`:
```
#!/usr/bin/python
import random
import sys
import time
import numpy as np

def main():
  df = float(sys.argv[1])
  freq_s = int(sys.argv[2])
  while True:
    print str(np.random.standard_t(df))
    sys.stdout.flush()
    time.sleep(freq_s)

if __name__ == '__main__':
  main()
```
This script will take the following as arguments:

* The degrees of freedom for the distribution
* The frequency (in seconds) at which to emit messages
We will create a parser that takes each number in and creates a message with a field called `value`, using the CSVParser.

Add the following file at `$METRON_HOME/config/zookeeper/parsers/mad.json`:
```
{
  "parserClassName" : "org.apache.metron.parsers.csv.CSVParser",
  "sensorTopic" : "mad",
  "parserConfig" : {
    "columns" : {
      "value_str" : 0
    }
  },
  "fieldTransformations" : [
    {
      "transformation" : "STELLAR",
      "output" : [ "value" ],
      "config" : {
        "value" : "TO_DOUBLE(value_str)"
      }
    }
  ]
}
```
We will set a threat triage level of 10 if a message generates an outlier score of more than 3.5. This cutoff will depend on your data and should be adjusted based on the assumed underlying distribution. Note that under the assumption of normality, MAD acts as a robust estimator of the standard deviation, so the cutoff can be read as a number of standard deviations away from the median. For other distributions there are other interpretations that make sense in the context of measuring the "degree different". See http://eurekastatistics.com/using-the-median-absolute-deviation-to-find-outliers/ for a brief discussion of this.
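To sanity-check that reading of the cutoff, the following standalone python snippet (illustrative only, not part of the Metron setup) draws gaussian samples and shows that the MAD, scaled by 1/0.6745, recovers the standard deviation:

```python
import random
import statistics

random.seed(42)
mu, sigma = 0.0, 2.0
samples = [random.gauss(mu, sigma) for _ in range(100000)]

med = statistics.median(samples)
# Median absolute deviation of the samples.
mad = statistics.median(abs(x - med) for x in samples)

# For gaussian data, MAD / 0.6745 is a consistent estimator of sigma,
# so a modified z-score cutoff of 3.5 reads as roughly 3.5 standard
# deviations from the median.
estimated_sigma = mad / 0.6745
print(estimated_sigma)  # close to sigma = 2.0
```

For a heavier-tailed source like the t-distribution, this equivalence no longer holds, which is why the cutoff should be revisited if you use the alternative generator script.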
Create the following in `$METRON_HOME/config/zookeeper/enrichments/mad.json`:
```
{
  "enrichment": {
    "fieldMap": {
      "stellar" : {
        "config" : {
          "parser_score" : "OUTLIER_MAD_SCORE(OUTLIER_MAD_STATE_MERGE(PROFILE_GET('sketchy_mad', 'global', PROFILE_FIXED(10, 'MINUTES'))), value)",
          "is_alert" : "if parser_score > 3.5 then true else is_alert"
        }
      }
    },
    "fieldToTypeMap": { }
  },
  "threatIntel": {
    "fieldMap": { },
    "fieldToTypeMap": { },
    "triageConfig" : {
      "riskLevelRules" : [
        {
          "rule" : "parser_score > 3.5",
          "score" : 10
        }
      ],
      "aggregator" : "MAX"
    }
  }
}
```
We also need an indexing configuration. Create the following in `$METRON_HOME/config/zookeeper/indexing/mad.json`:
```
{
  "hdfs" : {
    "index": "mad",
    "batchSize": 1,
    "enabled" : true
  },
  "elasticsearch" : {
    "index": "mad",
    "batchSize": 1,
    "enabled" : true
  }
}
```
We can set up the profiler to track the statistical state required to compute MAD. For the purposes of this demonstration, we will configure the profiler to capture statistics on the minute mark. We will capture a global statistical state for the `value` field, and we will look back over a 5 minute window when computing the median.
Create the following file at `$METRON_HOME/config/zookeeper/profiler.json`:
```
{
  "profiles": [
    {
      "profile": "sketchy_mad",
      "foreach": "'global'",
      "onlyif": "true",
      "init" : {
        "s": "OUTLIER_MAD_STATE_MERGE(PROFILE_GET('sketchy_mad', 'global', PROFILE_FIXED(5, 'MINUTES')))"
      },
      "update": {
        "s": "OUTLIER_MAD_ADD(s, value)"
      },
      "result": "s"
    }
  ]
}
```
Adjust `$METRON_HOME/config/zookeeper/global.json` to adjust the capture duration:

```
"profiler.client.period.duration" : "1",
"profiler.client.period.duration.units" : "MINUTES"
```
Adjust `$METRON_HOME/config/profiler.properties` to adjust the capture duration by changing `profiler.period.duration=15` to `profiler.period.duration=1`.
* Install the elasticsearch head plugin by executing: `/usr/share/elasticsearch/bin/plugin install mobz/elasticsearch-head`
* Stop all other parser topologies via monit
* Create the `mad` kafka topic by executing: `/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper node1:2181 --create --topic mad --partitions 1 --replication-factor 1`
* Push the modified configs by executing: `$METRON_HOME/bin/zk_load_configs.sh --mode PUSH -z node1:2181 -i $METRON_HOME/config/zookeeper/`
* Start the profiler by executing: `$METRON_HOME/bin/start_profiler_topology.sh`
* Start the parser topology by executing: `$METRON_HOME/bin/start_parser_topology.sh -k node1:6667 -z node1:2181 -s mad`
* Ensure that the enrichment and indexing topologies are started. If not, start them via monit or by hand.
* Generate data into kafka by executing the following for at least 10 minutes: `~/rand_gen.py 0 1 1 | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list node1:6667 --topic mad` (Note: if you chose to use the t-distribution script above, adjust the parameters to `rand_gen.py` accordingly.)
* Stop the above with ctrl-c and send an obvious outlier into kafka: `echo "1000" | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list node1:6667 --topic mad`
You should be able to find the outlier via the elasticsearch head plugin by searching for the messages where `is_alert` is `true`.