METRON-1469 Kafka Plugin for Bro - Configurable JSON Timestamps (dcode via nickwallen) closes apache/metron-bro-plugin-kafka#6
diff --git a/README.md b/README.md
index 0c67347..8aff3ba 100644
--- a/README.md
+++ b/README.md
@@ -37,11 +37,10 @@
### Example 1
-The goal in this example is to send all HTTP and DNS records to a Kafka topic named `bro`.
+The goal in this example is to send all HTTP and DNS records to a Kafka topic named `bro`.
* Any configuration value accepted by librdkafka can be added to the `kafka_conf` configuration table.
* By defining `topic_name` all records will be sent to the same Kafka topic.
- * Defining `logs_to_send` will ensure that only HTTP and DNS records are sent.
-
+ * Defining `logs_to_send` will ensure that only HTTP and DNS records are sent.
```
@load packages/metron-bro-plugin-kafka/Apache/Kafka
redef Kafka::logs_to_send = set(HTTP::LOG, DNS::LOG);
@@ -183,6 +182,15 @@
redef Kafka::tag_json = T;
```
+### `json_timestamps`
+
+Selects the JSON timestamp format, using the same options as the ASCII log writer. Default is `JSON::TS_EPOCH`. Other
+options are `JSON::TS_MILLIS` and `JSON::TS_ISO8601`.
+
+```
+redef Kafka::json_timestamps = JSON::TS_ISO8601;
+```
+
### `debug`
A comma separated list of debug contexts in librdkafka which you want to
diff --git a/scripts/init.bro b/scripts/init.bro
index 65fb9e7..ad9f0a1 100644
--- a/scripts/init.bro
+++ b/scripts/init.bro
@@ -22,6 +22,7 @@
const topic_name: string = "bro" &redef;
const max_wait_on_shutdown: count = 3000 &redef;
const tag_json: bool = F &redef;
+ const json_timestamps: JSON::TimestampFormat = JSON::TS_EPOCH &redef;
const kafka_conf: table[string] of string = table(
["metadata.broker.list"] = "localhost:9092"
) &redef;
diff --git a/src/KafkaWriter.cc b/src/KafkaWriter.cc
index c9ad44f..79b5aa0 100644
--- a/src/KafkaWriter.cc
+++ b/src/KafkaWriter.cc
@@ -20,7 +20,11 @@
using namespace logging;
using namespace writer;
-KafkaWriter::KafkaWriter(WriterFrontend* frontend): WriterBackend(frontend), formatter(NULL), producer(NULL), topic(NULL)
+KafkaWriter::KafkaWriter(WriterFrontend* frontend):
+ WriterBackend(frontend),
+ formatter(NULL),
+ producer(NULL),
+ topic(NULL)
{
// need thread-local copies of all user-defined settings coming from
// bro scripting land. accessing these is not thread-safe and 'DoInit'
@@ -29,6 +33,14 @@
// tag_json - thread local copy
tag_json = BifConst::Kafka::tag_json;
+ // json_timestamps
+ ODesc tsfmt;
+ BifConst::Kafka::json_timestamps->Describe(&tsfmt);
+ json_timestamps.assign(
+ (const char*) tsfmt.Bytes(),
+ tsfmt.Len()
+ );
+
// topic name - thread local copy
topic_name.assign(
(const char*)BifConst::Kafka::topic_name->Bytes(),
@@ -54,20 +66,53 @@
}
KafkaWriter::~KafkaWriter()
-{}
+{
+
+ // Cleanup all the things
+ delete topic;
+ delete producer;
+ delete formatter;
+ delete conf;
+ delete topic_conf;
+
+}
bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields, const threading::Field* const* fields)
{
+ // Timeformat object, default to TS_EPOCH
+ threading::formatter::JSON::TimeFormat tf = threading::formatter::JSON::TS_EPOCH;
+
// if no global 'topic_name' is defined, use the log stream's 'path'
if(topic_name.empty()) {
topic_name = info.path;
}
+ // format timestamps
+ // NOTE: This string comparison implementation is currently the necessary
+ // way to do it, as there isn't a way to pass the Bro enum into a C++ enum.
+ // This makes the user interface consistent with the existing Bro Logging
+ // configuration for the ASCII log output.
+ if ( strcmp(json_timestamps.c_str(), "JSON::TS_EPOCH") == 0 ) {
+ tf = threading::formatter::JSON::TS_EPOCH;
+ }
+ else if ( strcmp(json_timestamps.c_str(), "JSON::TS_MILLIS") == 0 ) {
+ tf = threading::formatter::JSON::TS_MILLIS;
+ }
+ else if ( strcmp(json_timestamps.c_str(), "JSON::TS_ISO8601") == 0 ) {
+ tf = threading::formatter::JSON::TS_ISO8601;
+ }
+ else {
+ Error(Fmt("KafkaWriter::DoInit: Invalid JSON timestamp format %s",
+ json_timestamps.c_str()));
+ return false;
+ }
+
// initialize the formatter
if(BifConst::Kafka::tag_json) {
- formatter = new threading::formatter::TaggedJSON(info.path, this, threading::formatter::JSON::TS_EPOCH);
- } else {
- formatter = new threading::formatter::JSON(this, threading::formatter::JSON::TS_EPOCH);
+ formatter = new threading::formatter::TaggedJSON(info.path, this, tf);
+ }
+ else {
+ formatter = new threading::formatter::JSON(this, tf);
}
// is debug enabled
diff --git a/src/KafkaWriter.h b/src/KafkaWriter.h
index ad3e03f..14e0f7e 100644
--- a/src/KafkaWriter.h
+++ b/src/KafkaWriter.h
@@ -47,7 +47,7 @@
class KafkaWriter : public WriterBackend {
public:
- KafkaWriter(WriterFrontend* frontend);
+ explicit KafkaWriter(WriterFrontend* frontend);
~KafkaWriter();
static WriterBackend* Instantiate(WriterFrontend* frontend)
@@ -68,6 +68,7 @@
static const string default_topic_key;
string stream_id;
bool tag_json;
+ string json_timestamps;
map<string, string> kafka_conf;
string topic_name;
threading::formatter::Formatter *formatter;
diff --git a/src/kafka.bif b/src/kafka.bif
index 2f5a2b5..2709072 100644
--- a/src/kafka.bif
+++ b/src/kafka.bif
@@ -21,4 +21,5 @@
const topic_name: string;
const max_wait_on_shutdown: count;
const tag_json: bool;
+const json_timestamps: JSON::TimestampFormat;
const debug: string;