METRON-1469 Kafka Plugin for Bro - Configurable JSON Timestamps (dcode via nickwallen) closes apache/metron-bro-plugin-kafka#6
diff --git a/README.md b/README.md
index 0c67347..8aff3ba 100644
--- a/README.md
+++ b/README.md
@@ -37,11 +37,10 @@
 
 ### Example 1
 
-The goal in this example is to send all HTTP and DNS records to a Kafka topic named `bro`. 
+The goal in this example is to send all HTTP and DNS records to a Kafka topic named `bro`.
  * Any configuration value accepted by librdkafka can be added to the `kafka_conf` configuration table.  
  * By defining `topic_name` all records will be sent to the same Kafka topic.
- * Defining `logs_to_send` will ensure that only HTTP and DNS records are sent.
-
+ * Defining `logs_to_send` will ensure that only HTTP and DNS records are sent.
 ```
 @load packages/metron-bro-plugin-kafka/Apache/Kafka
 redef Kafka::logs_to_send = set(HTTP::LOG, DNS::LOG);
@@ -183,6 +182,15 @@
 redef Kafka::tag_json = T;
 ```
 
+### `json_timestamps`
+
+Uses the same timestamp format constants as the Ascii log writer. The default is
+`JSON::TS_EPOCH`. Other options are `JSON::TS_MILLIS` and `JSON::TS_ISO8601`.
+
+```
+redef Kafka::json_timestamps = JSON::TS_ISO8601;
+```
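+
+This composes with the other options; as a minimal sketch (the topic name and the
+set of logs are illustrative, taken from Example 1 above):
+
+```
+@load packages/metron-bro-plugin-kafka/Apache/Kafka
+redef Kafka::logs_to_send = set(HTTP::LOG, DNS::LOG);
+redef Kafka::topic_name = "bro";
+redef Kafka::json_timestamps = JSON::TS_ISO8601;
+```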
+
 ### `debug`
 
 A comma separated list of debug contexts in librdkafka which you want to
diff --git a/scripts/init.bro b/scripts/init.bro
index 65fb9e7..ad9f0a1 100644
--- a/scripts/init.bro
+++ b/scripts/init.bro
@@ -22,6 +22,7 @@
   const topic_name: string = "bro" &redef;
   const max_wait_on_shutdown: count = 3000 &redef;
   const tag_json: bool = F &redef;
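+  # format of the timestamps in the JSON output; accepts the same values as the
+  # Ascii log writer (JSON::TS_EPOCH, JSON::TS_MILLIS, or JSON::TS_ISO8601)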
+  const json_timestamps: JSON::TimestampFormat = JSON::TS_EPOCH &redef;
   const kafka_conf: table[string] of string = table(
     ["metadata.broker.list"] = "localhost:9092"
   ) &redef;
diff --git a/src/KafkaWriter.cc b/src/KafkaWriter.cc
index c9ad44f..79b5aa0 100644
--- a/src/KafkaWriter.cc
+++ b/src/KafkaWriter.cc
@@ -20,7 +20,11 @@
 using namespace logging;
 using namespace writer;
 
-KafkaWriter::KafkaWriter(WriterFrontend* frontend): WriterBackend(frontend), formatter(NULL), producer(NULL), topic(NULL)
+KafkaWriter::KafkaWriter(WriterFrontend* frontend):
+    WriterBackend(frontend),
+    formatter(NULL),
+    producer(NULL),
+    topic(NULL)
 {
   // need thread-local copies of all user-defined settings coming from
   // bro scripting land.  accessing these is not thread-safe and 'DoInit'
@@ -29,6 +33,14 @@
   // tag_json - thread local copy
   tag_json = BifConst::Kafka::tag_json;
 
+  // json_timestamps - thread local copy
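+  // Describe() renders the enum value as its script-level name (e.g.
+  // "JSON::TS_EPOCH"); DoInit maps that string back onto the C++ enum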
+  ODesc tsfmt;
+  BifConst::Kafka::json_timestamps->Describe(&tsfmt);
+  json_timestamps.assign(
+    (const char*) tsfmt.Bytes(),
+    tsfmt.Len()
+  );
+
   // topic name - thread local copy
   topic_name.assign(
     (const char*)BifConst::Kafka::topic_name->Bytes(),
@@ -54,20 +66,53 @@
 }
 
 KafkaWriter::~KafkaWriter()
-{}
+{
+    // clean up the topic, producer, formatter, and configuration objects
+    delete topic;
+    delete producer;
+    delete formatter;
+    delete conf;
+    delete topic_conf;
+}
 
 bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields, const threading::Field* const* fields)
 {
+    // timestamp format for the JSON formatter; defaults to TS_EPOCH
+    threading::formatter::JSON::TimeFormat tf = threading::formatter::JSON::TS_EPOCH;
+
     // if no global 'topic_name' is defined, use the log stream's 'path'
     if(topic_name.empty()) {
         topic_name = info.path;
     }
 
+    // format timestamps
+    // NOTE: this string comparison is currently necessary because the Bro enum
+    // value cannot be passed directly into the C++ enum. It keeps the user
+    // interface consistent with the existing Bro logging configuration for the
+    // Ascii log output.
+    if ( strcmp(json_timestamps.c_str(), "JSON::TS_EPOCH") == 0 ) {
+      tf = threading::formatter::JSON::TS_EPOCH;
+    }
+    else if ( strcmp(json_timestamps.c_str(), "JSON::TS_MILLIS") == 0 ) {
+      tf = threading::formatter::JSON::TS_MILLIS;
+    }
+    else if ( strcmp(json_timestamps.c_str(), "JSON::TS_ISO8601") == 0 ) {
+      tf = threading::formatter::JSON::TS_ISO8601;
+    }
+    else {
+      Error(Fmt("KafkaWriter::DoInit: Invalid JSON timestamp format %s",
+        json_timestamps.c_str()));
+      return false;
+    }
+
     // initialize the formatter
     if(BifConst::Kafka::tag_json) {
-      formatter = new threading::formatter::TaggedJSON(info.path, this, threading::formatter::JSON::TS_EPOCH);
-    } else {
-      formatter = new threading::formatter::JSON(this, threading::formatter::JSON::TS_EPOCH);
+      formatter = new threading::formatter::TaggedJSON(info.path, this, tf);
+    }
+    else {
+      formatter = new threading::formatter::JSON(this, tf);
     }
 
     // is debug enabled
diff --git a/src/KafkaWriter.h b/src/KafkaWriter.h
index ad3e03f..14e0f7e 100644
--- a/src/KafkaWriter.h
+++ b/src/KafkaWriter.h
@@ -47,7 +47,7 @@
 class KafkaWriter : public WriterBackend {
 
 public:
-    KafkaWriter(WriterFrontend* frontend);
+    explicit KafkaWriter(WriterFrontend* frontend);
     ~KafkaWriter();
 
     static WriterBackend* Instantiate(WriterFrontend* frontend)
@@ -68,6 +68,7 @@
     static const string default_topic_key;
     string stream_id;
     bool tag_json;
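+    // script-level name of the configured timestamp format (e.g. "JSON::TS_EPOCH")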
+    string json_timestamps;
     map<string, string> kafka_conf;
     string topic_name;
     threading::formatter::Formatter *formatter;
diff --git a/src/kafka.bif b/src/kafka.bif
index 2f5a2b5..2709072 100644
--- a/src/kafka.bif
+++ b/src/kafka.bif
@@ -21,4 +21,5 @@
 const topic_name: string;
 const max_wait_on_shutdown: count;
 const tag_json: bool;
+const json_timestamps: JSON::TimestampFormat;
 const debug: string;