Aggregating many different types of sensors into a single data source (e.g. syslog) and ingesting that aggregate sensor into Metron is a common pattern. It is not obvious how to manage these aggregate sensors, as they require two-pass parsing. This document walks through an example of supporting this kind of multi-pass ingest.
Multi-pass parsing involves the following requirements:
At a high level, we continue to maintain the architectural invariant of a 1-1 relationship between logical sensors and storm topologies. Eventually this relationship may become more complex, but at the moment the approach is to construct a routing parser which will have two responsibilities:
Because the data emitted from the routing parser is a JSON blob, just like the data emitted from any other parser, we will need to adjust the downstream parsers to extract the enveloped data from the JSON blob and treat it as the data to parse.
Currently, the approach to fulfilling this requirement involves a couple of knobs in the Parser infrastructure for Metron.
Consider the case, for instance, where we have many different TYPES of messages wrapped inside of syslog. As an architectural abstraction, we would want to have the following properties:
Parsers allow users to configure the topic which the Kafka producer uses in a couple of ways (from the parser config in an individual parser):
- kafka.topic - Specify the topic in the config. This can be updated by updating the config, but it is data independent (i.e. not dependent on the data in a message).
- kafka.topicField - Specify the topic as the value of a particular field. If unpopulated, then the message is dropped. This is inherently data dependent.
The kafka.topicField parameter allows for data dependent topic selection, and this inherently enables the routing capabilities necessary for handling enveloped data.
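The two topic-selection modes described above can be illustrated with a minimal Python sketch. This is not Metron's implementation: the function name `select_topic`, the `default_output` parameter, the sample field name `logical_source_type`, and the choice to let kafka.topicField take precedence when both options are set are all assumptions made for illustration.

```python
from typing import Optional


def select_topic(parser_config: dict, message: dict,
                 default_output: str = "default_output") -> Optional[str]:
    """Return the topic a parsed message would be written to, or None to drop it.

    Hypothetical helper: it only mirrors the behavior described in the text.
    """
    topic_field = parser_config.get("kafka.topicField")
    if topic_field is not None:
        # Data dependent: the topic is the value of a field in the message.
        topic = message.get(topic_field)
        # If the field is unpopulated, the message is dropped.
        return topic if topic else None
    # Data independent: a fixed topic taken from the config.
    # Falling back to default_output is an assumption of this sketch.
    return parser_config.get("kafka.topic", default_output)


# A router-style config that picks the topic from a (hypothetical) field.
router_config = {"kafka.topicField": "logical_source_type"}
print(select_topic(router_config, {"logical_source_type": "cisco-6-302"}))  # cisco-6-302
print(select_topic(router_config, {"payload": "no routing field"}))         # None
```

The point of the sketch is that only the kafka.topicField branch looks at the message, which is what lets a single routing parser fan data out to several downstream topics.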
Before we continue, let's briefly talk about metadata. We have exposed the ability to pass along metadata and interact with metadata in a decoupled way from the actual parser logic (i.e. the GrokParser should not have to consider how to interpret metadata).
There are three choices about manipulating metadata in Metron:
This enables users to specify metadata independent of the data that is persisted downstream and can inform the operations of enrichment and the profiler.
Now that we have an approach which enables the routing of the data, the remaining question is how to decouple parsing data from interpreting data and metadata. By default, Metron operates like so:
Beyond that, we presume defaults for this default strategy around handling metadata. In particular, by default we do not merge metadata and we use a metron.metadata prefix for all metadata.
In order to enable chained parsers WITH metadata, we allow the following to be specified via strategy in the parser config:
The available strategies, specified by the rawMessageStrategy configuration, are ENVELOPE and DEFAULT.
Specifically, to enable parsing enveloped data (i.e. data in a field of a JSON blob with the other fields being metadata), one can specify the strategy and the configuration of that strategy in the parser config. One must specify the rawMessageStrategy as ENVELOPE in the parser and the rawMessageStrategyConfig to indicate the field which contains the data.
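As a rough illustration of what the ENVELOPE strategy asks of a parser, the sketch below separates the data field from the surrounding metadata fields. It is written in Python for brevity and is not Metron's code: the function name `unwrap_envelope`, the fallback to the metron.metadata prefix when none is configured, and joining prefix and field name with a dot are assumptions.

```python
from typing import Dict, Tuple


def unwrap_envelope(envelope: dict,
                    strategy_config: dict) -> Tuple[str, Dict[str, object]]:
    """Split an envelope into (raw data to parse, metadata fields).

    Hypothetical helper illustrating the ENVELOPE idea only.
    """
    message_field = strategy_config["messageField"]
    # Assumption: fall back to the metron.metadata prefix if none is given.
    prefix = strategy_config.get("metadataPrefix", "metron.metadata")

    raw_data = envelope[message_field]
    metadata = {}
    for key, value in envelope.items():
        if key == message_field:
            continue  # the payload is data, not metadata
        # Assumption: a non-empty prefix is joined to the key with a dot.
        name = f"{prefix}.{key}" if prefix else key
        metadata[name] = value
    return raw_data, metadata


envelope = {"meta_f1": "val1", "payload": "foo,bar,grok"}
raw, meta = unwrap_envelope(envelope, {"messageField": "payload"})
print(raw)   # foo,bar,grok
print(meta)  # {'metron.metadata.meta_f1': 'val1'}
```

Keeping this step outside the parser proper is what allows a parser such as the GrokParser to stay unaware of how the envelope is laid out.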
Together with routing, we have the complete solution to chain parsers which can:
rawMessageStrategy whereby they pull the data out from the JSON Map that they receive
Together this enables a directed acyclic graph of parsers to handle single or multi-layer parsing.
For a complete example, see the parser chaining use case; for a simple example, the following should suffice.
If I want to configure a CSV parser to parse data which has 3 columns f1, f2 and f3 and is held in a field called payload inside of a JSON Map, I can do so like this:

    {
      "parserClassName" : "org.apache.metron.parsers.csv.CSVParser",
      "sensorTopic" : "my_topic",
      "rawMessageStrategy" : "ENVELOPE",
      "rawMessageStrategyConfig" : {
        "messageField" : "payload",
        "metadataPrefix" : ""
      },
      "parserConfig" : {
        "columns" : {
          "f1" : 0,
          "f2" : 1,
          "f3" : 2
        }
      }
    }

This would parse the following message:

    {
      "meta_f1" : "val1",
      "payload" : "foo,bar,grok",
      "original_string" : "2019 Jul, 01: val1 foo,bar,grok",
      "timestamp" : 10000
    }

into

    {
      "meta_f1" : "val1",
      "f1" : "foo",
      "f2" : "bar",
      "f3" : "grok",
      "original_string" : "2019 Jul, 01: val1 foo,bar,grok",
      "timestamp" : 10002
    }

Note a couple of things here:
- meta_f1 is not prefixed here because we configured the strategy with metadataPrefix as empty string.
- timestamp is not inherited from the metadata
- original_string is inherited from the metadata
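The transformation in this example can be reproduced with a short Python sketch, which may help make the three notes above concrete. It only imitates the observable result and is not how Metron implements it: `parse_enveloped_csv` is a hypothetical function, and `parser_timestamp` is a stand-in for whatever timestamp the parser assigns (10002 in the example output).

```python
import csv
import io


def parse_enveloped_csv(envelope: dict, message_field: str,
                        columns: dict, parser_timestamp: int) -> dict:
    """Parse the CSV held in one field of the envelope and merge metadata.

    Hypothetical helper; assumes an empty metadataPrefix as in the example.
    """
    # Parse the enveloped payload as a single CSV row.
    row = next(csv.reader(io.StringIO(envelope[message_field])))
    parsed = {name: row[index] for name, index in columns.items()}

    # Metadata is every envelope field except the payload, kept unprefixed.
    # original_string is carried over from the envelope this way.
    result = {k: v for k, v in envelope.items() if k != message_field}
    result.update(parsed)

    # timestamp is not inherited: the parser's own value replaces it.
    result["timestamp"] = parser_timestamp
    return result


envelope = {
    "meta_f1": "val1",
    "payload": "foo,bar,grok",
    "original_string": "2019 Jul, 01: val1 foo,bar,grok",
    "timestamp": 10000,
}
message = parse_enveloped_csv(envelope, "payload",
                              {"f1": 0, "f2": 1, "f3": 2},
                              parser_timestamp=10002)
print(message["f1"], message["f2"], message["f3"])  # foo bar grok
print(message["timestamp"])                         # 10002
```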