METRON-1992 Support sending a log to multiple topics (JonZeolla) closes apache/metron-bro-plugin-kafka#23
diff --git a/README.md b/README.md
index 7cc2c46..72436e9 100644
--- a/README.md
+++ b/README.md
@@ -114,7 +114,7 @@
### Example 2 - Send all active logs
-This plugin has the ability send all active logs to kafka with the following configuration.
+This plugin has the ability send all active logs to the "bro" kafka topic with the following configuration.
```
@load packages/metron-bro-plugin-kafka/Apache/Kafka
@@ -138,7 +138,7 @@
);
```
-### Example 4 - Send logs to unique topics
+### Example 4 - Send each bro log to a unique topic
It is also possible to send each log stream to a uniquely named topic. The goal in this example is to send all HTTP records to a Kafka topic named `http` and all DNS records to a separate Kafka topic named `dns`.
* The `topic_name` value must be set to an empty string.
@@ -228,6 +228,43 @@
* You can also filter IPv6 logs from within your Metron cluster [using Stellar](https://github.com/apache/metron/tree/master/metron-stellar/stellar-common#is_ip). In that case, you wouldn't apply a predicate in your bro configuration, and instead Stellar would filter the logs out before they were processed by the enrichment layer of Metron.
* It is also possible to use the `is_v6_subnet()` bro function in your predicate, as of their [2.5 release](https://www.bro.org/sphinx-git/install/release-notes.html#bro-2-5), however the above example should work on [bro 2.4](https://www.bro.org/sphinx-git/install/release-notes.html#bro-2-4) and newer, which has been the focus of the kafka plugin.
+### Example 6 - Sending a log to multiple topics
+
+You are able to send a single bro log to multiple different kafka topics in the same kafka cluster by overriding the default topic (configured with `Kafka::topic_name`) by creating a custom bro `Log::Filter`. In this example, the DHCP, RADIUS, and DNS logs are sent to the "bro" topic; the RADIUS log is duplicated to the "shew_bro_radius" topic; and the DHCP log is duplicated to the "shew_bro_dhcp" topic.
+
+```
+@load packages/metron-bro-plugin-kafka/Apache/Kafka
+redef Kafka::logs_to_send = set(DHCP::LOG, RADIUS::LOG, DNS::LOG);
+redef Kafka::topic_name = "bro";
+redef Kafka::kafka_conf = table(
+ ["metadata.broker.list"] = "server1.example.com:9092,server2.example.com:9092"
+);
+redef Kafka::tag_json = T;
+
+event bro_init() &priority=-10
+{
+ # Send RADIUS to the shew_bro_radius topic
+ local shew_radius_filter: Log::Filter = [
+ $name = "kafka-radius-shew",
+ $writer = Log::WRITER_KAFKAWRITER,
+ $path = "shew_bro_radius"
+ $config = table(["topic_name"] = "shew_bro_radius")
+ ];
+ Log::add_filter(RADIUS::LOG, shew_radius_filter);
+
+ # Send DHCP to the shew_bro_dhcp topic
+ local shew_dhcp_filter: Log::Filter = [
+ $name = "kafka-dhcp-shew",
+ $writer = Log::WRITER_KAFKAWRITER,
+ $path = "shew_bro_dhcp"
+ $config = table(["topic_name"] = "shew_bro_dhcp")
+ ];
+ Log::add_filter(DHCP::LOG, shew_dhcp_filter);
+}
+```
+
+_Note_: Because `Kafka::tag_json` is set to True in this example, the value of `$path` is used as the tag for each `Log::Filter`. If you were to add a log filter with the same `$path` as an existing filter, Bro will append "-N", where N is an integer starting at 2, to the end of the log path so that each filter has its own unique log path. For instance, the second instance of `conn` would become `conn-2`.
+
## Settings
### `logs_to_send`
diff --git a/src/KafkaWriter.cc b/src/KafkaWriter.cc
index 1d4a28a..563ef74 100644
--- a/src/KafkaWriter.cc
+++ b/src/KafkaWriter.cc
@@ -73,6 +73,15 @@
// Cleanup must happen in DoFinish, not in the destructor
}
+string KafkaWriter::GetConfigValue(const WriterInfo& info, const string name) const
+{
+ map<const char*, const char*>::const_iterator it = info.config.find(name.c_str());
+ if (it == info.config.end())
+ return string();
+ else
+ return it->second;
+}
+
/**
* DoInit is called once for each call to the constructor, but in a separate
* thread
@@ -82,9 +91,12 @@
// Timeformat object, default to TS_EPOCH
threading::formatter::JSON::TimeFormat tf = threading::formatter::JSON::TS_EPOCH;
- // if no global 'topic_name' is defined, use the log stream's 'path'
- if(topic_name.empty()) {
- topic_name = info.path;
+ // Allow overriding of the kafka topic via the Bro script constant "topic_name"
+ // which can be applied when adding a new Bro log filter.
+ topic_name_override = GetConfigValue(info, "topic_name");
+
+ if(!topic_name_override.empty()) {
+ topic_name = topic_name_override;
}
/**
diff --git a/src/KafkaWriter.h b/src/KafkaWriter.h
index 14e0f7e..c67c664 100644
--- a/src/KafkaWriter.h
+++ b/src/KafkaWriter.h
@@ -65,12 +65,14 @@
virtual bool DoHeartbeat(double network_time, double current_time);
private:
+ string GetConfigValue(const WriterInfo& info, const string name) const;
static const string default_topic_key;
string stream_id;
bool tag_json;
string json_timestamps;
map<string, string> kafka_conf;
string topic_name;
+ string topic_name_override;
threading::formatter::Formatter *formatter;
RdKafka::Producer* producer;
RdKafka::Topic* topic;