METRON-2343 Bro Kafka plugin - ability to dynamically modify JSON (ottobackwards) closes apache/metron-bro-plugin-kafka#46
diff --git a/README.md b/README.md
index b4aa98d..b7e7e58 100644
--- a/README.md
+++ b/README.md
@@ -266,6 +266,22 @@
_Note_: Because `Kafka::tag_json` is set to True in this example, the value of `$path` is used as the tag for each `Log::Filter`. If you were to add a log filter with the same `$path` as an existing filter, Zeek will append "-N", where N is an integer starting at 2, to the end of the log path so that each filter has its own unique log path. For instance, the second instance of `conn` would become `conn-2`.
+### Example 7 - Add static values to each outgoing Kafka message
+It is possible to define name value pairs and have them added to each outgoing Kafka JSON message when tag_json is set to T. Each will be added to the root JSON object.
+ * the Kafka::additional_message_values table can be configured with each name and value
+ * based on the following configuration, each outgoing message will have "FIRST_STATIC_NAME": "FIRST_STATIC_VALUE", "SECOND_STATIC_NAME": "SECOND_STATIC_VALUE" added.
+```
+@load packages
+redef Kafka::logs_to_send = set(HTTP::LOG, DNS::LOG, Conn::LOG, DPD::LOG, FTP::LOG, Files::LOG, Known::CERTS_LOG, SMTP::LOG, SSL::LOG, Weird::LOG, Notice::LOG, DHCP::LOG, SSH::LOG, Software::LOG, RADIUS::LOG, X509::LOG, RFB::LOG, Stats::LOG, CaptureLoss::LOG, SIP::LOG);
+redef Kafka::topic_name = "zeek";
+redef Kafka::tag_json = T;
+redef Kafka::kafka_conf = table(["metadata.broker.list"] = "kafka-1:9092,kafka-2:9092");
+redef Kafka::additional_message_values = table(["FIRST_STATIC_NAME"] = "FIRST_STATIC_VALUE", ["SECOND_STATIC_NAME"] = "SECOND_STATIC_VALUE");
+redef Kafka::logs_to_exclude = set(Conn::LOG, DHCP::LOG);
+redef Known::cert_tracking = ALL_HOSTS;
+redef Software::asset_tracking = ALL_HOSTS;
+```
+
## Settings
### `logs_to_send`
@@ -315,6 +331,18 @@
);
```
+### `additional_message_values`
+
+A table of name value pairs. Each item in this table will be added to each outgoing message
+at the root level if tag_json is set to T.
+
+```
+redef Kafka::additional_message_values = table(
+ ["FIRST_STATIC_NAME"] = "FIRST_STATIC_VALUE",
+ ["SECOND_STATIC_NAME"] = "SECOND_STATIC_VALUE"
+);
+```
+
### `tag_json`
If true, a log stream identifier is appended to each JSON-formatted message. For
diff --git a/docker/in_docker_scripts/configure_plugin.sh b/docker/in_docker_scripts/configure_plugin.sh
index c4479db..8d2f3da 100755
--- a/docker/in_docker_scripts/configure_plugin.sh
+++ b/docker/in_docker_scripts/configure_plugin.sh
@@ -23,6 +23,7 @@
# Configures the zeek kafka plugin
# Configures the kafka broker
# Configures the plugin for all the traffic types
+# Configures the plugin to add some additional json values
#
function help {
@@ -74,6 +75,7 @@
echo "redef Kafka::topic_name = \"${KAFKA_TOPIC}\";"
echo "redef Kafka::tag_json = T;"
echo "redef Kafka::kafka_conf = table([\"metadata.broker.list\"] = \"kafka-1:9092,kafka-2:9092\");"
+ echo "redef Kafka::additional_message_values = table([\"FIRST_STATIC_NAME\"] = \"FIRST_STATIC_VALUE\", [\"SECOND_STATIC_NAME\"] = \"SECOND_STATIC_VALUE\");"
echo "redef Kafka::logs_to_exclude = set(Conn::LOG, DHCP::LOG);"
echo "redef Known::cert_tracking = ALL_HOSTS;"
echo "redef Software::asset_tracking = ALL_HOSTS;"
diff --git a/scripts/init.zeek b/scripts/init.zeek
index 6f5a7ae..5636a13 100644
--- a/scripts/init.zeek
+++ b/scripts/init.zeek
@@ -53,6 +53,13 @@
["metadata.broker.list"] = "localhost:9092"
) &redef;
+ ## Key value pairs that will be added to outgoing messages at the root level
+ ## for example: ["zeek_server"] = "this_server_name"
## will result in a "zeek_server":"this_server_name" field added to the outgoing
+ ## json
+ ## note this depends on tag_json being T
+ const additional_message_values: table[string] of string = table() &redef;
+
## A comma separated list of librdkafka debug contexts
const debug: string = "" &redef;
diff --git a/src/KafkaWriter.cc b/src/KafkaWriter.cc
index 7f26092..deeea95 100644
--- a/src/KafkaWriter.cc
+++ b/src/KafkaWriter.cc
@@ -68,6 +68,22 @@
Unref(index);
delete k;
}
+
+ Val* mvals = BifConst::Kafka::additional_message_values->AsTableVal();
c = mvals->AsTable()->InitForIteration();
+ while ((v = mvals->AsTable()->NextEntry(k, c))) {
+
+ // fetch the key and value
+ ListVal* index = mvals->AsTableVal()->RecoverIndex(k);
+ string key = index->Index(0)->AsString()->CheckString();
string value = v->Value()->AsString()->CheckString();
additional_message_values.insert (additional_message_values.begin(), pair<string, string> (key, value));
+
+ // cleanup
+ Unref(index);
+ delete k;
+ }
+
}
KafkaWriter::~KafkaWriter()
@@ -126,7 +142,7 @@
}
else if ( strcmp(json_timestamps.c_str(), "JSON::TS_ISO8601") == 0 ) {
tf = threading::formatter::JSON::TS_ISO8601;
- }
+ }
else {
Error(Fmt("KafkaWriter::DoInit: Invalid JSON timestamp format %s",
json_timestamps.c_str()));
@@ -136,7 +152,7 @@
// initialize the formatter
if(BifConst::Kafka::tag_json) {
formatter = new threading::formatter::TaggedJSON(info.path, this, tf);
- }
+ }
else {
formatter = new threading::formatter::JSON(this, tf);
}
@@ -148,9 +164,6 @@
if(is_debug) {
MsgThread::Info(Fmt("Debug is turned on and set to: %s. Available debug context: %s.", debug.c_str(), RdKafka::get_debug_contexts().c_str()));
}
- else {
- MsgThread::Info(Fmt("Debug is turned off."));
- }
// kafka global configuration
string err;
@@ -249,7 +262,12 @@
buff.Clear();
// format the log entry
+ if(BifConst::Kafka::tag_json) {
+ dynamic_cast<threading::formatter::TaggedJSON*>(formatter)->Describe(&buff, num_fields, fields, vals,
+ additional_message_values);
+ } else {
formatter->Describe(&buff, num_fields, fields, vals);
+ }
// send the formatted log entry to kafka
const char *raw = (const char *) buff.Bytes();
diff --git a/src/KafkaWriter.h b/src/KafkaWriter.h
index 5ebf4ef..ad29ac7 100644
--- a/src/KafkaWriter.h
+++ b/src/KafkaWriter.h
@@ -73,6 +73,7 @@
bool mocking;
string json_timestamps;
map<string, string> kafka_conf;
+ map<string, string> additional_message_values;
string topic_name;
string topic_name_override;
threading::formatter::Formatter *formatter;
diff --git a/src/TaggedJSON.cc b/src/TaggedJSON.cc
index db3f305..f182d95 100644
--- a/src/TaggedJSON.cc
+++ b/src/TaggedJSON.cc
@@ -25,7 +25,7 @@
TaggedJSON::~TaggedJSON()
{}
-bool TaggedJSON::Describe(ODesc* desc, int num_fields, const Field* const* fields, Value** vals) const
+bool TaggedJSON::Describe(ODesc* desc, int num_fields, const Field* const* fields, Value** vals, map<string,string> &const_vals) const
{
desc->AddRaw("{");
@@ -34,10 +34,26 @@
desc->AddRaw(stream_name);
desc->AddRaw("\": ");
+
+
// append the JSON formatted log record itself
JSON::Describe(desc, num_fields, fields, vals);
+ if (const_vals.size() > 0) {
- desc->AddRaw("}");
+ map<string, string>::iterator it = const_vals.begin();
+ while (it != const_vals.end()) {
+ desc->AddRaw(",");
+ desc->AddRaw("\"");
+ desc->AddRaw(it->first);
+ desc->AddRaw("\": ");
+ desc->AddRaw("\"");
+ desc->AddRaw(it->second);
+ desc->AddRaw("\"");
+ it++;
+ }
+ }
+
+ desc->AddRaw("}");
return true;
}
}}
diff --git a/src/TaggedJSON.h b/src/TaggedJSON.h
index 51b1bf3..9135bf2 100644
--- a/src/TaggedJSON.h
+++ b/src/TaggedJSON.h
@@ -41,7 +41,7 @@
public:
TaggedJSON(string stream_name, MsgThread* t, JSON::TimeFormat tf);
virtual ~TaggedJSON();
- virtual bool Describe(ODesc* desc, int num_fields, const Field* const* fields, Value** vals) const;
+ virtual bool Describe(ODesc* desc, int num_fields, const Field* const* fields, Value** vals, map<string,string> &const_vals) const;
private:
string stream_name;
diff --git a/src/kafka.bif b/src/kafka.bif
index 53e3bfe..47983b1 100644
--- a/src/kafka.bif
+++ b/src/kafka.bif
@@ -18,6 +18,7 @@
module Kafka;
const kafka_conf: config;
+const additional_message_values : config;
const topic_name: string;
const max_wait_on_shutdown: count;
const tag_json: bool;
diff --git a/tests/Baseline/kafka.show-plugin/output b/tests/Baseline/kafka.show-plugin/output
index 978febc..6e82dd3 100644
--- a/tests/Baseline/kafka.show-plugin/output
+++ b/tests/Baseline/kafka.show-plugin/output
@@ -1,6 +1,7 @@
Apache::Kafka - Writes logs to Kafka (dynamic)
[Writer] KafkaWriter (Log::WRITER_KAFKAWRITER)
[Constant] Kafka::kafka_conf
+ [Constant] Kafka::additional_message_values
[Constant] Kafka::topic_name
[Constant] Kafka::max_wait_on_shutdown
[Constant] Kafka::tag_json