METRON-2343 Bro Kafka plugin - ability to dynamically modify JSON (ottobackwards) closes apache/metron-bro-plugin-kafka#46
diff --git a/README.md b/README.md
index b4aa98d..b7e7e58 100644
--- a/README.md
+++ b/README.md
@@ -266,6 +266,22 @@
 
 _Note_:  Because `Kafka::tag_json` is set to True in this example, the value of `$path` is used as the tag for each `Log::Filter`. If you were to add a log filter with the same `$path` as an existing filter, Zeek will append "-N", where N is an integer starting at 2, to the end of the log path so that each filter has its own unique log path. For instance, the second instance of `conn` would become `conn-2`.
 
+### Example 7 - Add static values to each outgoing Kafka message
+It is possible to define name value pairs and have them added to each outgoing Kafka json message when tagged_json is set to true.  Each will be added to the root json object.
+    * the Kafka::additional_message_values table can be configured with each name and value
+    * based on the following configuration, each outgoing message will have "FIRST_STATIC_NAME": "FIRST_STATIC_VALUE", "SECOND_STATIC_NAME": "SECOND_STATIC_VALUE" added.
+```
+@load packages
+redef Kafka::logs_to_send = set(HTTP::LOG, DNS::LOG, Conn::LOG, DPD::LOG, FTP::LOG, Files::LOG, Known::CERTS_LOG, SMTP::LOG, SSL::LOG, Weird::LOG, Notice::LOG, DHCP::LOG, SSH::LOG, Software::LOG, RADIUS::LOG, X509::LOG, RFB::LOG, Stats::LOG, CaptureLoss::LOG, SIP::LOG);
+redef Kafka::topic_name = "zeek";
+redef Kafka::tag_json = T;
+redef Kafka::kafka_conf = table(["metadata.broker.list"] = "kafka-1:9092,kafka-2:9092");
+redef Kafka::additional_message_values = table(["FIRST_STATIC_NAME"] = "FIRST_STATIC_VALUE", ["SECOND_STATIC_NAME"] = "SECOND_STATIC_VALUE");
+redef Kafka::logs_to_exclude = set(Conn::LOG, DHCP::LOG);
+redef Known::cert_tracking = ALL_HOSTS;
+redef Software::asset_tracking = ALL_HOSTS;
+```
+
 ## Settings
 
 ### `logs_to_send`
