METRON-2360 BRO-PLUGIN: does not build with 3.2.1 (ottobackwards) closes apache/metron-bro-plugin-kafka#48
diff --git a/docker/containers/zeek/Dockerfile b/docker/containers/zeek/Dockerfile
index dba31d7..d8eda31 100644
--- a/docker/containers/zeek/Dockerfile
+++ b/docker/containers/zeek/Dockerfile
@@ -57,7 +57,7 @@
# install pip3 and zkg
WORKDIR /root
COPY requirements.txt requirements.txt
-RUN dnf -y install python3-pip && \
+RUN dnf -y install python3-pip diffutils && \
dnf clean all && \
python3 -m pip install --upgrade pip && \
python3 -m pip install -r requirements.txt && \
diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml
index 0579887..4f8ba10 100644
--- a/docker/docker-compose.yml
+++ b/docker/docker-compose.yml
@@ -29,7 +29,7 @@
build:
context: containers/zeek
args:
- ZEEK_VERSION: "3.1.3"
+ ZEEK_VERSION: "3.2.1"
LIBRDKAFKA_VERSION: "1.4.2"
image: metron-bro-plugin-kafka_zeek:latest
depends_on:
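Note on the two container changes above: the ZEEK_VERSION bump to 3.2.1 is the point of this patch, while the added diffutils package is presumably needed because part of the Zeek 3.2 build (or zkg's checks) shells out to diff/cmp, which the slimmed-down Fedora base image no longer ships. Assuming the compose service is named zeek, as the image tag suggests, docker-compose build zeek rebuilds the image with both changes.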
diff --git a/src/KafkaWriter.cc b/src/KafkaWriter.cc
index deeea95..1e19b3b 100644
--- a/src/KafkaWriter.cc
+++ b/src/KafkaWriter.cc
@@ -22,12 +22,8 @@
using namespace writer;
// The Constructor is called once for each log filter that uses this log writer.
-KafkaWriter::KafkaWriter(WriterFrontend* frontend):
- WriterBackend(frontend),
- formatter(NULL),
- producer(NULL),
- topic(NULL)
-{
+KafkaWriter::KafkaWriter(WriterFrontend *frontend)
+ : WriterBackend(frontend), formatter(NULL), producer(NULL), topic(NULL) {
/**
* We need thread-local copies of all user-defined settings coming from zeek
* scripting land. accessing these is not thread-safe and 'DoInit' is
@@ -41,177 +37,170 @@
// json_timestamps
ODesc tsfmt;
BifConst::Kafka::json_timestamps->Describe(&tsfmt);
- json_timestamps.assign(
- (const char*) tsfmt.Bytes(),
- tsfmt.Len()
- );
+ json_timestamps.assign((const char *)tsfmt.Bytes(), tsfmt.Len());
// topic name - thread local copy
- topic_name.assign(
- (const char*)BifConst::Kafka::topic_name->Bytes(),
- BifConst::Kafka::topic_name->Len());
+ topic_name.assign((const char *)BifConst::Kafka::topic_name->Bytes(),
+ BifConst::Kafka::topic_name->Len());
// kafka_conf - thread local copy
- Val* val = BifConst::Kafka::kafka_conf->AsTableVal();
- IterCookie* c = val->AsTable()->InitForIteration();
- HashKey* k;
- TableEntryVal* v;
+ Val *val = BifConst::Kafka::kafka_conf->AsTableVal();
+ IterCookie *c = val->AsTable()->InitForIteration();
+ HashKey *k;
+ TableEntryVal *v;
while ((v = val->AsTable()->NextEntry(k, c))) {
-
- // fetch the key and value
- ListVal* index = val->AsTableVal()->RecoverIndex(k);
- string key = index->Index(0)->AsString()->CheckString();
- string val = v->Value()->AsString()->CheckString();
- kafka_conf.insert (kafka_conf.begin(), pair<string, string> (key, val));
-
- // cleanup
- Unref(index);
- delete k;
- }
-
- Val* mvals = BifConst::Kafka::additional_message_values->AsTableVal();
- c = val->AsTable()->InitForIteration();
- while ((v = mvals->AsTable()->NextEntry(k, c))) {
-
// fetch the key and value
- ListVal* index = mvals->AsTableVal()->RecoverIndex(k);
- string key = index->Index(0)->AsString()->CheckString();
- string val = v->Value()->AsString()->CheckString();
- additional_message_values.insert (additional_message_values.begin(), pair<string, string> (key, val));
+ ListVal *index = val->AsTableVal()->RecoverIndex(k);
+ std::string key = index->Index(0)->AsString()->CheckString();
+ std::string val = v->Value()->AsString()->CheckString();
+ kafka_conf.insert(kafka_conf.begin(),
+ std::pair<std::string, std::string>(key, val));
// cleanup
Unref(index);
delete k;
}
+ Val *mvals = BifConst::Kafka::additional_message_values->AsTableVal();
+ c = val->AsTable()->InitForIteration();
+ while ((v = mvals->AsTable()->NextEntry(k, c))) {
+ ListVal *index = mvals->AsTableVal()->RecoverIndex(k);
+ std::string key = index->Index(0)->AsString()->CheckString();
+ std::string val = v->Value()->AsString()->CheckString();
+ additional_message_values.insert(additional_message_values.begin(),
+ std::pair<std::string, std::string>(key, val));
+ Unref(index);
+ delete k;
+ }
}
-KafkaWriter::~KafkaWriter()
-{
+KafkaWriter::~KafkaWriter() {
// Cleanup must happen in DoFinish, not in the destructor
}
-string KafkaWriter::GetConfigValue(const WriterInfo& info, const string name) const
-{
- map<const char*, const char*>::const_iterator it = info.config.find(name.c_str());
- if (it == info.config.end())
- return string();
- else
- return it->second;
+std::string KafkaWriter::GetConfigValue(const WriterInfo &info,
+ const std::string name) const {
+ std::map<const char *, const char *>::const_iterator it =
+ info.config.find(name.c_str());
+ if (it == info.config.end())
+ return std::string();
+ else
+ return it->second;
}
/**
* DoInit is called once for each call to the constructor, but in a separate
* thread
*/
-bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields, const threading::Field* const* fields)
-{
+bool KafkaWriter::DoInit(const WriterInfo &info, int num_fields,
+ const threading::Field *const *fields) {
+ // TimeFormat object, default to TS_EPOCH
+ threading::formatter::JSON::TimeFormat tf =
+ threading::formatter::JSON::TS_EPOCH;
- // Timeformat object, default to TS_EPOCH
- threading::formatter::JSON::TimeFormat tf = threading::formatter::JSON::TS_EPOCH;
+ // Allow overriding of the kafka topic via the Zeek script constant
+ // 'topic_name' which can be applied when adding a new Zeek log filter.
+ topic_name_override = GetConfigValue(info, "topic_name");
- // Allow overriding of the kafka topic via the Zeek script constant
- // 'topic_name' which can be applied when adding a new Zeek log filter.
- topic_name_override = GetConfigValue(info, "topic_name");
+ if (!topic_name_override.empty()) {
+ // Override the topic name if 'topic_name' is specified in the log
+    // filter's $config
+ topic_name = topic_name_override;
+ } else if (topic_name.empty()) {
+ // If no global 'topic_name' is defined, use the log stream's 'path'
+ topic_name = info.path;
+ }
- if(!topic_name_override.empty()) {
- // Override the topic name if 'topic_name' is specified in the log
- // filter's $conf
- topic_name = topic_name_override;
- } else if(topic_name.empty()) {
- // If no global 'topic_name' is defined, use the log stream's 'path'
- topic_name = info.path;
- }
+ if (mocking) {
+ raise_topic_resolved_event(topic_name);
+ }
- if (mocking) {
- raise_topic_resolved_event(topic_name);
- }
+ /**
+ * Format the timestamps
+   * NOTE: This string comparison approach is currently necessary, as there
+   * isn't a way to pass the Zeek enum into the C++ enum.
+ * This makes the user interface consistent with the existing Zeek Logging
+ * configuration for the ASCII log output.
+ */
+ if (strcmp(json_timestamps.c_str(), "JSON::TS_EPOCH") == 0) {
+ tf = threading::formatter::JSON::TS_EPOCH;
+ } else if (strcmp(json_timestamps.c_str(), "JSON::TS_MILLIS") == 0) {
+ tf = threading::formatter::JSON::TS_MILLIS;
+ } else if (strcmp(json_timestamps.c_str(), "JSON::TS_ISO8601") == 0) {
+ tf = threading::formatter::JSON::TS_ISO8601;
+ } else {
+ Error(Fmt("KafkaWriter::DoInit: Invalid JSON timestamp format %s",
+ json_timestamps.c_str()));
+ return false;
+ }
- /**
- * Format the timestamps
- * NOTE: This string comparision implementation is currently the necessary
- * way to do it, as there isn't a way to pass the Zeek enum into C++ enum.
- * This makes the user interface consistent with the existing Zeek Logging
- * configuration for the ASCII log output.
- */
- if ( strcmp(json_timestamps.c_str(), "JSON::TS_EPOCH") == 0 ) {
- tf = threading::formatter::JSON::TS_EPOCH;
+ // initialize the formatter
+ if (BifConst::Kafka::tag_json) {
+ formatter = new threading::formatter::TaggedJSON(info.path, this, tf);
+ } else {
+ formatter = new threading::formatter::JSON(this, tf);
+ }
+
+ // is debug enabled
+ std::string debug;
+ debug.assign((const char *)BifConst::Kafka::debug->Bytes(),
+ BifConst::Kafka::debug->Len());
+ bool is_debug(!debug.empty());
+ if (is_debug) {
+ MsgThread::Info(
+ Fmt("Debug is turned on and set to: %s. Available debug context: %s.",
+ debug.c_str(), RdKafka::get_debug_contexts().c_str()));
+ }
+
+ // kafka global configuration
+ std::string err;
+ conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
+
+ // apply the user-defined settings to kafka
+ std::map<std::string, std::string>::iterator i;
+ for (i = kafka_conf.begin(); i != kafka_conf.end(); ++i) {
+ std::string key = i->first;
+ std::string val = i->second;
+
+ // apply setting to kafka
+ if (RdKafka::Conf::CONF_OK != conf->set(key, val, err)) {
+ Error(Fmt("Failed to set '%s'='%s': %s", key.c_str(), val.c_str(),
+ err.c_str()));
+ return false;
}
- else if ( strcmp(json_timestamps.c_str(), "JSON::TS_MILLIS") == 0 ) {
- tf = threading::formatter::JSON::TS_MILLIS;
+ }
+
+ if (is_debug) {
+ std::string key("debug");
+ std::string val(debug);
+ if (RdKafka::Conf::CONF_OK != conf->set(key, val, err)) {
+ Error(Fmt("Failed to set '%s'='%s': %s", key.c_str(), val.c_str(),
+ err.c_str()));
+ return false;
}
- else if ( strcmp(json_timestamps.c_str(), "JSON::TS_ISO8601") == 0 ) {
- tf = threading::formatter::JSON::TS_ISO8601;
- }
- else {
- Error(Fmt("KafkaWriter::DoInit: Invalid JSON timestamp format %s",
- json_timestamps.c_str()));
+ }
+
+ if (!mocking) {
+ // create kafka producer
+ producer = RdKafka::Producer::create(conf, err);
+ if (!producer) {
+ Error(Fmt("Failed to create producer: %s", err.c_str()));
return false;
}
- // initialize the formatter
- if(BifConst::Kafka::tag_json) {
- formatter = new threading::formatter::TaggedJSON(info.path, this, tf);
- }
- else {
- formatter = new threading::formatter::JSON(this, tf);
+ // create handle to topic
+ topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
+ topic = RdKafka::Topic::create(producer, topic_name, topic_conf, err);
+ if (!topic) {
+ Error(Fmt("Failed to create topic handle: %s", err.c_str()));
+ return false;
}
- // is debug enabled
- string debug;
- debug.assign((const char*)BifConst::Kafka::debug->Bytes(), BifConst::Kafka::debug->Len());
- bool is_debug(!debug.empty());
- if(is_debug) {
- MsgThread::Info(Fmt("Debug is turned on and set to: %s. Available debug context: %s.", debug.c_str(), RdKafka::get_debug_contexts().c_str()));
+ if (is_debug) {
+ MsgThread::Info(Fmt("Successfully created producer."));
}
-
- // kafka global configuration
- string err;
- conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
-
- // apply the user-defined settings to kafka
- map<string,string>::iterator i;
- for (i = kafka_conf.begin(); i != kafka_conf.end(); ++i) {
- string key = i->first;
- string val = i->second;
-
- // apply setting to kafka
- if (RdKafka::Conf::CONF_OK != conf->set(key, val, err)) {
- Error(Fmt("Failed to set '%s'='%s': %s", key.c_str(), val.c_str(), err.c_str()));
- return false;
- }
- }
-
- if(is_debug) {
- string key("debug");
- string val(debug);
- if (RdKafka::Conf::CONF_OK != conf->set(key, val, err)) {
- Error(Fmt("Failed to set '%s'='%s': %s", key.c_str(), val.c_str(), err.c_str()));
- return false;
- }
- }
-
- if (!mocking) {
- // create kafka producer
- producer = RdKafka::Producer::create(conf, err);
- if (!producer) {
- Error(Fmt("Failed to create producer: %s", err.c_str()));
- return false;
- }
-
- // create handle to topic
- topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
- topic = RdKafka::Topic::create(producer, topic_name, topic_conf, err);
- if (!topic) {
- Error(Fmt("Failed to create topic handle: %s", err.c_str()));
- return false;
- }
-
- if (is_debug) {
- MsgThread::Info(Fmt("Successfully created producer."));
- }
- }
- return true;
+ }
+ return true;
}
/**
@@ -220,69 +209,68 @@
* the thread can be safely terminated. As such, all resources created must be
* removed here.
*/
-bool KafkaWriter::DoFinish(double network_time)
-{
- bool success = false;
- int poll_interval = 1000;
- int waited = 0;
- int max_wait = BifConst::Kafka::max_wait_on_shutdown;
+bool KafkaWriter::DoFinish(double network_time) {
+ bool success = false;
+ int poll_interval = 1000;
+ int waited = 0;
+ int max_wait = BifConst::Kafka::max_wait_on_shutdown;
- if (!mocking) {
- // wait a bit for queued messages to be delivered
- while (producer->outq_len() > 0 && waited <= max_wait) {
- producer->poll(poll_interval);
- waited += poll_interval;
- }
-
- // successful only if all messages delivered
- if (producer->outq_len() == 0) {
- success = true;
- } else {
- Error(Fmt("Unable to deliver %0d message(s)", producer->outq_len()));
- }
-
- delete topic;
- delete producer;
- delete topic_conf;
+ if (!mocking) {
+ // wait a bit for queued messages to be delivered
+ while (producer->outq_len() > 0 && waited <= max_wait) {
+ producer->poll(poll_interval);
+ waited += poll_interval;
}
- delete formatter;
- delete conf;
- return success;
+ // successful only if all messages delivered
+ if (producer->outq_len() == 0) {
+ success = true;
+ } else {
+ Error(Fmt("Unable to deliver %0d message(s)", producer->outq_len()));
+ }
+
+ delete topic;
+ delete producer;
+ delete topic_conf;
+ }
+ delete formatter;
+ delete conf;
+
+ return success;
}
/**
* Writer-specific output method implementing recording of one log
* entry.
*/
-bool KafkaWriter::DoWrite(int num_fields, const threading::Field* const* fields, threading::Value** vals)
-{
- if (!mocking) {
- ODesc buff;
- buff.Clear();
+bool KafkaWriter::DoWrite(int num_fields, const threading::Field *const *fields,
+ threading::Value **vals) {
+ if (!mocking) {
+ ODesc buff;
+ buff.Clear();
- // format the log entry
- if(BifConst::Kafka::tag_json) {
- dynamic_cast<threading::formatter::TaggedJSON*>(formatter)->Describe(&buff, num_fields, fields, vals,
- additional_message_values);
- } else {
- formatter->Describe(&buff, num_fields, fields, vals);
- }
-
- // send the formatted log entry to kafka
- const char *raw = (const char *) buff.Bytes();
- RdKafka::ErrorCode resp = producer->produce(
- topic, RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY,
- const_cast<char *>(raw), strlen(raw), NULL, NULL);
-
- if (RdKafka::ERR_NO_ERROR == resp) {
- producer->poll(0);
- } else {
- string err = RdKafka::err2str(resp);
- Error(Fmt("Kafka send failed: %s", err.c_str()));
- }
+ // format the log entry
+ if (BifConst::Kafka::tag_json) {
+ dynamic_cast<threading::formatter::TaggedJSON *>(formatter)->Describe(
+ &buff, num_fields, fields, vals, additional_message_values);
+ } else {
+ formatter->Describe(&buff, num_fields, fields, vals);
}
- return true;
+
+ // send the formatted log entry to kafka
+ const char *raw = (const char *)buff.Bytes();
+ RdKafka::ErrorCode resp = producer->produce(
+ topic, RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY,
+ const_cast<char *>(raw), strlen(raw), NULL, NULL);
+
+ if (RdKafka::ERR_NO_ERROR == resp) {
+ producer->poll(0);
+ } else {
+ std::string err = RdKafka::err2str(resp);
+ Error(Fmt("Kafka send failed: %s", err.c_str()));
+ }
+ }
+ return true;
}
/**
@@ -294,10 +282,9 @@
* optimized for performance. The current buffering state can be
* queried via IsBuf().
*/
-bool KafkaWriter::DoSetBuf(bool enabled)
-{
- // no change in behavior
- return true;
+bool KafkaWriter::DoSetBuf(bool enabled) {
+ // no change in behavior
+ return true;
}
/**
@@ -305,12 +292,11 @@
* implementation must override this method but it can just
* ignore calls if flushing doesn't align with its semantics.
*/
-bool KafkaWriter::DoFlush(double network_time)
-{
- if (!mocking) {
- producer->flush(0);
- }
- return true;
+bool KafkaWriter::DoFlush(double network_time) {
+ if (!mocking) {
+ producer->flush(0);
+ }
+ return true;
}
/**
@@ -322,31 +308,31 @@
* FinishedRotation() to signal the log manager that potential
* postprocessors can now run.
*/
-bool KafkaWriter::DoRotate(const char* rotated_path, double open, double close, bool terminating)
-{
- // no need to perform log rotation
- return FinishedRotation();
+bool KafkaWriter::DoRotate(const char *rotated_path, double open, double close,
+ bool terminating) {
+ // no need to perform log rotation
+ return FinishedRotation();
}
/**
* Triggered by regular heartbeat messages from the main thread.
*/
-bool KafkaWriter::DoHeartbeat(double network_time, double current_time)
-{
- if (!mocking) {
- producer->poll(0);
- }
- return true;
+bool KafkaWriter::DoHeartbeat(double network_time, double current_time) {
+ if (!mocking) {
+ producer->poll(0);
+ }
+ return true;
}
/**
- * Triggered when the topic is resolved from the configuration, when mocking/testing
+ * Triggered when the topic is resolved from the configuration, when
+ * mocking/testing
* @param topic
*/
-void KafkaWriter::raise_topic_resolved_event(const string topic) {
- if (kafka_topic_resolved_event) {
- val_list *vl = new val_list;
- vl->append(new StringVal(topic.c_str()));
- mgr.QueueEvent(kafka_topic_resolved_event, vl);
- }
+void KafkaWriter::raise_topic_resolved_event(const std::string topic) {
+ if (kafka_topic_resolved_event) {
+ val_list *vl = new val_list;
+ vl->append(new StringVal(topic.c_str()));
+ mgr.QueueEvent(kafka_topic_resolved_event, vl);
+ }
}
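The reformatted DoInit above leaves the topic-name precedence unchanged: a 'topic_name' entry in the log filter's $config wins, then the global Kafka::topic_name constant, then the log stream's path. A minimal standalone sketch of that decision (placeholder names, not the plugin's API):

    #include <iostream>
    #include <string>

    // Illustrative only: mirrors the precedence in KafkaWriter::DoInit.
    // 'filter_override' stands in for GetConfigValue(info, "topic_name"),
    // 'global_topic' for the Kafka::topic_name constant, and 'stream_path'
    // for info.path.
    static std::string resolve_topic(const std::string &filter_override,
                                     const std::string &global_topic,
                                     const std::string &stream_path) {
      if (!filter_override.empty())
        return filter_override; // the filter's $config entry wins
      if (!global_topic.empty())
        return global_topic;    // otherwise the global constant
      return stream_path;       // otherwise the stream's path, e.g. "conn"
    }

    int main() {
      std::cout << resolve_topic("", "", "conn") << "\n";                    // conn
      std::cout << resolve_topic("", "zeek-logs", "conn") << "\n";           // zeek-logs
      std::cout << resolve_topic("http-topic", "zeek-logs", "http") << "\n"; // http-topic
      return 0;
    }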
diff --git a/src/KafkaWriter.h b/src/KafkaWriter.h
index ad29ac7..ad5fc09 100644
--- a/src/KafkaWriter.h
+++ b/src/KafkaWriter.h
@@ -19,6 +19,7 @@
#define ZEEK_PLUGIN_BRO_KAFKA_KAFKAWRITER_H
#include <librdkafka/rdkafkacpp.h>
+#include <map>
#include <string>
#include <Desc.h>
#include <logging/WriterBackend.h>
@@ -65,17 +66,17 @@
virtual bool DoHeartbeat(double network_time, double current_time);
private:
- string GetConfigValue(const WriterInfo& info, const string name) const;
- void raise_topic_resolved_event(const string topic);
- static const string default_topic_key;
- string stream_id;
+ std::string GetConfigValue(const WriterInfo& info, const std::string name) const;
+ void raise_topic_resolved_event(const std::string topic);
+ static const std::string default_topic_key;
+ std::string stream_id;
bool tag_json;
bool mocking;
- string json_timestamps;
- map<string, string> kafka_conf;
- map<string, string> additional_message_values;
- string topic_name;
- string topic_name_override;
+ std::string json_timestamps;
+ std::map<std::string, std::string> kafka_conf;
+ std::map<std::string, std::string> additional_message_values;
+ std::string topic_name;
+ std::string topic_name_override;
threading::formatter::Formatter *formatter;
RdKafka::Producer* producer;
RdKafka::Topic* topic;
diff --git a/src/TaggedJSON.cc b/src/TaggedJSON.cc
index f182d95..071dc30 100644
--- a/src/TaggedJSON.cc
+++ b/src/TaggedJSON.cc
@@ -19,13 +19,13 @@
namespace threading { namespace formatter {
-TaggedJSON::TaggedJSON(string sn, MsgThread* t, JSON::TimeFormat tf): JSON(t, tf), stream_name(sn)
+TaggedJSON::TaggedJSON(std::string sn, MsgThread* t, JSON::TimeFormat tf): JSON(t, tf), stream_name(sn)
{}
TaggedJSON::~TaggedJSON()
{}
-bool TaggedJSON::Describe(ODesc* desc, int num_fields, const Field* const* fields, Value** vals, map<string,string> &const_vals) const
+bool TaggedJSON::Describe(ODesc* desc, int num_fields, const Field* const* fields, Value** vals, std::map<std::string,std::string> &const_vals) const
{
desc->AddRaw("{");
@@ -40,7 +40,7 @@
JSON::Describe(desc, num_fields, fields, vals);
if (const_vals.size() > 0) {
- map<string, string>::iterator it = const_vals.begin();
+ std::map<std::string, std::string>::iterator it = const_vals.begin();
while (it != const_vals.end()) {
desc->AddRaw(",");
desc->AddRaw("\"");
@@ -56,4 +56,5 @@
desc->AddRaw("}");
return true;
}
-}}
+} // namespace formatter
+} // namespace threading
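Behaviour in TaggedJSON.cc is also unchanged; only the std:: qualification and namespace-closing comments are new. For context, Describe wraps the normal JSON record under the stream name and appends any additional_message_values pairs at the top level. A rough, self-contained approximation of that output shape (not the real formatter code):

    #include <iostream>
    #include <map>
    #include <string>

    // Illustrative only: approximates the JSON shape emitted by TaggedJSON.
    // 'record' stands in for the normal JSON rendering of the log entry.
    int main() {
      std::string stream_name = "http";
      std::string record = "{\"ts\":1600000000.0,\"uid\":\"C1\"}";
      std::map<std::string, std::string> const_vals = {{"source", "sensor-1"}};

      std::string out = "{\"" + stream_name + "\": " + record;
      for (const auto &kv : const_vals)
        out += ",\"" + kv.first + "\": \"" + kv.second + "\"";
      out += "}";

      std::cout << out << "\n"; // {"http": {...},"source": "sensor-1"}
      return 0;
    }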
diff --git a/src/TaggedJSON.h b/src/TaggedJSON.h
index 9135bf2..f8d3005 100644
--- a/src/TaggedJSON.h
+++ b/src/TaggedJSON.h
@@ -18,17 +18,19 @@
#ifndef ZEEK_PLUGIN_BRO_KAFKA_TAGGEDJSON_H
#define ZEEK_PLUGIN_BRO_KAFKA_TAGGEDJSON_H
-#include <string>
#include <Desc.h>
+#include <map>
+#include <string>
#include <threading/Formatter.h>
#include <threading/formatters/JSON.h>
-using threading::formatter::JSON;
+using threading::Field;
using threading::MsgThread;
using threading::Value;
-using threading::Field;
+using threading::formatter::JSON;
-namespace threading { namespace formatter {
+namespace threading {
+namespace formatter {
/*
* A JSON formatter that prepends or 'tags' the content with a log stream
@@ -37,15 +39,16 @@
* { 'http' : { ... }}
*/
class TaggedJSON : public JSON {
-
public:
- TaggedJSON(string stream_name, MsgThread* t, JSON::TimeFormat tf);
- virtual ~TaggedJSON();
- virtual bool Describe(ODesc* desc, int num_fields, const Field* const* fields, Value** vals, map<string,string> &const_vals) const;
+ TaggedJSON(std::string stream_name, MsgThread *t, JSON::TimeFormat tf);
+ virtual ~TaggedJSON();
+ virtual bool Describe(ODesc *desc, int num_fields, const Field *const *fields,
+ Value **vals, std::map<std::string, std::string> &const_vals) const;
private:
- string stream_name;
+ std::string stream_name;
};
-}}
+} // namespace formatter
+} // namespace threading
#endif
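A closing note on the header and formatter files: most of their churn is mechanical. string and map become std::string and std::map throughout (presumably because Zeek 3.2's headers no longer pull the std names into scope for plugins), <map> is now included where it is used directly, and namespaces gain closing comments in the clang-format style, with no behavioural change intended. An illustrative before/after, using the member from KafkaWriter.h above:

    // before:  string stream_id;
    // after:   std::string stream_id;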