| Logging Bro Output to Kafka |
| =========================== |
| |
| A Bro log writer that sends logging output to Kafka. This provides a convenient means for tools in the Hadoop ecosystem, such as Storm, Spark, and others, to process the data generated by Bro. |
| |
| Installation |
| ------------ |
| |
| 1. Install [librdkafka](https://github.com/edenhill/librdkafka), a native client library for Kafka. This plugin has been tested against the latest release of librdkafka, which at the time of this writing is v0.9.4. |
| |
| In order to use this plugin within a kerberized Kafka environment, you will also need `libsasl2` installed and will need to pass `--enable-sasl` to the `configure` script. |
| |
| ``` |
| curl -L https://github.com/edenhill/librdkafka/archive/v0.9.4.tar.gz | tar xvz |
| cd librdkafka-0.9.4/ |
| ./configure --enable-sasl |
| make |
| sudo make install |
| ``` |
| |
| 1. Build the plugin using the following commands. |
| |
| ``` |
| ./configure --bro-dist=$BRO_SRC |
| make |
| sudo make install |
| ``` |
| |
| 1. Run the following command to ensure that the plugin was installed successfully. |
| |
| ``` |
| $ bro -N Bro::Kafka |
| Bro::Kafka - Writes logs to Kafka (dynamic, version 0.1) |
| ``` |
| |
| Activation |
| ---------- |
| |
| The following examples highlight different ways that the plugin can be used. Simply add the Bro script language to your `local.bro` file (for example, `/usr/share/bro/site/local.bro`) as shown to demonstrate the example. |
| |
| ### Example 1 |
| |
| The goal in this example is to send all HTTP and DNS records to a Kafka topic named `bro`. |
| * Any configuration value accepted by librdkafka can be added to the `kafka_conf` configuration table. |
| * By defining `topic_name` all records will be sent to the same Kafka topic. |
| * Defining `logs_to_send` will ensure that only HTTP and DNS records are sent. |
| |
| ``` |
| @load Bro/Kafka/logs-to-kafka.bro |
| redef Kafka::logs_to_send = set(HTTP::LOG, DNS::LOG); |
| redef Kafka::topic_name = "bro"; |
| redef Kafka::kafka_conf = table( |
| ["metadata.broker.list"] = "localhost:9092" |
| ); |
| ``` |
| |
| ### Example 2 |
| |
| It is also possible to send each log stream to a uniquely named topic. The goal in this example is to send all HTTP records to a Kafka topic named `http` and all DNS records to a separate Kafka topic named `dns`. |
| * The `topic_name` value must be set to an empty string. |
| * The `$path` value of Bro's Log Writer mechanism is used to define the topic name. |
| * Any configuration value accepted by librdkafka can be added to the `$config` configuration table. |
| * Each log writer accepts a separate configuration table. |
| |
| ``` |
| @load Bro/Kafka/logs-to-kafka.bro |
| redef Kafka::topic_name = ""; |
| redef Kafka::tag_json = T; |
| |
| event bro_init() |
| { |
| # handles HTTP |
| local http_filter: Log::Filter = [ |
| $name = "kafka-http", |
| $writer = Log::WRITER_KAFKAWRITER, |
| $config = table( |
| ["metadata.broker.list"] = "localhost:9092" |
| ), |
| $path = "http" |
| ]; |
| Log::add_filter(HTTP::LOG, http_filter); |
| |
| # handles DNS |
| local dns_filter: Log::Filter = [ |
| $name = "kafka-dns", |
| $writer = Log::WRITER_KAFKAWRITER, |
| $config = table( |
| ["metadata.broker.list"] = "localhost:9092" |
| ), |
| $path = "dns" |
| ]; |
| Log::add_filter(DNS::LOG, dns_filter); |
| } |
| ``` |
| |
| ### Example 3 |
| |
| You may want to configure bro to filter log messages with certain characteristics from being sent to your kafka topics. For instance, Metron currently doesn't support IPv6 source or destination IPs in the default enrichments, so it may be helpful to filter those log messages from being sent to kafka (although there are [multiple ways](#notes) to approach this). In this example we will do that that, and are assuming a somewhat standard bro kafka plugin configuration, such that: |
| * All bro logs are sent to the `bro` topic, by configuring `Kafka::topic_name`. |
| * Each JSON message is tagged with the appropriate log type (such as `http`, `dns`, or `conn`), by setting `tag_json` to true. |
| * If the log message contains a 128 byte long source or destination IP address, the log is not sent to kafka. |
| |
| ``` |
| @load Bro/Kafka/logs-to-kafka.bro |
| redef Kafka::topic_name = "bro"; |
| redef Kafka::tag_json = T; |
| |
| event bro_init() &priority=-5 |
| { |
| # handles HTTP |
| Log::add_filter(HTTP::LOG, [ |
| $name = "kafka-http", |
| $writer = Log::WRITER_KAFKAWRITER, |
| $pred(rec: HTTP::Info) = { return ! (( |rec$id$orig_h| == 128 || |rec$id$resp_h| == 128 )); }, |
| $config = table( |
| ["metadata.broker.list"] = "localhost:9092" |
| ) |
| ]); |
| |
| # handles DNS |
| Log::add_filter(DNS::LOG, [ |
| $name = "kafka-dns", |
| $writer = Log::WRITER_KAFKAWRITER, |
| $pred(rec: DNS::Info) = { return ! (( |rec$id$orig_h| == 128 || |rec$id$resp_h| == 128 )); }, |
| $config = table( |
| ["metadata.broker.list"] = "localhost:9092" |
| ) |
| ]); |
| |
| # handles Conn |
| Log::add_filter(Conn::LOG, [ |
| $name = "kafka-conn", |
| $writer = Log::WRITER_KAFKAWRITER, |
| $pred(rec: Conn::Info) = { return ! (( |rec$id$orig_h| == 128 || |rec$id$resp_h| == 128 )); }, |
| $config = table( |
| ["metadata.broker.list"] = "localhost:9092" |
| ) |
| ]); |
| } |
| ``` |
| |
| #### Notes |
| * `logs_to_send` is mutually exclusive with `$pred`, thus for each log you want to set `$pred` on, you must individually setup a `Log::add_filter` and refrain from including that log in `logs_to_send`. |
| * You can also filter IPv6 logs from within your Metron cluster [using Stellar](../../metron-platform/metron-common#IS_IP). In that case, you wouldn't apply a predicate in your bro configuration, and instead Stellar would filter the logs out before they were processed by the enrichment layer of Metron. |
| * It is also possible to use the `is_v6_subnet()` bro function in your predicate, as of their [2.5 release](https://www.bro.org/sphinx-git/install/release-notes.html#bro-2-5), however the above example should work on [bro 2.4](https://www.bro.org/sphinx-git/install/release-notes.html#bro-2-4) and newer, which has been the focus of the kafka plugin. |
| |
| Settings |
| -------- |
| |
| ### `kafka_conf` |
| |
| The global configuration settings for Kafka. These values are passed through |
| directly to librdkafka. Any valid librdkafka settings can be defined in this |
| table. The full set of valid librdkafka settings are available |
| [here](https://github.com/edenhill/librdkafka/blob/v0.9.4/CONFIGURATION.md). |
| |
| ``` |
| redef Kafka::kafka_conf = table( |
| ["metadata.broker.list"] = "localhost:9092", |
| ["client.id"] = "bro" |
| ); |
| ``` |
| |
| ### `topic_name` |
| |
| The name of the topic in Kafka where all Bro logs will be sent to. |
| |
| ``` |
| redef Kafka::topic_name = "bro"; |
| ``` |
| |
| ### `max_wait_on_shutdown` |
| |
| The maximum number of milliseconds that the plugin will wait for any backlog of |
| queued messages to be sent to Kafka before forced shutdown. |
| |
| ``` |
| redef Kafka::max_wait_on_shutdown = 3000; |
| ``` |
| |
| ### `tag_json` |
| |
| If true, a log stream identifier is appended to each JSON-formatted message. For |
| example, a Conn::LOG message will look like `{ 'conn' : { ... }}`. |
| |
| ``` |
| redef Kafka::tag_json = T; |
| ``` |
| |
| ### `debug` |
| |
| A comma separated list of debug contexts in librdkafka which you want to |
| enable. The available contexts are: |
| * generic |
| * broker |
| * topic |
| * metadata |
| * queue |
| * msg |
| * protocol |
| * cgrp |
| * security |
| * fetch |
| * feature |
| * all |
| |
| Kerberos |
| -------- |
| |
| This plugin supports producing messages from a kerberized kafka. There |
| are a couple of prerequisites and a couple of settings to set. |
| |
| ### SASL |
| If you are using SASL as a security protocol for kafka, then you must have |
| libsasl or libsasl2 installed. You can tell if sasl is enabled by |
| running the following from the directory in which you have build |
| librdkafka: |
| ``` |
| examples/rdkafka_example -X builtin.features |
| builtin.features = gzip,snappy,ssl,sasl,regex |
| ``` |
| |
| ### Producer Config |
| |
| As stated above, you can configure the producer kafka configs in |
| `${BRO_HOME}/share/bro/site/local.bro`. There are a few configs |
| necessary to set, which are described |
| [here](https://github.com/edenhill/librdkafka/wiki/Using-SASL-with-librdkafka). |
| For an environment where the following is true: |
| * The broker is `node1:6667` |
| * This kafka is using `SASL_PLAINTEXT` as the security protocol |
| * The keytab used is the `metron` keytab |
| * The service principal for `metron` is `metron@EXAMPLE.COM` |
| |
| The kafka topic `bro` has been given permission for the `metron` user to |
| write: |
| ``` |
| # login using the metron user |
| kinit -kt /etc/security/keytabs/metron.headless.keytab metron@EXAMPLE.COM |
| ${KAFKA_HOME}/kafka-broker/bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=node1:2181 --add --allow-principal User:metron --topic bro |
| ``` |
| |
| The following is how the `${BRO_HOME}/share/bro/site/local.bro` looks: |
| ``` |
| @load Bro/Kafka/logs-to-kafka.bro |
| redef Kafka::logs_to_send = set(HTTP::LOG, DNS::LOG); |
| redef Kafka::topic_name = "bro"; |
| redef Kafka::tag_json = T; |
| redef Kafka::kafka_conf = table( ["metadata.broker.list"] = "node1:6667" |
| , ["security.protocol"] = "SASL_PLAINTEXT" |
| , ["sasl.kerberos.keytab"] = "/etc/security/keytabs/metron.headless.keytab" |
| , ["sasl.kerberos.principal"] = "metron@EXAMPLE.COM" |
| ); |
| ``` |