METRON-2069 Add btests for bro plugin topic_name selection (ottobackwards) closes apache/metron-bro-plugin-kafka#36
diff --git a/.gitignore b/.gitignore
index 735a17e..e08b80e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,7 +3,7 @@
*.iml
*.iws
.DS_Store
-
+/cmake-build-*
.state
build
@@ -38,6 +38,3 @@
# Log files
*.log
-# pcap files
-*.pcap
-*.pcapng
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 2bc8d4b..18b8a14 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -28,6 +28,7 @@
bro_plugin_cc(src/Plugin.cc)
bro_plugin_cc(src/TaggedJSON.cc)
bro_plugin_bif(src/kafka.bif)
+ bro_plugin_bif(src/events.bif)
bro_plugin_dist_files(README CHANGES COPYING VERSION)
bro_plugin_link_library(${LibRDKafka_LIBRARIES})
bro_plugin_link_library(${LibRDKafka_C_LIBRARIES})
diff --git a/docker/README.md b/docker/README.md
index eac2919..dbfbd89 100644
--- a/docker/README.md
+++ b/docker/README.md
@@ -295,6 +295,7 @@
###### Parameters
```bash
--skip-docker-build [OPTIONAL] Skip build of bro docker machine.
+--no-pcap [OPTIONAL] Do not run pcaps.
--data-path [OPTIONAL] The pcap data path. Default: ./data
--kafka-topic [OPTIONAL] The kafka topic name to use. Default: bro
--plugin-version [OPTIONAL] The plugin version. Default: the current branch name
diff --git a/docker/containers/bro-localbuild-container/Dockerfile b/docker/containers/bro-localbuild-container/Dockerfile
index 6a881f5..b35724e 100644
--- a/docker/containers/bro-localbuild-container/Dockerfile
+++ b/docker/containers/bro-localbuild-container/Dockerfile
@@ -24,7 +24,7 @@
openssl-devel python-devel swig \
zlib-devel perl \
cyrus-sasl cyrus-sasl-devel cyrus-sasl-gssapi \
- git jq screen
+ git jq screen tree vim
# copy in the screen -rc
COPY .screenrc /root
diff --git a/docker/in_docker_scripts/build_bro_plugin.sh b/docker/in_docker_scripts/build_bro_plugin.sh
index 064fe91..401b2a5 100755
--- a/docker/in_docker_scripts/build_bro_plugin.sh
+++ b/docker/in_docker_scripts/build_bro_plugin.sh
@@ -82,19 +82,26 @@
cd /root || exit 1
-echo "================================"
+echo "==================================================="
-bro-pkg install code --version "${PLUGIN_VERSION}" --force
+bro-pkg -vvv install code --version "${PLUGIN_VERSION}" --force
rc=$?; if [[ ${rc} != 0 ]]; then
echo "ERROR running bro-pkg install ${rc}"
exit ${rc}
fi
-
-echo "================================"
+echo "==================================================="
+echo "ERR"
+cat /root/.zkg/testing/code/clones/code/zkg.test_command.stderr
+echo "==================================================="
+echo "OUT"
+cat /root/.zkg/testing/code/clones/code/zkg.test_command.stdout
+echo "==================================================="
+echo ""
+echo "==================================================="
echo ""
-bro -N Apache::Kafka
+bro -NN Apache::Kafka
-echo "================================"
+echo "==================================================="
echo ""
diff --git a/docker/run_end_to_end.sh b/docker/run_end_to_end.sh
index 3ec0145..9a7036f 100755
--- a/docker/run_end_to_end.sh
+++ b/docker/run_end_to_end.sh
@@ -30,6 +30,7 @@
echo " --data-path [OPTIONAL] The pcap data path. Default: ./data"
echo " --kafka-topic [OPTIONAL] The kafka topic to consume from. Default: bro"
echo " --plugin-version [OPTIONAL] The plugin version. Default: the current branch name"
+ echo " --no-pcap [OPTIONAL] Do not run pcaps."
echo " -h/--help Usage information."
echo " "
echo "COMPATABILITY"
@@ -45,7 +46,7 @@
fi
SKIP_REBUILD_BRO=false
-
+NO_PCAP=false
ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" > /dev/null && pwd)"
SCRIPT_DIR="${ROOT_DIR}"/scripts
CONTAINER_DIR="${ROOT_DIR}"/containers/bro-localbuild-container
@@ -68,6 +69,15 @@
SKIP_REBUILD_BRO=true
shift # past argument
;;
+ #
+ # NO_PCAP
+ #
+ # --no-pcap
+ #
+ --no-pcap)
+ NO_PCAP=true
+ shift # past argument
+ ;;
#
# DATA_PATH
@@ -196,58 +206,58 @@
exit ${rc}
fi
-
-# for each pcap in the data directory, we want to
-# run bro then read the output from kafka
-# and output both of them to the same directory named
-# for the date/pcap
+if [[ "$NO_PCAP" = false ]]; then
+ # for each pcap in the data directory, we want to
+ # run bro then read the output from kafka
+ # and output both of them to the same directory named
+ # for the date/pcap
-for file in "${DATA_PATH}"/**/*.pcap*
-do
- # get the file name
- BASE_FILE_NAME=$(basename "${file}")
- DOCKER_DIRECTORY_NAME=${BASE_FILE_NAME//\./_}
+ for file in "${DATA_PATH}"/**/*.pcap*
+ do
+ # get the file name
+ BASE_FILE_NAME=$(basename "${file}")
+ DOCKER_DIRECTORY_NAME=${BASE_FILE_NAME//\./_}
- mkdir "${TEST_OUTPUT_PATH}/${DOCKER_DIRECTORY_NAME}" || exit 1
- echo "MADE ${TEST_OUTPUT_PATH}/${DOCKER_DIRECTORY_NAME}"
+ mkdir "${TEST_OUTPUT_PATH}/${DOCKER_DIRECTORY_NAME}" || exit 1
+ echo "MADE ${TEST_OUTPUT_PATH}/${DOCKER_DIRECTORY_NAME}"
- # get the current offset in kafka
- # this is where we are going to _start_
- OFFSET=$(bash "${SCRIPT_DIR}"/docker_run_get_offset_kafka.sh --kafka-topic="${KAFKA_TOPIC}" | sed "s/^${KAFKA_TOPIC}:0:\(.*\)$/\1/")
- echo "OFFSET------------------> ${OFFSET}"
+ # get the current offset in kafka
+ # this is where we are going to _start_
+ OFFSET=$(bash "${SCRIPT_DIR}"/docker_run_get_offset_kafka.sh --kafka-topic="${KAFKA_TOPIC}" | sed "s/^${KAFKA_TOPIC}:0:\(.*\)$/\1/")
+ echo "OFFSET------------------> ${OFFSET}"
- bash "${SCRIPT_DIR}"/docker_execute_process_data_file.sh --pcap-file-name="${BASE_FILE_NAME}" --output-directory-name="${DOCKER_DIRECTORY_NAME}"
+ bash "${SCRIPT_DIR}"/docker_execute_process_data_file.sh --pcap-file-name="${BASE_FILE_NAME}" --output-directory-name="${DOCKER_DIRECTORY_NAME}"
+ rc=$?; if [[ ${rc} != 0 ]]; then
+ echo "ERROR> FAILED TO PROCESS ${file} DATA. CHECK LOGS, please run the finish_end_to_end.sh when you are done."
+ exit ${rc}
+ fi
+
+ KAFKA_OUTPUT_FILE="${TEST_OUTPUT_PATH}/${DOCKER_DIRECTORY_NAME}/kafka-output.log"
+ bash "${SCRIPT_DIR}"/docker_run_consume_kafka.sh --offset="${OFFSET}" --kafka-topic="${KAFKA_TOPIC}" | "${ROOT_DIR}"/remove_timeout_message.sh | tee "${KAFKA_OUTPUT_FILE}"
+
+ rc=$?; if [[ ${rc} != 0 ]]; then
+ echo "ERROR> FAILED TO PROCESS ${DATA_PATH} DATA. CHECK LOGS"
+ fi
+
+ "${SCRIPT_DIR}"/split_kafka_output_by_log.sh --log-directory="${TEST_OUTPUT_PATH}/${DOCKER_DIRECTORY_NAME}"
+ rc=$?; if [[ ${rc} != 0 ]]; then
+ echo "ERROR> ISSUE ENCOUNTERED WHEN SPLITTING KAFKA OUTPUT LOGS"
+ fi
+ done
+
+ "${SCRIPT_DIR}"/print_results.sh --test-directory="${TEST_OUTPUT_PATH}"
rc=$?; if [[ ${rc} != 0 ]]; then
- echo "ERROR> FAILED TO PROCESS ${file} DATA. CHECK LOGS, please run the finish_end_to_end.sh when you are done."
+ echo "ERROR> ISSUE ENCOUNTERED WHEN PRINTING RESULTS"
exit ${rc}
fi
- KAFKA_OUTPUT_FILE="${TEST_OUTPUT_PATH}/${DOCKER_DIRECTORY_NAME}/kafka-output.log"
- bash "${SCRIPT_DIR}"/docker_run_consume_kafka.sh --offset="${OFFSET}" --kafka-topic="${KAFKA_TOPIC}" | "${ROOT_DIR}"/remove_timeout_message.sh | tee "${KAFKA_OUTPUT_FILE}"
-
+ "${SCRIPT_DIR}"/analyze_results.sh --test-directory="${TEST_OUTPUT_PATH}"
rc=$?; if [[ ${rc} != 0 ]]; then
- echo "ERROR> FAILED TO PROCESS ${DATA_PATH} DATA. CHECK LOGS"
+ echo "ERROR> ISSUE ENCOUNTERED WHEN ANALYZING RESULTS"
+ exit ${rc}
fi
-
- "${SCRIPT_DIR}"/split_kakfa_output_by_log.sh --log-directory="${TEST_OUTPUT_PATH}/${DOCKER_DIRECTORY_NAME}"
- rc=$?; if [[ ${rc} != 0 ]]; then
- echo "ERROR> ISSUE ENCOUNTERED WHEN SPLITTING KAFKA OUTPUT LOGS"
- fi
-done
-
-"${SCRIPT_DIR}"/print_results.sh --test-directory="${TEST_OUTPUT_PATH}"
-rc=$?; if [[ ${rc} != 0 ]]; then
- echo "ERROR> ISSUE ENCOUNTERED WHEN PRINTING RESULTS"
- exit ${rc}
fi
-
-"${SCRIPT_DIR}"/analyze_results.sh --test-directory="${TEST_OUTPUT_PATH}"
-rc=$?; if [[ ${rc} != 0 ]]; then
- echo "ERROR> ISSUE ENCOUNTERED WHEN ANALYZING RESULTS"
- exit ${rc}
-fi
-
echo ""
echo "Run complete"
echo "The kafka and bro output can be found at ${TEST_OUTPUT_PATH}"
diff --git a/docker/scripts/split_kakfa_output_by_log.sh b/docker/scripts/split_kafka_output_by_log.sh
similarity index 100%
rename from docker/scripts/split_kakfa_output_by_log.sh
rename to docker/scripts/split_kafka_output_by_log.sh
diff --git a/scripts/Apache/Kafka/logs-to-kafka.bro b/scripts/Apache/Kafka/logs-to-kafka.bro
index 24d88a6..5852505 100644
--- a/scripts/Apache/Kafka/logs-to-kafka.bro
+++ b/scripts/Apache/Kafka/logs-to-kafka.bro
@@ -19,6 +19,7 @@
module Kafka;
+
function send_to_kafka(id: Log::ID): bool
{
if (|logs_to_send| == 0 && send_all_active_logs == F)
@@ -51,3 +52,7 @@
}
}
}
+
+event kafka_topic_resolved_event(topic: string) {
+ print(fmt("Kafka topic set to %s",topic));
+}
\ No newline at end of file
diff --git a/scripts/init.bro b/scripts/init.bro
index 08d46cd..7e2c56c 100644
--- a/scripts/init.bro
+++ b/scripts/init.bro
@@ -55,5 +55,7 @@
## A comma separated list of librdkafka debug contexts
const debug: string = "" &redef;
+
+ const mock: bool = F &redef;
}
diff --git a/src/KafkaWriter.cc b/src/KafkaWriter.cc
index 396c861..d2287bf 100644
--- a/src/KafkaWriter.cc
+++ b/src/KafkaWriter.cc
@@ -16,6 +16,7 @@
*/
#include "KafkaWriter.h"
+#include "events.bif.h"
using namespace logging;
using namespace writer;
@@ -35,6 +36,7 @@
// tag_json - thread local copy
tag_json = BifConst::Kafka::tag_json;
+ mocking = BifConst::Kafka::mock;
// json_timestamps
ODesc tsfmt;
@@ -88,6 +90,7 @@
*/
bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields, const threading::Field* const* fields)
{
+
// Timeformat object, default to TS_EPOCH
threading::formatter::JSON::TimeFormat tf = threading::formatter::JSON::TS_EPOCH;
@@ -104,6 +107,10 @@
topic_name = info.path;
}
+ if (mocking) {
+ raise_topic_resolved_event(topic_name);
+ }
+
/**
* Format the timestamps
* NOTE: This string comparision implementation is currently the necessary
@@ -171,25 +178,26 @@
}
}
- // create kafka producer
- producer = RdKafka::Producer::create(conf, err);
- if (!producer) {
- Error(Fmt("Failed to create producer: %s", err.c_str()));
- return false;
- }
+ if (!mocking) {
+ // create kafka producer
+ producer = RdKafka::Producer::create(conf, err);
+ if (!producer) {
+ Error(Fmt("Failed to create producer: %s", err.c_str()));
+ return false;
+ }
- // create handle to topic
- topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
- topic = RdKafka::Topic::create(producer, topic_name, topic_conf, err);
- if (!topic) {
- Error(Fmt("Failed to create topic handle: %s", err.c_str()));
- return false;
- }
+ // create handle to topic
+ topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
+ topic = RdKafka::Topic::create(producer, topic_name, topic_conf, err);
+ if (!topic) {
+ Error(Fmt("Failed to create topic handle: %s", err.c_str()));
+ return false;
+ }
- if(is_debug) {
- MsgThread::Info(Fmt("Successfully created producer."));
+ if (is_debug) {
+ MsgThread::Info(Fmt("Successfully created producer."));
+ }
}
-
return true;
}
@@ -206,21 +214,23 @@
int waited = 0;
int max_wait = BifConst::Kafka::max_wait_on_shutdown;
- // wait a bit for queued messages to be delivered
- while (producer->outq_len() > 0 && waited <= max_wait) {
- producer->poll(poll_interval);
- waited += poll_interval;
- }
+ if (!mocking) {
+ // wait a bit for queued messages to be delivered
+ while (producer->outq_len() > 0 && waited <= max_wait) {
+ producer->poll(poll_interval);
+ waited += poll_interval;
+ }
- // successful only if all messages delivered
- if (producer->outq_len() == 0) {
- success = true;
- } else {
- Error(Fmt("Unable to deliver %0d message(s)", producer->outq_len()));
- }
+ // successful only if all messages delivered
+ if (producer->outq_len() == 0) {
+ success = true;
+ } else {
+ Error(Fmt("Unable to deliver %0d message(s)", producer->outq_len()));
+ }
- delete topic;
- delete producer;
+ delete topic;
+ delete producer;
+ }
delete formatter;
delete conf;
delete topic_conf;
@@ -234,26 +244,26 @@
*/
bool KafkaWriter::DoWrite(int num_fields, const threading::Field* const* fields, threading::Value** vals)
{
- ODesc buff;
- buff.Clear();
+ if (!mocking) {
+ ODesc buff;
+ buff.Clear();
- // format the log entry
- formatter->Describe(&buff, num_fields, fields, vals);
+ // format the log entry
+ formatter->Describe(&buff, num_fields, fields, vals);
- // send the formatted log entry to kafka
- const char* raw = (const char*)buff.Bytes();
- RdKafka::ErrorCode resp = producer->produce(
- topic, RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY,
- const_cast<char*>(raw), strlen(raw), NULL, NULL);
+ // send the formatted log entry to kafka
+ const char *raw = (const char *) buff.Bytes();
+ RdKafka::ErrorCode resp = producer->produce(
+ topic, RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY,
+ const_cast<char *>(raw), strlen(raw), NULL, NULL);
- if (RdKafka::ERR_NO_ERROR == resp) {
- producer->poll(0);
+ if (RdKafka::ERR_NO_ERROR == resp) {
+ producer->poll(0);
+ } else {
+ string err = RdKafka::err2str(resp);
+ Error(Fmt("Kafka send failed: %s", err.c_str()));
+ }
}
- else {
- string err = RdKafka::err2str(resp);
- Error(Fmt("Kafka send failed: %s", err.c_str()));
- }
-
return true;
}
@@ -279,7 +289,9 @@
*/
bool KafkaWriter::DoFlush(double network_time)
{
- producer->poll(0);
+ if (!mocking) {
+ producer->poll(0);
+ }
return true;
}
@@ -303,6 +315,20 @@
*/
bool KafkaWriter::DoHeartbeat(double network_time, double current_time)
{
- producer->poll(0);
+ if (!mocking) {
+ producer->poll(0);
+ }
return true;
}
+
+/**
+ * Triggered when the topic is resolved from the configuration, when mocking/testing
+ * @param topic
+ */
+void KafkaWriter::raise_topic_resolved_event(const string topic) {
+ if (kafka_topic_resolved_event) {
+ val_list *vl = new val_list;
+ vl->append(new StringVal(topic.c_str()));
+ mgr.QueueEvent(kafka_topic_resolved_event, vl);
+ }
+}
diff --git a/src/KafkaWriter.h b/src/KafkaWriter.h
index c67c664..0ef0fb1 100644
--- a/src/KafkaWriter.h
+++ b/src/KafkaWriter.h
@@ -66,9 +66,11 @@
private:
string GetConfigValue(const WriterInfo& info, const string name) const;
+ void raise_topic_resolved_event(const string topic);
static const string default_topic_key;
string stream_id;
bool tag_json;
+ bool mocking;
string json_timestamps;
map<string, string> kafka_conf;
string topic_name;
diff --git a/src/events.bif b/src/events.bif
new file mode 100644
index 0000000..515fad5
--- /dev/null
+++ b/src/events.bif
@@ -0,0 +1 @@
+event kafka_topic_resolved_event%(topic: string%);
\ No newline at end of file
diff --git a/src/kafka.bif b/src/kafka.bif
index 2709072..53e3bfe 100644
--- a/src/kafka.bif
+++ b/src/kafka.bif
@@ -23,3 +23,4 @@
const tag_json: bool;
const json_timestamps: JSON::TimestampFormat;
const debug: string;
+const mock: bool;
diff --git a/tests/Baseline/kafka.resolved-topic-config/output b/tests/Baseline/kafka.resolved-topic-config/output
new file mode 100644
index 0000000..50438b7
--- /dev/null
+++ b/tests/Baseline/kafka.resolved-topic-config/output
@@ -0,0 +1 @@
+Kafka topic set to const-variable-topic
diff --git a/tests/Baseline/kafka.resolved-topic-default/output b/tests/Baseline/kafka.resolved-topic-default/output
new file mode 100644
index 0000000..1cfb642
--- /dev/null
+++ b/tests/Baseline/kafka.resolved-topic-default/output
@@ -0,0 +1 @@
+Kafka topic set to bro
diff --git a/tests/Baseline/kafka.resolved-topic-override-and-config/output b/tests/Baseline/kafka.resolved-topic-override-and-config/output
new file mode 100644
index 0000000..54bee9c
--- /dev/null
+++ b/tests/Baseline/kafka.resolved-topic-override-and-config/output
@@ -0,0 +1,2 @@
+Kafka topic set to const-variable-topic
+Kafka topic set to configuration-table-topic
diff --git a/tests/Baseline/kafka.resolved-topic-override-only/output b/tests/Baseline/kafka.resolved-topic-override-only/output
new file mode 100644
index 0000000..fc3ecea
--- /dev/null
+++ b/tests/Baseline/kafka.resolved-topic-override-only/output
@@ -0,0 +1 @@
+Kafka topic set to configuration-table-topic
diff --git a/tests/Baseline/kafka.show-plugin/output b/tests/Baseline/kafka.show-plugin/output
index e6ad77a..978febc 100644
--- a/tests/Baseline/kafka.show-plugin/output
+++ b/tests/Baseline/kafka.show-plugin/output
@@ -6,4 +6,6 @@
[Constant] Kafka::tag_json
[Constant] Kafka::json_timestamps
[Constant] Kafka::debug
+ [Constant] Kafka::mock
+ [Event] kafka_topic_resolved_event
diff --git a/tests/kafka/resolved-topic-config.bro b/tests/kafka/resolved-topic-config.bro
new file mode 100644
index 0000000..56fa093
--- /dev/null
+++ b/tests/kafka/resolved-topic-config.bro
@@ -0,0 +1,26 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# @TEST-EXEC: bro -r ../../../tests/pcaps/exercise-traffic.pcap ../../../scripts/Apache/Kafka/ %INPUT > output
+# @TEST-EXEC: btest-diff output
+
+module Kafka;
+
+
+redef Kafka::logs_to_send = set(Conn::LOG);
+redef Kafka::topic_name = "const-variable-topic";
+redef Kafka::mock = T;
diff --git a/tests/kafka/resolved-topic-default.bro b/tests/kafka/resolved-topic-default.bro
new file mode 100644
index 0000000..ea9d217
--- /dev/null
+++ b/tests/kafka/resolved-topic-default.bro
@@ -0,0 +1,24 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# @TEST-EXEC: bro -r ../../../tests/pcaps/exercise-traffic.pcap ../../../scripts/Apache/Kafka/ %INPUT > output
+# @TEST-EXEC: btest-diff output
+
+module Kafka;
+
+redef Kafka::logs_to_send = set(Conn::LOG);
+redef Kafka::mock = T;
diff --git a/tests/kafka/resolved-topic-override-and-config.bro b/tests/kafka/resolved-topic-override-and-config.bro
new file mode 100644
index 0000000..d75ce61
--- /dev/null
+++ b/tests/kafka/resolved-topic-override-and-config.bro
@@ -0,0 +1,37 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# @TEST-EXEC: bro -r ../../../tests/pcaps/exercise-traffic.pcap ../../../scripts/Apache/Kafka/ %INPUT > output
+# @TEST-EXEC: btest-diff output
+
+module Kafka;
+
+
+redef Kafka::logs_to_send = set(Conn::LOG);
+redef Kafka::topic_name = "const-variable-topic";
+redef Kafka::mock = T;
+
+event bro_init() &priority=-10
+{
+ local xxx_filter: Log::Filter = [
+ $name = "kafka-xxx",
+ $writer = Log::WRITER_KAFKAWRITER,
+ $path = "kafka_xxx",
+ $config = table(["topic_name"] = "configuration-table-topic")
+ ];
+ Log::add_filter(Conn::LOG, xxx_filter);
+}
diff --git a/tests/kafka/resolved-topic-override-only.bro b/tests/kafka/resolved-topic-override-only.bro
new file mode 100644
index 0000000..35cf606
--- /dev/null
+++ b/tests/kafka/resolved-topic-override-only.bro
@@ -0,0 +1,34 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# @TEST-EXEC: bro -r ../../../tests/pcaps/exercise-traffic.pcap ../../../scripts/Apache/Kafka/ %INPUT > output
+# @TEST-EXEC: btest-diff output
+
+module Kafka;
+
+
+redef Kafka::mock = T;
+event bro_init() &priority=-10
+{
+ local xxx_filter: Log::Filter = [
+ $name = "kafka-xxx",
+ $writer = Log::WRITER_KAFKAWRITER,
+ $path = "kafka_xxx",
+ $config = table(["topic_name"] = "configuration-table-topic")
+ ];
+ Log::add_filter(Conn::LOG, xxx_filter);
+}
diff --git a/tests/pcaps/exercise-traffic.pcap b/tests/pcaps/exercise-traffic.pcap
new file mode 100644
index 0000000..1a2bb08
--- /dev/null
+++ b/tests/pcaps/exercise-traffic.pcap
Binary files differ