When we say 'routing', we mean how messages are split up and sent to the enrichment adapters. The simplest approach, by far, is to provide a plain list of fields for each adapter, as in:

    "fieldMap": {
      "geo": [ "ip_src_addr", "ip_dst_addr" ],
      "host": [ "ip_src_addr", "ip_dst_addr" ],
      "hbaseEnrichment": [ "ip_src_addr", "ip_dst_addr" ]
    }

Based on this sample config, both `ip_src_addr` and `ip_dst_addr` will go to the `geo`, `host`, and `hbaseEnrichment` adapters.
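
The fan-out described above can be sketched in a few lines of Python. This is only an illustration of the routing idea, not Metron's implementation: the `route_fields` helper and the sample message are hypothetical, and the sketch assumes that fields missing from a message are simply skipped.

```python
from typing import Any, Dict, List


def route_fields(message: Dict[str, Any],
                 field_map: Dict[str, List[str]]) -> Dict[str, Dict[str, Any]]:
    """Return, for each adapter, the subset of message fields routed to it."""
    routed = {}
    for adapter, fields in field_map.items():
        # Assumption: a field absent from the message is not forwarded.
        routed[adapter] = {f: message[f] for f in fields if f in message}
    return routed


field_map = {
    "geo": ["ip_src_addr", "ip_dst_addr"],
    "host": ["ip_src_addr", "ip_dst_addr"],
    "hbaseEnrichment": ["ip_src_addr", "ip_dst_addr"],
}
# Hypothetical message; only the two address fields are listed in the fieldMap.
message = {"ip_src_addr": "10.0.2.3", "ip_dst_addr": "192.168.0.7", "protocol": "TCP"}
routed = route_fields(message, field_map)
```

Each adapter receives both address fields, while `protocol` is routed nowhere because no adapter lists it.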
For the `geo`, `host`, and `hbaseEnrichment` adapters, this is sufficient. However, more complex enrichments may contain their own configuration. Currently, the `stellar` enrichment is more adaptable and thus requires a more nuanced configuration.
At its most basic, we want to take a message and apply a couple of enrichments, such as converting the `hostname` field to lowercase. We do this by specifying the transformation inside the `config` for the `stellar` fieldMap. Two syntaxes are supported. One specifies the transformations as a map, with the field as the key and the Stellar expression as the value:

    "fieldMap": {
      ...
      "stellar" : {
        "config" : {
          "hostname" : "TO_LOWER(hostname)"
        }
      }
    }

Another approach is to specify the transformations as a list, using the same `var := expr` syntax as the Stellar REPL, such as:

    "fieldMap": {
      ...
      "stellar" : {
        "config" : [
          "hostname := TO_LOWER(hostname)"
        ]
      }
    }

Sometimes arbitrary Stellar enrichments may take long enough that you would prefer to split them into groups and execute those groups in parallel. For instance, you might want to do an HBase enrichment and a profiler call that are independent of one another. This use case is supported by splitting the enrichments into groups.
Consider the following example:

    "fieldMap": {
      ...
      "stellar" : {
        "config" : {
          "malicious_domain_enrichment" : {
            "is_bad_domain" : "ENRICHMENT_EXISTS('malicious_domains', ip_dst_addr, 'enrichments', 'cf')"
          },
          "login_profile" : [
            "profile_window := PROFILE_WINDOW('from 6 months ago')",
            "global_login_profile := PROFILE_GET('distinct_login_attempts', 'global', profile_window)",
            "stats := STATS_MERGE(global_login_profile)",
            "auth_attempts_median := STATS_PERCENTILE(stats, 0.5)",
            "auth_attempts_sd := STATS_SD(stats)",
            "profile_window := null",
            "global_login_profile := null",
            "stats := null"
          ]
        }
      }
    }

Here we want to perform two enrichments that hit HBase, and we would rather not run them in sequence. These enrichments are entirely independent of one another (i.e. neither relies on the output of the other). In this case, we've created a group called `malicious_domain_enrichment` to check whether the destination address exists in the HBase enrichment table under the `malicious_domains` enrichment type. This is a simple enrichment, so we can express the enrichment group as a map, with the new field `is_bad_domain` as the key and the associated Stellar expression as the value.
In contrast, the stellar enrichment group `login_profile` interacts with the profiler and has multiple temporary expressions (i.e. `profile_window`, `global_login_profile`, and `stats`) that are useful only within the context of this group of Stellar expressions. In this case, we need to use the list construct when specifying the group and remember to set the temporary variables to `null` so they are not passed along.
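
To make the grouping idea concrete, here is a minimal Python sketch of running independent groups in parallel and dropping nulled temporaries. It is not Metron's implementation: the two group functions are hypothetical stand-ins for the Stellar expressions above (the HBase lookup and the profiler data are replaced with hard-coded values), and `run_groups` is an illustrative helper.

```python
from concurrent.futures import ThreadPoolExecutor
from statistics import median
from typing import Any, Callable, Dict

Message = Dict[str, Any]
Group = Callable[[Message], Dict[str, Any]]


def malicious_domain_enrichment(message: Message) -> Dict[str, Any]:
    # Hypothetical stand-in for the HBase lookup done by ENRICHMENT_EXISTS.
    known_bad = {"203.0.113.9"}
    return {"is_bad_domain": message["ip_dst_addr"] in known_bad}


def login_profile(message: Message) -> Dict[str, Any]:
    # Hypothetical stand-in for profiler data; `stats` is a temporary variable.
    stats = [3, 5, 8]
    return {
        "auth_attempts_median": median(stats),
        "stats": None,  # nulled so that it is not passed along
    }


def run_groups(message: Message, groups: Dict[str, Group]) -> Message:
    """Run independent groups in parallel and merge their non-null results."""
    enriched = dict(message)
    with ThreadPoolExecutor() as pool:
        # Each group sees its own copy of the message because groups are independent.
        futures = [pool.submit(group, dict(message)) for group in groups.values()]
        for future in futures:
            for field, value in future.result().items():
                if value is not None:
                    enriched[field] = value
    return enriched


enriched = run_groups(
    {"ip_dst_addr": "203.0.113.9"},
    {
        "malicious_domain_enrichment": malicious_domain_enrichment,
        "login_profile": login_profile,
    },
)
```

The merged message carries `is_bad_domain` and `auth_attempts_median`, while the nulled `stats` temporary never reaches it.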
In general, things to note from this section are as follows:
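* The Stellar expressions for the `stellar` enrichment adapter are specified in the `config` for the `stellar` enrichment adapter in the `fieldMap`.
* If a group needs temporary variables, use the list construct and assign those variables to `null` before the end of the group.
* A field that is assigned a map is unfolded, with each key appended to the field name. For example, a field `foo` which assigns a map such as `foo := { 'grok' : 1, 'bar' : 'baz'}` would yield the following fields:
  * `foo.grok` == `1`
  * `foo.bar` == `'baz'`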
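
The unfolding of map-valued fields can be illustrated with a short Python sketch. The `unfold_maps` helper is hypothetical and only mirrors the `foo` example above; it assumes a single level of unfolding, since this section does not say how nested maps are treated.

```python
from typing import Any, Dict


def unfold_maps(fields: Dict[str, Any]) -> Dict[str, Any]:
    """Replace each map-valued field with one field per key, named `field.key`."""
    unfolded = {}
    for name, value in fields.items():
        if isinstance(value, dict):
            # Assumption: only one level is unfolded; nested maps are left as values.
            for key, inner in value.items():
                unfolded[f"{name}.{key}"] = inner
        else:
            unfolded[name] = value
    return unfolded


result = unfold_maps({"foo": {"grok": 1, "bar": "baz"}, "hostname": "example"})
```

Here `result` contains `foo.grok`, `foo.bar`, and the untouched `hostname` field.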
`threatIntel` Configuration

Field | Description | Example |
---|---|---|
fieldToTypeMap | In the case of a simple HBase threat intel enrichment (i.e. a key/value lookup), the mapping between fields and the enrichment types associated with those fields must be known. This enrichment type is used as part of the HBase key. Note: applies to hbaseThreatIntel only. | "fieldToTypeMap" : { "ip_src_addr" : [ "malicious_ips" ] } |
fieldMap | The map of threat intel enrichment bolt names to fields in the JSON messages. Each field listed in the array is sent to the threat intel enrichment bolt referenced in the key. Cardinality of fields to enrichments is many-to-many. | "fieldMap": {"hbaseThreatIntel": ["ip_src_addr","ip_dst_addr"]} |
triageConfig | The configuration of the threat triage scorer. In the situation where a threat is detected, a score is assigned to the message and embedded in the indexed message. | "riskLevelRules" : { "IN_SUBNET(ip_dst_addr, '192.168.0.0/24')" : 10 } |
config | The general configuration for threat intel. | "config": {"typeToColumnFamily": { "malicious_ips" : "cf" } } |
The `config` map is intended to house threat intel specific configuration. For instance, for the `hbaseThreatIntel` threat intel adapter, the mapping from enrichment types to column families is specified there. The `fieldMap` configuration is similar to the `enrichment` configuration in that the available adapters are the same.
The `triageConfig` field is also a complex field and warrants some description:
Field | Description | Example |
---|---|---|
riskLevelRules | This is a list of rules (represented as Stellar expressions) associated with scores with optional names and comments | see below |
aggregator | An aggregation function that takes all non-zero scores representing the matching queries from riskLevelRules and aggregates them into a single score. | "MAX" |
A message is triaged by applying a set of risk scoring rules. These rules are used to calculate an overall threat score that can be used to prioritize threats. For each message, a rule may either apply and contribute to the overall risk score, or it may be ignored. A set of rules might look like the following.

    "riskLevelRules" : [
      {
        "name" : "Destination IP is internal",
        "comment" : "Determines if the destination IP is on the internal network.",
        "rule" : "IN_SUBNET(ip_dst_addr, '192.168.0.0/24')",
        "score" : 10,
        "reason" : "FORMAT('%s is an internal IP', ip_dst_addr)"
      },
      {
        "name" : "Originates outside of the United States",
        "comment" : "External to US, but lesser risk applies to North America.",
        "rule" : "geo.country != 'US'",
        "score" : "if geo.country in ['MX','CA'] then 10 else 200",
        "reason" : "FORMAT('%s originates from %s', ip_dst_addr, geo.country)"
      }
    ]

A risk level rule can contain the following fields.
* `name` : The name of the threat triage rule.
* `comment` : A comment describing the threat triage rule.
* `rule` : A Stellar expression that determines whether this Risk Level Rule applies to a given message. If the rule expression returns true, the score will be aggregated into the message's overall threat score.
* `score` : The score that is aggregated into a message's overall threat score.
* `reason` : Provides a reason for why the Risk Level Rule was applied. This allows additional context to be retrieved from the message and is intended to help SOC operators better address the threat.
For best performance, the Stellar expressions contained within a Risk Level Rule should avoid I/O-intensive operations such as queries to external platforms like HBase. Instead, the query should be performed as an Enrichment (see Stellar Enrichment Configuration) and the value should be stored within the message. This value can then be referenced directly by field name when defining the rule, score, or reason fields of a Risk Level Rule.
The supported aggregation functions are:
* `MAX` : The max of all of the associated values for matching queries
* `MIN` : The min of all of the associated values for matching queries
* `MEAN` : The mean of all of the associated values for matching queries
* `SUM` : The sum of all the associated values for matching queries
* `POSITIVE_MEAN` : The mean of the positive associated values for the matching queries.

An example configuration for the YAF sensor is as follows:

    {
      "enrichment": {
        "fieldMap": {
          "geo": [ "ip_src_addr", "ip_dst_addr" ],
          "host": [ "ip_src_addr", "ip_dst_addr" ],
          "hbaseEnrichment": [ "ip_src_addr", "ip_dst_addr" ]
        },
        "fieldToTypeMap": {
          "ip_src_addr": [ "playful_classification" ],
          "ip_dst_addr": [ "playful_classification" ]
        }
      },
      "threatIntel": {
        "fieldMap": {
          "hbaseThreatIntel": [ "ip_src_addr", "ip_dst_addr" ]
        },
        "fieldToTypeMap": {
          "ip_src_addr": [ "malicious_ip" ],
          "ip_dst_addr": [ "malicious_ip" ]
        },
        "triageConfig" : {
          "riskLevelRules" : [
            {
              "rule" : "ip_src_addr == '10.0.2.3' or ip_dst_addr == '10.0.2.3'",
              "score" : 10
            }
          ],
          "aggregator" : "MAX"
        }
      }
    }

ThreatIntel alert levels are emitted as a new field, `threat.triage.level`. So for the example above, an incoming message that trips the `ip_src_addr` rule will have a new field `threat.triage.level=10`.
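
The scoring flow can be sketched in Python as follows. This is an illustration only, not Metron's implementation: rules are modelled as plain Python predicates standing in for Stellar expressions, `triage` and `positive_mean` are hypothetical helpers, and the sketch assumes that no `threat.triage.level` field is added when no rule matches.

```python
from statistics import mean
from typing import Any, Callable, Dict, List

Message = Dict[str, Any]


def positive_mean(scores: List[float]) -> float:
    """Mean of the positive scores; 0 when there are none (an assumption)."""
    positives = [s for s in scores if s > 0]
    return mean(positives) if positives else 0


# The aggregator names follow the list of supported aggregation functions.
AGGREGATORS: Dict[str, Callable[[List[float]], float]] = {
    "MAX": max,
    "MIN": min,
    "MEAN": mean,
    "SUM": sum,
    "POSITIVE_MEAN": positive_mean,
}


def triage(message: Message, rules: List[Dict[str, Any]],
           aggregator: str = "MAX") -> Message:
    """Aggregate the scores of all matching rules into threat.triage.level."""
    scores = [rule["score"] for rule in rules if rule["rule"](message)]
    result = dict(message)
    if scores:  # Assumption: the field is only added when at least one rule matches.
        result["threat.triage.level"] = AGGREGATORS[aggregator](scores)
    return result


# Python predicate standing in for the Stellar rule in the YAF example.
yaf_rules = [
    {
        "rule": lambda m: m.get("ip_src_addr") == "10.0.2.3"
        or m.get("ip_dst_addr") == "10.0.2.3",
        "score": 10,
    }
]
triaged = triage({"ip_src_addr": "10.0.2.3", "ip_dst_addr": "8.8.8.8"}, yaf_rules)
```

With the single YAF rule and the `MAX` aggregator, a message from `10.0.2.3` ends up with `threat.triage.level` set to 10, matching the example above.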