MINIFICPP-1048 - Add PublishKafka Docker tests

Signed-off-by: Arpad Boda <aboda@apache.org>

This closes #657
diff --git a/docker/Dockerfile b/docker/Dockerfile
index 38bcaf7..7d88477 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -76,7 +76,7 @@
 RUN cd ${MINIFI_BASE_DIR} \
 	&& mkdir build \
 	&& cd build \
-	&& cmake -DDISABLE_JEMALLOC=ON -DSTATIC_BUILD= -DSKIP_TESTS=true -DENABLE_JNI=ON .. \
+	&& cmake -DDISABLE_JEMALLOC=ON -DSTATIC_BUILD= -DSKIP_TESTS=true -DENABLE_JNI=ON -DENABLE_LIBRDKAFKA=ON .. \
 	&& make -j8 package \
 	&& tar -xzvf ${MINIFI_BASE_DIR}/build/nifi-minifi-cpp-${MINIFI_VERSION}-bin.tar.gz -C ${MINIFI_BASE_DIR}
 
diff --git a/docker/test/integration/minifi/__init__.py b/docker/test/integration/minifi/__init__.py
index 48f25b3..cfb2eb5 100644
--- a/docker/test/integration/minifi/__init__.py
+++ b/docker/test/integration/minifi/__init__.py
@@ -27,6 +27,9 @@
 import yaml
 from copy import copy
 
+import time
+from collections import OrderedDict
+
 
 class Cluster(object):
     """
@@ -64,7 +67,8 @@
         self.minifi_root = '/opt/minifi/nifi-minifi-cpp-' + self.minifi_version
         self.nifi_root = '/opt/nifi/nifi-' + self.nifi_version
         self.network = None
-        self.containers = []
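+        # Keyed by container name so tests can start/stop specific containers.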
+        self.containers = OrderedDict()
         self.images = []
         self.tmp_files = []
 
@@ -93,12 +96,22 @@
         if self.network is None:
             net_name = 'nifi-' + str(uuid.uuid4())
             logging.info('Creating network: %s', net_name)
-            self.network = self.client.networks.create(net_name)
+            # Use a fixed subnet so containers such as the Kafka broker can be given static IPs
+            ipam_pool = docker.types.IPAMPool(
+                subnet='192.168.42.0/24',
+                gateway='192.168.42.1'
+            )
+            ipam_config = docker.types.IPAMConfig(
+                pool_configs=[ipam_pool]
+            )
+            self.network = self.client.networks.create(net_name, ipam=ipam_config)
 
         if engine == 'nifi':
             self.deploy_nifi_flow(flow, name, vols)
         elif engine == 'minifi-cpp':
             self.deploy_minifi_cpp_flow(flow, name, vols)
+        elif engine == 'kafka-broker':
+            self.deploy_kafka_broker(name)
         else:
             raise Exception('invalid flow engine: \'%s\'' % engine)
 
@@ -148,7 +161,7 @@
 
         logging.info('Started container \'%s\'', container.name)
 
-        self.containers.append(container)
+        self.containers[container.name] = container
 
     def deploy_nifi_flow(self, flow, name, vols):
         dockerfile = dedent("""FROM {base_image}
@@ -198,7 +211,39 @@
 
         logging.info('Started container \'%s\'', container.name)
 
-        self.containers.append(container)
+        self.containers[container.name] = container
+
+    def deploy_kafka_broker(self, name):
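+        # The consumer container reads the 'test' topic and dumps every message
+        # into heaven_signal.txt, which KafkaValidator later extracts and checks.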
+        dockerfile = dedent("""FROM {base_image}
+                USER root
+                CMD $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server 192.168.42.4:9092 --topic test > heaven_signal.txt
+                """.format(base_image='spotify/kafka:latest'))
+
+        logging.info('Creating and running docker container for kafka broker...')
+
+        broker = self.client.containers.run(
+                    self.client.images.pull("spotify/kafka:latest"),
+                    detach=True,
+                    name='kafka-broker',
+                    ports={'2181/tcp': 2181, '9092/tcp': 9092},
+                    environment=["ADVERTISED_HOST=192.168.42.4", "ADVERTISED_PORT=9092"]
+                    )
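+        # Pin the broker to the fixed address expected by the consumer's
+        # bootstrap server and by PublishKafka's 'Known Brokers' property.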
+        self.network.connect(broker, ipv4_address='192.168.42.4')
+
+        configured_image = self.build_image(dockerfile, [])
+        consumer = self.client.containers.run(
+                    configured_image[0],
+                    detach=True,
+                    name='kafka-consumer',
+                    network=self.network.name
+                    )
+
+        self.containers[consumer.name] = consumer
+        self.containers[broker.name] = broker
 
     def build_image(self, dockerfile, context_files):
         conf_dockerfile_buffer = BytesIO()
@@ -227,6 +268,7 @@
                                                         custom_context=True,
                                                         rm=True,
                                                         forcerm=True)
+            logging.info('Created image with id: %s', configured_image[0].id)
             self.images.append(configured_image)
 
         finally:
@@ -247,7 +289,7 @@
         """
 
         # Clean up containers
-        for container in self.containers:
+        for container in self.containers.values():
             logging.info('Cleaning up container: %s', container.name)
             container.remove(v=True, force=True)
 
@@ -466,6 +508,16 @@
         else:
             return key
 
+
+class PublishKafka(Processor):
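+    """PublishKafka preconfigured to target the test broker's fixed address."""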
+    def __init__(self):
+        super(PublishKafka, self).__init__('PublishKafka',
+                                           properties={'Client Name': 'nghiaxlee', 'Known Brokers': '192.168.42.4:9092', 'Topic Name': 'test',
+                                                       'Batch Size': '10', 'Compress Codec': 'none', 'Delivery Guarantee': '1',
+                                                       'Request Timeout': '10 sec', 'Message Timeout': '5 sec'},
+                                           auto_terminate=['success'])
+
 
 class InputPort(Connectable):
     def __init__(self, name=None, remote_process_group=None):
diff --git a/docker/test/integration/minifi/test/__init__.py b/docker/test/integration/minifi/test/__init__.py
index 67affa9..71fd096 100644
--- a/docker/test/integration/minifi/test/__init__.py
+++ b/docker/test/integration/minifi/test/__init__.py
@@ -16,6 +16,8 @@
 import logging
 import shutil
 import uuid
+import tarfile
+from io import BytesIO
 from threading import Event
 
 import os
@@ -67,6 +69,11 @@
 
         super(DockerTestCluster, self).__init__()
 
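+        # Give the KafkaValidator access to the container map so it can read
+        # the consumer's output file during validation.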
+        if isinstance(output_validator, KafkaValidator):
+            output_validator.set_containers(self.containers)
+
     def deploy_flow(self,
                     flow,
                     name=None,
@@ -89,6 +94,28 @@
                                                    name=name,
                                                    engine=engine)
 
+    def start_flow(self, name):
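+        """Start the named container if it has exited. Returns True if a start was issued."""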
+        container = self.containers[name]
+        container.reload()
+        logging.info("Status before start: %s", container.status)
+        if container.status == 'exited':
+            logging.info("Start container: %s", name)
+            container.start()
+            return True
+        return False
+
+    def stop_flow(self, name):
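+        """Stop the named container if it is running. Returns True if a stop was issued."""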
+        container = self.containers[name]
+        container.reload()
+        logging.info("Status before stop: %s", container.status)
+        if container.status == 'running':
+            logging.info("Stop container: %s", name)
+            container.stop(timeout=0)
+            return True
+        return False
+
     def put_test_data(self, contents):
         """
         Creates a randomly-named file in the test input dir and writes
@@ -116,7 +141,7 @@
 
     def log_nifi_output(self):
 
-        for container in self.containers:
+        for container in self.containers.values():
             container = self.client.containers.get(container.id)
             logging.info('Container logs for container \'%s\':\n%s', container.name, container.logs().decode("utf-8"))
             if b'Segmentation fault' in container.logs():
@@ -141,14 +166,23 @@
             stats = container.stats(stream=False)
             logging.info('Container stats:\n%s', stats)
 
-    def check_output(self, timeout=5):
+    def check_output(self, timeout=5, **kwargs):
         """
         Wait for flow output, validate it, and log minifi output.
         """
         self.wait_for_output(timeout)
         self.log_nifi_output()
-
-        return self.output_validator.validate() and not self.segfault
+        if self.segfault:
+            return False
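+        # File-based validators can be scoped to a subdirectory of the
+        # output dir via the 'dir' keyword argument.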
+        if isinstance(self.output_validator, FileOutputValidator):
+            return self.output_validator.validate(dir=kwargs.get('dir', ''))
+        return self.output_validator.validate()
+
+    def rm_out_child(self, dir):
+        logging.info('Removing %s from output folder', self.tmp_test_output_dir + dir)
+        shutil.rmtree(self.tmp_test_output_dir + dir)
 
     def __exit__(self, exc_type, exc_val, exc_tb):
         """
@@ -204,6 +235,10 @@
     def set_output_dir(self, output_dir):
         self.output_dir = output_dir
 
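+    # Default no-op; subclasses override validate() and may scope it to a subdirectory.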
+    def validate(self, dir=''):
+        pass
+
 class SingleFileOutputValidator(FileOutputValidator):
     """
     Validates the content of a single file in the given directory.
@@ -213,22 +247,70 @@
         self.valid = False
         self.expected_content = expected_content
 
+    def validate(self, dir=''):
+
+        self.valid = False
+
+        full_dir = self.output_dir + dir
+        logging.info("Output folder: %s", full_dir)
+
+        listing = listdir(full_dir)
+
+        if listing:
+            for file_name in listing:
+                logging.info("Found output file: %s", file_name)
+            out_file_name = listing[0]
+
+            with open(join(full_dir, out_file_name), 'r') as out_file:
+                contents = out_file.read()
+                logging.info("dir %s -- name %s", full_dir, out_file_name)
+                logging.info("expected %s -- content %s", self.expected_content, contents)
+
+                if self.expected_content in contents:
+                    self.valid = True
+
+        return self.valid
+
+class KafkaValidator(OutputValidator):
+    """
+    Validates PublishKafka
+    """
+
+    def __init__(self, expected_content):
+        self.valid = False
+        self.expected_content = expected_content
+        self.containers = None
+
+    def set_containers(self, containers):
+        self.containers = containers
+
     def validate(self):
 
         if self.valid:
             return True
+        if self.containers is None:
+            return False
 
-        listing = listdir(self.output_dir)
+        if 'kafka-consumer' not in self.containers:
+            logging.info('Kafka consumer container not found.')
+            return False
+
+        kafka_container = self.containers['kafka-consumer']
 
-        if listing:
-            out_file_name = listing[0]
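+        # get_archive returns the file as a tar stream; unpack it in memory.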
+        output, stat = kafka_container.get_archive('/heaven_signal.txt')
+        file_obj = BytesIO()
+        for i in output:
+            file_obj.write(i)
+        file_obj.seek(0)
+        tar = tarfile.open(mode='r', fileobj=file_obj)
+        contents = tar.extractfile('heaven_signal.txt').read()
+        logging.info("expected %s -- content %s", self.expected_content, contents)
 
-            with open(join(self.output_dir, out_file_name), 'r') as out_file:
-                contents = out_file.read()
+        contents = contents.decode("utf-8")
+        if self.expected_content in contents:
+            self.valid = True
 
-                if contents == self.expected_content:
-                    self.valid = True
-
+        logging.info("expected %s -- content %s", self.expected_content, contents)
         return self.valid
 
 class EmptyFilesOutPutValidator(FileOutputValidator):
@@ -238,14 +320,17 @@
     def __init__(self):
         self.valid = False
 
-    def validate(self):
+    def validate(self, dir=''):
 
         if self.valid:
             return True
 
-        listing = listdir(self.output_dir)
+        full_dir = self.output_dir + dir
+        logging.info("Output folder: %s", full_dir)
+
+        listing = listdir(full_dir)
         if listing:
-            self.valid = all(os.path.getsize(os.path.join(self.output_dir,x)) == 0 for x in listing)
+            self.valid = all(os.path.getsize(os.path.join(full_dir,x)) == 0 for x in listing)
 
         return self.valid
 
@@ -256,12 +341,17 @@
     def __init__(self):
         self.valid = False
 
-    def validate(self):
+    def validate(self, dir=''):
 
         if self.valid:
             return True
 
-        self.valid = not bool(listdir(self.output_dir))
+        full_dir = self.output_dir + dir
+        logging.info("Output folder: %s", full_dir)
+
+        listing = listdir(full_dir)
+
+        self.valid = not bool(listing)
 
         return self.valid
 
diff --git a/docker/test/integration/test_rdkafka.py b/docker/test/integration/test_rdkafka.py
new file mode 100644
index 0000000..5a5c696
--- /dev/null
+++ b/docker/test/integration/test_rdkafka.py
@@ -0,0 +1,77 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from minifi import *
+from minifi.test import *
+
+
+def test_publish_kafka():
+    """
+    Verify delivery of message to kafka broker
+    """
+    producer_flow = GetFile('/tmp/input') >> PublishKafka() >> ('success', LogAttribute())
+
+    with DockerTestCluster(KafkaValidator('test')) as cluster:
+        cluster.put_test_data('test')
+        cluster.deploy_flow(None, engine='kafka-broker')
+        cluster.deploy_flow(producer_flow, name='minifi-producer', engine='minifi-cpp')
+
+        assert cluster.check_output(10)
+
+def test_no_broker():
+    """
+    Verify failure case when broker is down
+    """
+    producer_flow = (GetFile('/tmp/input') >> PublishKafka()
+                        >> (('failure', PutFile('/tmp/output')),
+                            ('success', LogAttribute())))
+
+    with DockerTestCluster(SingleFileOutputValidator('no broker')) as cluster:
+        cluster.put_test_data('no broker')
+        cluster.deploy_flow(producer_flow, name='minifi-producer', engine='minifi-cpp')
+
+        assert cluster.check_output(30)
+
+def test_broker_on_off():
+    """
+    Verify delivery of message when broker is unstable
+    """
+    producer_flow = (GetFile('/tmp/input') >> PublishKafka()
+                     >> (('success', PutFile('/tmp/output/success')),
+                         ('failure', PutFile('/tmp/output/failure'))))
+
+    with DockerTestCluster(SingleFileOutputValidator('test')) as cluster:
+        cluster.put_test_data('test')
+        cluster.deploy_flow(None, engine='kafka-broker')
+        cluster.deploy_flow(producer_flow, name='minifi-producer', engine='minifi-cpp')
+
+        def start_kafka():
+            assert cluster.start_flow('kafka-broker')
+            assert cluster.start_flow('kafka-consumer')
+        def stop_kafka():
+            assert cluster.stop_flow('kafka-consumer')
+            assert cluster.stop_flow('kafka-broker')
+
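+        # With the broker up, messages land in 'success'; with it down, in 'failure'.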
+        assert cluster.check_output(10, dir='/success')
+        stop_kafka()
+        assert cluster.check_output(30, dir='/failure')
+        start_kafka()
+        cluster.rm_out_child('/success')
+        assert cluster.check_output(30, dir='/success')
+        stop_kafka()
+        cluster.rm_out_child('/failure')
+        assert cluster.check_output(30, dir='/failure')
+