The unified enrichment topology uses data parallelism as opposed to the deprecated split/join topology's task parallelism. This architecture uses a worker pool to fully enrich any message within a worker. This results in
This architecture is fully backwards compatible with the old split-join topology; the only difference is how the enrichment will operate on each message (in one bolt where the split/join is done in a threadpool as opposed to split across multiple bolts).
There are two parameters which you might want to tune in this topology. Both of them are topology configuration adjustable in the flux file $METRON_HOME/config/flux/enrichment/remote-unified.yaml
:
metron.threadpool.size
: The size of the threadpool. This can take a number or a multiple of the number of cores (e.g. 5C
to 5 times the number of cores). The default is 2C
.metron.threadpool.type
: The type of threadpool. (note: descriptions taken from here).FIXED
is a fixed threadpool of size n
. n
threads will process tasks at the time, when the pool is saturated, new tasks will get added to a queue without a limit on size. Good for CPU intensive tasks. This is the default.WORK_STEALING
is a work stealing threadpool. This will create and shut down threads dynamically to accommodate the required parallelism level. It also tries to reduce the contention on the task queue, so can be really good in heavily loaded environments. Also good when your tasks create more tasks for the executor, like recursive tasks.In order to configure the parallelism for the enrichment bolt and threat intel bolt, the configurations will be taken from the respective join bolt parallelism. When proper ambari support for this is added, we will add its own property.
The now-deprecated split/join topology is also available and performs enrichments in parallel. This poses some issues in terms of ease of tuning and reasoning about performance.
In order to use the older, deprecated topology, you will need to
$METRON_HOME/bin/start_enrichment_topology.sh
and adjust it to use remote-splitjoin.yaml
instead of remote-unified.yaml
The configuration for the enrichment
topology, the topology primarily responsible for enrichment and threat intelligence enrichment, is defined by JSON documents stored in zookeeper.
Let's walk through doing a simple enrichment using Stellar on your cluster using the Squid topology.
Now let's install some prerequisites:
yum install squid
/usr/share/elasticsearch/bin/plugin install mobz/elasticsearch-head
Start Squid via service squid start
Let's adjust the configurations for the Squid topology to annotate the messages using some Stellar functions.
$METRON_HOME/config/zookeeper/enrichments/squid.json
(this file will not exist, so create a new one) to add some new fields based on stellar queries:{ "enrichment" : { "fieldMap": { "stellar" : { "config" : { "numeric" : { "foo": "1 + 1" } ,"ALL_CAPS" : "TO_UPPER(source.type)" } } } }, "threatIntel" : { "fieldMap":{ "stellar" : { "config" : { "bar" : "TO_UPPER(source.type)" } } }, "triageConfig" : { } } }
We have added the following fields as part of the enrichment phase of the enrichment topology:
foo
== 2ALL_CAPS
== SQUIDWe have added the following as part of the threat intel:
bar
== SQUIDPlease note that foo and ALL_CAPS will be applied in separate workers due to them being in separate groups.
$METRON_HOME/bin/zk_load_configs.sh --mode PUSH -i $METRON_HOME/config/zookeeper -z node1:2181
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --zookeeper node1:2181 --create --topic squid --partitions 1 --replication-factor 1
Now we need to start the topologies and send some data:
$METRON_HOME/bin/start_parser_topology.sh -k node1:6667 -z node1:2181 -s squid
squidclient http://yahoo.com
squidclient http://cnn.com
cat /var/log/squid/access.log | /usr/hdp/current/kafka-broker/bin/kafka-console-producer.sh --broker-list node1:6667 --topic squid
foo
, bar
and ALL_CAPS
with values as described above.Note that we could have used any Stellar statements here, including calling out to HBase via ENRICHMENT_GET
and ENRICHMENT_EXISTS
or even calling a machine learning model via Model as a Service.