METRON-2003 Bro plugin topic should fall back to the log writer's path (JonZeolla) closes apache/metron-bro-plugin-kafka#26
diff --git a/docker/README.md b/docker/README.md
index a965d8b..bde7b5e 100644
--- a/docker/README.md
+++ b/docker/README.md
@@ -150,14 +150,13 @@
###### Parameters
```bash
--network-name [OPTIONAL] The Docker network name. Default: bro-network
- --offset [OPTIONAL] The kafka offset. Default: -1
+ --offset [OPTIONAL] The kafka offset. Default: 0
--kafka-topic [OPTIONAL] The kafka topic to consume from. Default: bro
```
- `docker_run_get_offset_kafka.sh`: Runs an instance of the kafka container and gets the current offset for the specified topic
###### Parameters
```bash
--network-name [OPTIONAL] The Docker network name. Default: bro-network
- --offset [OPTIONAL] The kafka offset. Default: -1
--kafka-topic [OPTIONAL] The kafka topic to get the offset from. Default: bro
```
- `docker_run_create_topic_in_kafka.sh`: Runs an instance of the kafka container, creating the specified topic
diff --git a/docker/run_end_to_end.sh b/docker/run_end_to_end.sh
index 6dfd146..4c61560 100755
--- a/docker/run_end_to_end.sh
+++ b/docker/run_end_to_end.sh
@@ -173,7 +173,7 @@
exit ${rc}
fi
-# Configure it the bro plugin
+# Configure the bro plugin
bash "${SCRIPT_DIR}"/docker_execute_configure_bro_plugin.sh
rc=$?; if [[ ${rc} != 0 ]]; then
echo "ERROR> FAILED TO CONFIGURE PLUGIN. CHECK LOGS ${rc}"
diff --git a/src/KafkaWriter.cc b/src/KafkaWriter.cc
index 563ef74..396c861 100644
--- a/src/KafkaWriter.cc
+++ b/src/KafkaWriter.cc
@@ -91,12 +91,17 @@
// Timeformat object, default to TS_EPOCH
threading::formatter::JSON::TimeFormat tf = threading::formatter::JSON::TS_EPOCH;
- // Allow overriding of the kafka topic via the Bro script constant "topic_name"
+ // Allow overriding of the kafka topic via the Bro script constant 'topic_name'
// which can be applied when adding a new Bro log filter.
topic_name_override = GetConfigValue(info, "topic_name");
if(!topic_name_override.empty()) {
- topic_name = topic_name_override;
+ // Override the topic name if 'topic_name' is specified in the log
+ // filter's $conf
+ topic_name = topic_name_override;
+ } else if(topic_name.empty()) {
+ // If no global 'topic_name' is defined, use the log stream's 'path'
+ topic_name = info.path;
}
/**