METRON-2069 Add btests for bro plugin topic_name selection (ottobackwards) closes apache/metron-bro-plugin-kafka#36
diff --git a/.gitignore b/.gitignore
index 735a17e..e08b80e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,7 +3,7 @@
 *.iml
 *.iws
 .DS_Store
-
+/cmake-build-*
 .state
 build
 
@@ -38,6 +38,3 @@
 
 # Log files
 *.log
-# pcap files
-*.pcap
-*.pcapng
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 2bc8d4b..18b8a14 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -28,6 +28,7 @@
   bro_plugin_cc(src/Plugin.cc)
   bro_plugin_cc(src/TaggedJSON.cc)
   bro_plugin_bif(src/kafka.bif)
+  bro_plugin_bif(src/events.bif)
   bro_plugin_dist_files(README CHANGES COPYING VERSION)
   bro_plugin_link_library(${LibRDKafka_LIBRARIES})
   bro_plugin_link_library(${LibRDKafka_C_LIBRARIES})
diff --git a/docker/README.md b/docker/README.md
index eac2919..dbfbd89 100644
--- a/docker/README.md
+++ b/docker/README.md
@@ -295,6 +295,7 @@
 ###### Parameters
 ```bash
 --skip-docker-build             [OPTIONAL] Skip build of bro docker machine.
+--no-pcap                       [OPTIONAL] Do not run pcaps.
 --data-path                     [OPTIONAL] The pcap data path. Default: ./data
 --kafka-topic                   [OPTIONAL] The kafka topic name to use. Default: bro
 --plugin-version                [OPTIONAL] The plugin version. Default: the current branch name
diff --git a/docker/containers/bro-localbuild-container/Dockerfile b/docker/containers/bro-localbuild-container/Dockerfile
index 6a881f5..b35724e 100644
--- a/docker/containers/bro-localbuild-container/Dockerfile
+++ b/docker/containers/bro-localbuild-container/Dockerfile
@@ -24,7 +24,7 @@
    openssl-devel python-devel swig \
    zlib-devel perl \
    cyrus-sasl cyrus-sasl-devel cyrus-sasl-gssapi \
-   git jq screen
+   git jq screen tree vim
 
 # copy in the screen -rc
 COPY .screenrc /root
diff --git a/docker/in_docker_scripts/build_bro_plugin.sh b/docker/in_docker_scripts/build_bro_plugin.sh
index 064fe91..401b2a5 100755
--- a/docker/in_docker_scripts/build_bro_plugin.sh
+++ b/docker/in_docker_scripts/build_bro_plugin.sh
@@ -82,19 +82,26 @@
 
 cd /root || exit 1
 
-echo "================================"
+echo "==================================================="
 
-bro-pkg install code --version "${PLUGIN_VERSION}" --force
+bro-pkg -vvv install code --version "${PLUGIN_VERSION}" --force
 rc=$?; if [[ ${rc} != 0 ]]; then
   echo "ERROR running bro-pkg install ${rc}"
   exit ${rc}
 fi
-
-echo "================================"
+echo "==================================================="
+echo "ERR"
+cat /root/.zkg/testing/code/clones/code/zkg.test_command.stderr
+echo "==================================================="
+echo "OUT"
+cat /root/.zkg/testing/code/clones/code/zkg.test_command.stdout
+echo "==================================================="
+echo ""
+echo "==================================================="
 echo ""
 
-bro -N Apache::Kafka
+bro -NN Apache::Kafka
 
-echo "================================"
+echo "==================================================="
 echo ""
 
diff --git a/docker/run_end_to_end.sh b/docker/run_end_to_end.sh
index 3ec0145..9a7036f 100755
--- a/docker/run_end_to_end.sh
+++ b/docker/run_end_to_end.sh
@@ -30,6 +30,7 @@
   echo "    --data-path                     [OPTIONAL] The pcap data path. Default: ./data"
   echo "    --kafka-topic                   [OPTIONAL] The kafka topic to consume from. Default: bro"
   echo "    --plugin-version                [OPTIONAL] The plugin version. Default: the current branch name"
+  echo "    --no-pcap                       [OPTIONAL] Do not run pcaps."
   echo "    -h/--help                       Usage information."
   echo " "
   echo "COMPATABILITY"
@@ -45,7 +46,7 @@
 fi
 
 SKIP_REBUILD_BRO=false
-
+NO_PCAP=false
 ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" > /dev/null && pwd)"
 SCRIPT_DIR="${ROOT_DIR}"/scripts
 CONTAINER_DIR="${ROOT_DIR}"/containers/bro-localbuild-container
@@ -68,6 +69,15 @@
       SKIP_REBUILD_BRO=true
       shift # past argument
     ;;
+  #
+  # NO_PCAP
+  #
+  #   --no-pcap
+  #
+    --no-pcap)
+      NO_PCAP=true
+      shift # past argument
+    ;;
 
   #
   # DATA_PATH
@@ -196,58 +206,58 @@
   exit ${rc}
 fi
 
-
-# for each pcap in the data directory, we want to
-# run bro then read the output from kafka
-# and output both of them to the same directory named
-# for the date/pcap
+if [[ "$NO_PCAP" = false ]]; then
+  # for each pcap in the data directory, we want to
+  # run bro then read the output from kafka
+  # and output both of them to the same directory named
+  # for the date/pcap
 
 
-for file in "${DATA_PATH}"/**/*.pcap*
-do
-  # get the file name
-  BASE_FILE_NAME=$(basename "${file}")
-  DOCKER_DIRECTORY_NAME=${BASE_FILE_NAME//\./_}
+  for file in "${DATA_PATH}"/**/*.pcap*
+  do
+    # get the file name
+    BASE_FILE_NAME=$(basename "${file}")
+    DOCKER_DIRECTORY_NAME=${BASE_FILE_NAME//\./_}
 
-  mkdir "${TEST_OUTPUT_PATH}/${DOCKER_DIRECTORY_NAME}" || exit 1
-  echo "MADE ${TEST_OUTPUT_PATH}/${DOCKER_DIRECTORY_NAME}"
+    mkdir "${TEST_OUTPUT_PATH}/${DOCKER_DIRECTORY_NAME}" || exit 1
+    echo "MADE ${TEST_OUTPUT_PATH}/${DOCKER_DIRECTORY_NAME}"
 
-  # get the current offset in kafka
-  # this is where we are going to _start_
-  OFFSET=$(bash "${SCRIPT_DIR}"/docker_run_get_offset_kafka.sh --kafka-topic="${KAFKA_TOPIC}" | sed "s/^${KAFKA_TOPIC}:0:\(.*\)$/\1/")
-  echo "OFFSET------------------> ${OFFSET}"
+    # get the current offset in kafka
+    # this is where we are going to _start_
+    OFFSET=$(bash "${SCRIPT_DIR}"/docker_run_get_offset_kafka.sh --kafka-topic="${KAFKA_TOPIC}" | sed "s/^${KAFKA_TOPIC}:0:\(.*\)$/\1/")
+    echo "OFFSET------------------> ${OFFSET}"
 
-  bash "${SCRIPT_DIR}"/docker_execute_process_data_file.sh --pcap-file-name="${BASE_FILE_NAME}" --output-directory-name="${DOCKER_DIRECTORY_NAME}"
+    bash "${SCRIPT_DIR}"/docker_execute_process_data_file.sh --pcap-file-name="${BASE_FILE_NAME}" --output-directory-name="${DOCKER_DIRECTORY_NAME}"
+    rc=$?; if [[ ${rc} != 0 ]]; then
+      echo "ERROR> FAILED TO PROCESS ${file} DATA.  CHECK LOGS, please run the finish_end_to_end.sh when you are done."
+      exit ${rc}
+    fi
+
+    KAFKA_OUTPUT_FILE="${TEST_OUTPUT_PATH}/${DOCKER_DIRECTORY_NAME}/kafka-output.log"
+    bash "${SCRIPT_DIR}"/docker_run_consume_kafka.sh --offset="${OFFSET}" --kafka-topic="${KAFKA_TOPIC}" | "${ROOT_DIR}"/remove_timeout_message.sh | tee "${KAFKA_OUTPUT_FILE}"
+
+    rc=$?; if [[ ${rc} != 0 ]]; then
+      echo "ERROR> FAILED TO PROCESS ${DATA_PATH} DATA.  CHECK LOGS"
+    fi
+
+    "${SCRIPT_DIR}"/split_kafka_output_by_log.sh --log-directory="${TEST_OUTPUT_PATH}/${DOCKER_DIRECTORY_NAME}"
+    rc=$?; if [[ ${rc} != 0 ]]; then
+      echo "ERROR> ISSUE ENCOUNTERED WHEN SPLITTING KAFKA OUTPUT LOGS"
+    fi
+  done
+
+  "${SCRIPT_DIR}"/print_results.sh --test-directory="${TEST_OUTPUT_PATH}"
   rc=$?; if [[ ${rc} != 0 ]]; then
-    echo "ERROR> FAILED TO PROCESS ${file} DATA.  CHECK LOGS, please run the finish_end_to_end.sh when you are done."
+    echo "ERROR> ISSUE ENCOUNTERED WHEN PRINTING RESULTS"
     exit ${rc}
   fi
 
-  KAFKA_OUTPUT_FILE="${TEST_OUTPUT_PATH}/${DOCKER_DIRECTORY_NAME}/kafka-output.log"
-  bash "${SCRIPT_DIR}"/docker_run_consume_kafka.sh --offset="${OFFSET}" --kafka-topic="${KAFKA_TOPIC}" | "${ROOT_DIR}"/remove_timeout_message.sh | tee "${KAFKA_OUTPUT_FILE}"
-
+  "${SCRIPT_DIR}"/analyze_results.sh --test-directory="${TEST_OUTPUT_PATH}"
   rc=$?; if [[ ${rc} != 0 ]]; then
-    echo "ERROR> FAILED TO PROCESS ${DATA_PATH} DATA.  CHECK LOGS"
+    echo "ERROR> ISSUE ENCOUNTERED WHEN ANALYZING RESULTS"
+    exit ${rc}
   fi
-
-  "${SCRIPT_DIR}"/split_kakfa_output_by_log.sh --log-directory="${TEST_OUTPUT_PATH}/${DOCKER_DIRECTORY_NAME}"
-  rc=$?; if [[ ${rc} != 0 ]]; then
-    echo "ERROR> ISSUE ENCOUNTERED WHEN SPLITTING KAFKA OUTPUT LOGS"
-  fi
-done
-
-"${SCRIPT_DIR}"/print_results.sh --test-directory="${TEST_OUTPUT_PATH}"
-rc=$?; if [[ ${rc} != 0 ]]; then
-  echo "ERROR> ISSUE ENCOUNTERED WHEN PRINTING RESULTS"
-  exit ${rc}
 fi
-
-"${SCRIPT_DIR}"/analyze_results.sh --test-directory="${TEST_OUTPUT_PATH}"
-rc=$?; if [[ ${rc} != 0 ]]; then
-  echo "ERROR> ISSUE ENCOUNTERED WHEN ANALYZING RESULTS"
-  exit ${rc}
-fi
-
 echo ""
 echo "Run complete"
 echo "The kafka and bro output can be found at ${TEST_OUTPUT_PATH}"
diff --git a/docker/scripts/split_kakfa_output_by_log.sh b/docker/scripts/split_kafka_output_by_log.sh
similarity index 100%
rename from docker/scripts/split_kakfa_output_by_log.sh
rename to docker/scripts/split_kafka_output_by_log.sh
diff --git a/scripts/Apache/Kafka/logs-to-kafka.bro b/scripts/Apache/Kafka/logs-to-kafka.bro
index 24d88a6..5852505 100644
--- a/scripts/Apache/Kafka/logs-to-kafka.bro
+++ b/scripts/Apache/Kafka/logs-to-kafka.bro
@@ -19,6 +19,7 @@
 
 module Kafka;
 
+
 function send_to_kafka(id: Log::ID): bool
 {
         if (|logs_to_send| == 0 && send_all_active_logs == F)
@@ -51,3 +52,7 @@
                 }
         }
 }
+
+event kafka_topic_resolved_event(topic: string) {
+    print(fmt("Kafka topic set to %s",topic));
+}
\ No newline at end of file
diff --git a/scripts/init.bro b/scripts/init.bro
index 08d46cd..7e2c56c 100644
--- a/scripts/init.bro
+++ b/scripts/init.bro
@@ -55,5 +55,7 @@
 
         ## A comma separated list of librdkafka debug contexts
         const debug: string = "" &redef;
+
+        const mock: bool = F &redef;
 }
 
diff --git a/src/KafkaWriter.cc b/src/KafkaWriter.cc
index 396c861..d2287bf 100644
--- a/src/KafkaWriter.cc
+++ b/src/KafkaWriter.cc
@@ -16,6 +16,7 @@
  */
 
 #include "KafkaWriter.h"
+#include "events.bif.h"
 
 using namespace logging;
 using namespace writer;
@@ -35,6 +36,7 @@
 
   // tag_json - thread local copy
   tag_json = BifConst::Kafka::tag_json;
+  mocking = BifConst::Kafka::mock;
 
   // json_timestamps
   ODesc tsfmt;
@@ -88,6 +90,7 @@
  */
 bool KafkaWriter::DoInit(const WriterInfo& info, int num_fields, const threading::Field* const* fields)
 {
+
     // Timeformat object, default to TS_EPOCH
     threading::formatter::JSON::TimeFormat tf = threading::formatter::JSON::TS_EPOCH;
 
@@ -104,6 +107,10 @@
       topic_name = info.path;
     }
 
+    if (mocking) {
+       raise_topic_resolved_event(topic_name);
+    }
+
     /**
      * Format the timestamps
      * NOTE: This string comparision implementation is currently the necessary
@@ -171,25 +178,26 @@
         }
     }
 
-    // create kafka producer
-    producer = RdKafka::Producer::create(conf, err);
-    if (!producer) {
-        Error(Fmt("Failed to create producer: %s", err.c_str()));
-        return false;
-    }
+    if (!mocking) {
+        // create kafka producer
+        producer = RdKafka::Producer::create(conf, err);
+        if (!producer) {
+            Error(Fmt("Failed to create producer: %s", err.c_str()));
+            return false;
+        }
 
-    // create handle to topic
-    topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
-    topic = RdKafka::Topic::create(producer, topic_name, topic_conf, err);
-    if (!topic) {
-        Error(Fmt("Failed to create topic handle: %s", err.c_str()));
-        return false;
-    }
+        // create handle to topic
+        topic_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
+        topic = RdKafka::Topic::create(producer, topic_name, topic_conf, err);
+        if (!topic) {
+            Error(Fmt("Failed to create topic handle: %s", err.c_str()));
+            return false;
+        }
 
-    if(is_debug) {
-        MsgThread::Info(Fmt("Successfully created producer."));
+        if (is_debug) {
+            MsgThread::Info(Fmt("Successfully created producer."));
+        }
     }
-
     return true;
 }
 
@@ -206,21 +214,23 @@
     int waited = 0;
     int max_wait = BifConst::Kafka::max_wait_on_shutdown;
 
-    // wait a bit for queued messages to be delivered
-    while (producer->outq_len() > 0 && waited <= max_wait) {
-        producer->poll(poll_interval);
-        waited += poll_interval;
-    }
+    if (!mocking) {
+        // wait a bit for queued messages to be delivered
+        while (producer->outq_len() > 0 && waited <= max_wait) {
+            producer->poll(poll_interval);
+            waited += poll_interval;
+        }
 
-    // successful only if all messages delivered
-    if (producer->outq_len() == 0) {
-        success = true;
-    } else {
-        Error(Fmt("Unable to deliver %0d message(s)", producer->outq_len()));
-    }
+        // successful only if all messages delivered
+        if (producer->outq_len() == 0) {
+            success = true;
+        } else {
+            Error(Fmt("Unable to deliver %0d message(s)", producer->outq_len()));
+        }
 
-    delete topic;
-    delete producer;
+        delete topic;
+        delete producer;
+    }
     delete formatter;
     delete conf;
     delete topic_conf;
@@ -234,26 +244,26 @@
  */
 bool KafkaWriter::DoWrite(int num_fields, const threading::Field* const* fields, threading::Value** vals)
 {
-    ODesc buff;
-    buff.Clear();
+    if (!mocking) {
+        ODesc buff;
+        buff.Clear();
 
-    // format the log entry
-    formatter->Describe(&buff, num_fields, fields, vals);
+        // format the log entry
+        formatter->Describe(&buff, num_fields, fields, vals);
 
-    // send the formatted log entry to kafka
-    const char* raw = (const char*)buff.Bytes();
-    RdKafka::ErrorCode resp = producer->produce(
-        topic, RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY,
-        const_cast<char*>(raw), strlen(raw), NULL, NULL);
+        // send the formatted log entry to kafka
+        const char *raw = (const char *) buff.Bytes();
+        RdKafka::ErrorCode resp = producer->produce(
+                topic, RdKafka::Topic::PARTITION_UA, RdKafka::Producer::RK_MSG_COPY,
+                const_cast<char *>(raw), strlen(raw), NULL, NULL);
 
-    if (RdKafka::ERR_NO_ERROR == resp) {
-        producer->poll(0);
+        if (RdKafka::ERR_NO_ERROR == resp) {
+            producer->poll(0);
+        } else {
+            string err = RdKafka::err2str(resp);
+            Error(Fmt("Kafka send failed: %s", err.c_str()));
+        }
     }
-    else {
-        string err = RdKafka::err2str(resp);
-        Error(Fmt("Kafka send failed: %s", err.c_str()));
-    }
-
     return true;
 }
 
@@ -279,7 +289,9 @@
  */
 bool KafkaWriter::DoFlush(double network_time)
 {
-    producer->poll(0);
+    if (!mocking) {
+        producer->poll(0);
+    }
     return true;
 }
 
@@ -303,6 +315,20 @@
  */
 bool KafkaWriter::DoHeartbeat(double network_time, double current_time)
 {
-    producer->poll(0);
+    if (!mocking) {
+        producer->poll(0);
+    }
     return true;
 }
+
+/**
+ * Queues kafka_topic_resolved_event carrying the resolved topic name; used when mocking/testing
+ * @param topic the topic name resolved for this writer
+ */
+void KafkaWriter::raise_topic_resolved_event(const string topic) {
+    if (kafka_topic_resolved_event) {
+        val_list *vl = new val_list;
+        vl->append(new StringVal(topic.c_str()));
+        mgr.QueueEvent(kafka_topic_resolved_event, vl);
+    }
+}
diff --git a/src/KafkaWriter.h b/src/KafkaWriter.h
index c67c664..0ef0fb1 100644
--- a/src/KafkaWriter.h
+++ b/src/KafkaWriter.h
@@ -66,9 +66,11 @@
 
 private:
     string GetConfigValue(const WriterInfo& info, const string name) const;
+    void raise_topic_resolved_event(const string topic);
     static const string default_topic_key;
     string stream_id;
     bool tag_json;
+    bool mocking;
     string json_timestamps;
     map<string, string> kafka_conf;
     string topic_name;
diff --git a/src/events.bif b/src/events.bif
new file mode 100644
index 0000000..515fad5
--- /dev/null
+++ b/src/events.bif
@@ -0,0 +1 @@
+event kafka_topic_resolved_event%(topic: string%);
\ No newline at end of file
diff --git a/src/kafka.bif b/src/kafka.bif
index 2709072..53e3bfe 100644
--- a/src/kafka.bif
+++ b/src/kafka.bif
@@ -23,3 +23,4 @@
 const tag_json: bool;
 const json_timestamps: JSON::TimestampFormat;
 const debug: string;
+const mock: bool;
diff --git a/tests/Baseline/kafka.resolved-topic-config/output b/tests/Baseline/kafka.resolved-topic-config/output
new file mode 100644
index 0000000..50438b7
--- /dev/null
+++ b/tests/Baseline/kafka.resolved-topic-config/output
@@ -0,0 +1 @@
+Kafka topic set to const-variable-topic
diff --git a/tests/Baseline/kafka.resolved-topic-default/output b/tests/Baseline/kafka.resolved-topic-default/output
new file mode 100644
index 0000000..1cfb642
--- /dev/null
+++ b/tests/Baseline/kafka.resolved-topic-default/output
@@ -0,0 +1 @@
+Kafka topic set to bro
diff --git a/tests/Baseline/kafka.resolved-topic-override-and-config/output b/tests/Baseline/kafka.resolved-topic-override-and-config/output
new file mode 100644
index 0000000..54bee9c
--- /dev/null
+++ b/tests/Baseline/kafka.resolved-topic-override-and-config/output
@@ -0,0 +1,2 @@
+Kafka topic set to const-variable-topic
+Kafka topic set to configuration-table-topic
diff --git a/tests/Baseline/kafka.resolved-topic-override-only/output b/tests/Baseline/kafka.resolved-topic-override-only/output
new file mode 100644
index 0000000..fc3ecea
--- /dev/null
+++ b/tests/Baseline/kafka.resolved-topic-override-only/output
@@ -0,0 +1 @@
+Kafka topic set to configuration-table-topic
diff --git a/tests/Baseline/kafka.show-plugin/output b/tests/Baseline/kafka.show-plugin/output
index e6ad77a..978febc 100644
--- a/tests/Baseline/kafka.show-plugin/output
+++ b/tests/Baseline/kafka.show-plugin/output
@@ -6,4 +6,6 @@
     [Constant] Kafka::tag_json
     [Constant] Kafka::json_timestamps
     [Constant] Kafka::debug
+    [Constant] Kafka::mock
+    [Event] kafka_topic_resolved_event
 
diff --git a/tests/kafka/resolved-topic-config.bro b/tests/kafka/resolved-topic-config.bro
new file mode 100644
index 0000000..56fa093
--- /dev/null
+++ b/tests/kafka/resolved-topic-config.bro
@@ -0,0 +1,26 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+
+# @TEST-EXEC: bro -r ../../../tests/pcaps/exercise-traffic.pcap ../../../scripts/Apache/Kafka/ %INPUT > output
+# @TEST-EXEC: btest-diff output
+
+module Kafka;
+
+
+redef Kafka::logs_to_send = set(Conn::LOG);
+redef Kafka::topic_name = "const-variable-topic";
+redef Kafka::mock = T;
diff --git a/tests/kafka/resolved-topic-default.bro b/tests/kafka/resolved-topic-default.bro
new file mode 100644
index 0000000..ea9d217
--- /dev/null
+++ b/tests/kafka/resolved-topic-default.bro
@@ -0,0 +1,24 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+
+# @TEST-EXEC: bro -r ../../../tests/pcaps/exercise-traffic.pcap ../../../scripts/Apache/Kafka/ %INPUT > output
+# @TEST-EXEC: btest-diff output
+
+module Kafka;
+
+redef Kafka::logs_to_send = set(Conn::LOG);
+redef Kafka::mock = T;
diff --git a/tests/kafka/resolved-topic-override-and-config.bro b/tests/kafka/resolved-topic-override-and-config.bro
new file mode 100644
index 0000000..d75ce61
--- /dev/null
+++ b/tests/kafka/resolved-topic-override-and-config.bro
@@ -0,0 +1,37 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+
+# @TEST-EXEC: bro -r ../../../tests/pcaps/exercise-traffic.pcap ../../../scripts/Apache/Kafka/ %INPUT > output
+# @TEST-EXEC: btest-diff output
+
+module Kafka;
+
+
+redef Kafka::logs_to_send = set(Conn::LOG);
+redef Kafka::topic_name = "const-variable-topic";
+redef Kafka::mock = T;
+
+event bro_init() &priority=-10
+{
+    local xxx_filter: Log::Filter = [
+        $name = "kafka-xxx",
+        $writer = Log::WRITER_KAFKAWRITER,
+        $path = "kafka_xxx",
+        $config = table(["topic_name"] = "configuration-table-topic")
+    ];
+    Log::add_filter(Conn::LOG, xxx_filter);
+}
diff --git a/tests/kafka/resolved-topic-override-only.bro b/tests/kafka/resolved-topic-override-only.bro
new file mode 100644
index 0000000..35cf606
--- /dev/null
+++ b/tests/kafka/resolved-topic-override-only.bro
@@ -0,0 +1,34 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one or more
+#  contributor license agreements.  See the NOTICE file distributed with
+#  this work for additional information regarding copyright ownership.
+#  The ASF licenses this file to You under the Apache License, Version 2.0
+#  (the "License"); you may not use this file except in compliance with
+#  the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+
+# @TEST-EXEC: bro -r ../../../tests/pcaps/exercise-traffic.pcap ../../../scripts/Apache/Kafka/ %INPUT > output
+# @TEST-EXEC: btest-diff output
+
+module Kafka;
+
+
+redef Kafka::mock = T;
+event bro_init() &priority=-10
+{
+    local xxx_filter: Log::Filter = [
+        $name = "kafka-xxx",
+        $writer = Log::WRITER_KAFKAWRITER,
+        $path = "kafka_xxx",
+        $config = table(["topic_name"] = "configuration-table-topic")
+    ];
+    Log::add_filter(Conn::LOG, xxx_filter);
+}
diff --git a/tests/pcaps/exercise-traffic.pcap b/tests/pcaps/exercise-traffic.pcap
new file mode 100644
index 0000000..1a2bb08
--- /dev/null
+++ b/tests/pcaps/exercise-traffic.pcap
Binary files differ