@@ -315,6 +331,18 @@
 );
 ```
 
+### `additonal_message_values`
+
+A table of of name value pairs.  Each item in this table will be added to each outgoing message
+at the root level if tag_json is set to T.
+
+```
+redef Kafka::additional_message_values = table(
+    ["FIRST_STATIC_NAME"] = "FIRST_STATIC_VALUE",
+    ["SECOND_STATIC_NAME"] = "SECOND_STATIC_VALUE"
+);
+```
+
 ### `tag_json`
 
 If true, a log stream identifier is appended to each JSON-formatted message. For
diff --git a/docker/in_docker_scripts/configure_plugin.sh b/docker/in_docker_scripts/configure_plugin.sh
index c4479db..8d2f3da 100755
--- a/docker/in_docker_scripts/configure_plugin.sh
+++ b/docker/in_docker_scripts/configure_plugin.sh
@@ -23,6 +23,7 @@
 # Configures the zeek kafka plugin
 # Configures the kafka broker
 # Configures the plugin for all the traffic types
+# Configures the plugin to add some additional json values
 #
 
 function help {
@@ -74,6 +75,7 @@
   echo "redef Kafka::topic_name = \"${KAFKA_TOPIC}\";"
   echo "redef Kafka::tag_json = T;"
   echo "redef Kafka::kafka_conf = table([\"metadata.broker.list\"] = \"kafka-1:9092,kafka-2:9092\");"
+  echo "redef Kafka::additional_message_values = table([\"FIRST_STATIC_NAME\"] = \"FIRST_STATIC_VALUE\", [\"SECOND_STATIC_NAME\"] = \"SECOND_STATIC_VALUE\");"
   echo "redef Kafka::logs_to_exclude = set(Conn::LOG, DHCP::LOG);"
   echo "redef Known::cert_tracking = ALL_HOSTS;"
   echo "redef Software::asset_tracking = ALL_HOSTS;"
diff --git a/scripts/init.zeek b/scripts/init.zeek
index 6f5a7ae..5636a13 100644
--- a/scripts/init.zeek
+++ b/scripts/init.zeek
@@ -53,6 +53,13 @@
                 ["metadata.broker.list"] = "localhost:9092"
         ) &redef;
 
+        ##  Key value pairs that will be added to outgoing messages at the root level
+        ##  for example:          ["zeek_server"] = "this_server_name"
+        ##  will results in a  "zeek_server":"this_server_name" field added to the outgoing
+        ##  json
+        ##  note this depends on tag_json being T
+        const additional_message_values: table[string] of string = table() &redef;
+
         ## A comma separated list of librdkafka debug contexts
         const debug: string = "" &redef;
 
diff --git a/src/KafkaWriter.cc b/src/KafkaWriter.cc
index 7f26092..deeea95 100644
--- a/src/KafkaWriter.cc
+++ b/src/KafkaWriter.cc
@@ -68,6 +68,22 @@
       Unref(index);
       delete k;
   }
+
+  Val* mvals = BifConst::Kafka::additional_message_values->AsTableVal();
+  c = val->AsTable()->InitForIteration();
+  while ((v = mvals->AsTable()->NextEntry(k, c))) {
+
+    // fetch the key and value
+    ListVal* index = mvals->AsTableVal()->RecoverIndex(k);
+    string key = index->Index(0)->AsString()->CheckString();
+    string val = v->Value()->AsString()->CheckString();
+    additional_message_values.insert (additional_message_values.begin(), pair<string, string> (key, val));
+
+    // cleanup
+    Unref(index);
+    delete k;
+  }
+
 }
 
 KafkaWriter::~KafkaWriter()
@@ -126,7 +142,7 @@
     }
     else if ( strcmp(json_timestamps.c_str(), "JSON::TS_ISO8601") == 0 ) {
       tf = threading::formatter::JSON::TS_ISO8601;
-    } 
+    }
     else {
       Error(Fmt("KafkaWriter::DoInit: Invalid JSON timestamp format %s",
         json_timestamps.c_str()));
@@ -136,7 +152,7 @@
     // initialize the formatter
     if(BifConst::Kafka::tag_json) {
       formatter = new threading::formatter::TaggedJSON(info.path, this, tf);
-    } 
+    }
     else {
       formatter = new threading::formatter::JSON(this, tf);
     }
@@ -148,9 +164,6 @@
     if(is_debug) {
       MsgThread::Info(Fmt("Debug is turned on and set to: %s.  Available debug context: %s.", debug.c_str(), RdKafka::get_debug_contexts().c_str()));
     }
-    else {
-      MsgThread::Info(Fmt("Debug is turned off."));
-    }
 
     // kafka global configuration
     string err;
@@ -249,7 +262,12 @@
         buff.Clear();
 
         // format the log entry
+      if(BifConst::Kafka::tag_json) {
+          dynamic_cast<threading::formatter::TaggedJSON*>(formatter)->Describe(&buff, num_fields, fields, vals,
+                              additional_message_values);
+       } else {
         formatter->Describe(&buff, num_fields, fields, vals);
+       }
 
         // send the formatted log entry to kafka
         const char *raw = (const char *) buff.Bytes();
diff --git a/src/KafkaWriter.h b/src/KafkaWriter.h
index 5ebf4ef..ad29ac7 100644
--- a/src/KafkaWriter.h
+++ b/src/KafkaWriter.h
@@ -73,6 +73,7 @@
     bool mocking;
     string json_timestamps;
     map<string, string> kafka_conf;
+    map<string, string> additional_message_values;
     string topic_name;
     string topic_name_override;
     threading::formatter::Formatter *formatter;
diff --git a/src/TaggedJSON.cc b/src/TaggedJSON.cc
index db3f305..f182d95 100644
--- a/src/TaggedJSON.cc
+++ b/src/TaggedJSON.cc
@@ -25,7 +25,7 @@
 TaggedJSON::~TaggedJSON()
 {}
 
-bool TaggedJSON::Describe(ODesc* desc, int num_fields, const Field* const* fields, Value** vals) const
+bool TaggedJSON::Describe(ODesc* desc, int num_fields, const Field* const* fields, Value** vals, map<string,string> &const_vals) const
 {
     desc->AddRaw("{");
 
@@ -34,10 +34,26 @@
     desc->AddRaw(stream_name);
     desc->AddRaw("\": ");
 
+
+
     // append the JSON formatted log record itself
     JSON::Describe(desc, num_fields, fields, vals);
+    if (const_vals.size() > 0) {
 
-    desc->AddRaw("}");
+      map<string, string>::iterator it = const_vals.begin();
+      while (it != const_vals.end()) {
+        desc->AddRaw(",");
+        desc->AddRaw("\"");
+        desc->AddRaw(it->first);
+        desc->AddRaw("\": ");
+        desc->AddRaw("\"");
+        desc->AddRaw(it->second);
+        desc->AddRaw("\"");
+        it++;
+      }
+    }
+
+  desc->AddRaw("}");
     return true;
 }
 }}
diff --git a/src/TaggedJSON.h b/src/TaggedJSON.h
index 51b1bf3..9135bf2 100644
--- a/src/TaggedJSON.h
+++ b/src/TaggedJSON.h
@@ -41,7 +41,7 @@
 public:
     TaggedJSON(string stream_name, MsgThread* t, JSON::TimeFormat tf);
     virtual ~TaggedJSON();
-    virtual bool Describe(ODesc* desc, int num_fields, const Field* const* fields, Value** vals) const;
+    virtual bool Describe(ODesc* desc, int num_fields, const Field* const* fields, Value** vals, map<string,string> &const_vals) const;
 
 private:
     string stream_name;
diff --git a/src/kafka.bif b/src/kafka.bif
index 53e3bfe..47983b1 100644
--- a/src/kafka.bif
+++ b/src/kafka.bif
@@ -18,6 +18,7 @@
 module Kafka;
 
 const kafka_conf: config;
+const additional_message_values : config;
 const topic_name: string;
 const max_wait_on_shutdown: count;
 const tag_json: bool;
diff --git a/tests/Baseline/kafka.show-plugin/output b/tests/Baseline/kafka.show-plugin/output
index 978febc..6e82dd3 100644
--- a/tests/Baseline/kafka.show-plugin/output
+++ b/tests/Baseline/kafka.show-plugin/output
@@ -1,6 +1,7 @@
 Apache::Kafka - Writes logs to Kafka (dynamic)
     [Writer] KafkaWriter (Log::WRITER_KAFKAWRITER)
     [Constant] Kafka::kafka_conf
+    [Constant] Kafka::additional_message_values
     [Constant] Kafka::topic_name
     [Constant] Kafka::max_wait_on_shutdown
     [Constant] Kafka::tag_json