MINIFICPP-1048 - Add PublishKafka docker tests
Signed-off-by: Arpad Boda <aboda@apache.org>
This closes #657
diff --git a/docker/Dockerfile b/docker/Dockerfile
index 38bcaf7..7d88477 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -76,7 +76,7 @@
RUN cd ${MINIFI_BASE_DIR} \
&& mkdir build \
&& cd build \
- && cmake -DDISABLE_JEMALLOC=ON -DSTATIC_BUILD= -DSKIP_TESTS=true -DENABLE_JNI=ON .. \
+ && cmake -DDISABLE_JEMALLOC=ON -DSTATIC_BUILD= -DSKIP_TESTS=true -DENABLE_JNI=ON -DENABLE_LIBRDKAFKA=ON .. \
&& make -j8 package \
&& tar -xzvf ${MINIFI_BASE_DIR}/build/nifi-minifi-cpp-${MINIFI_VERSION}-bin.tar.gz -C ${MINIFI_BASE_DIR}
diff --git a/docker/test/integration/minifi/__init__.py b/docker/test/integration/minifi/__init__.py
index 48f25b3..cfb2eb5 100644
--- a/docker/test/integration/minifi/__init__.py
+++ b/docker/test/integration/minifi/__init__.py
@@ -27,6 +27,9 @@
import yaml
from copy import copy
+import time
+from collections import OrderedDict
+
class Cluster(object):
"""
@@ -64,7 +67,7 @@
self.minifi_root = '/opt/minifi/nifi-minifi-cpp-' + self.minifi_version
self.nifi_root = '/opt/nifi/nifi-' + self.nifi_version
self.network = None
- self.containers = []
+ self.containers = OrderedDict()
self.images = []
self.tmp_files = []
@@ -93,12 +96,22 @@
if self.network is None:
net_name = 'nifi-' + str(uuid.uuid4())
logging.info('Creating network: %s', net_name)
- self.network = self.client.networks.create(net_name)
+ # Set IP
+ ipam_pool = docker.types.IPAMPool(
+ subnet='192.168.42.0/24',
+ gateway='192.168.42.1'
+ )
+ ipam_config = docker.types.IPAMConfig(
+ pool_configs=[ipam_pool]
+ )
+ self.network = self.client.networks.create(net_name, ipam=ipam_config)
if engine == 'nifi':
self.deploy_nifi_flow(flow, name, vols)
elif engine == 'minifi-cpp':
self.deploy_minifi_cpp_flow(flow, name, vols)
+ elif engine == 'kafka-broker':
+ self.deploy_kafka_broker(name)
else:
raise Exception('invalid flow engine: \'%s\'' % engine)
@@ -148,7 +161,7 @@
logging.info('Started container \'%s\'', container.name)
- self.containers.append(container)
+ self.containers[container.name] = container
def deploy_nifi_flow(self, flow, name, vols):
dockerfile = dedent("""FROM {base_image}
@@ -198,7 +211,35 @@
logging.info('Started container \'%s\'', container.name)
- self.containers.append(container)
+ self.containers[container.name] = container
+
+ def deploy_kafka_broker(self, name):
+ dockerfile = dedent("""FROM {base_image}
+ USER root
+ CMD $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server host.docker.internal:9092 --topic test > heaven_signal.txt
+ """.format(base_image='spotify/kafka:latest'))
+
+ logging.info('Creating and running docker container for kafka broker...')
+
+ broker = self.client.containers.run(
+ self.client.images.pull("spotify/kafka:latest"),
+ detach=True,
+ name='kafka-broker',
+ ports={'2181/tcp': 2181, '9092/tcp': 9092},
+ environment=["ADVERTISED_HOST=192.168.42.4", "ADVERTISED_PORT=9092"]
+ )
+ self.network.connect(broker, ipv4_address='192.168.42.4')
+
+ configured_image = self.build_image(dockerfile, [])
+ consumer = self.client.containers.run(
+ configured_image[0],
+ detach=True,
+ name='kafka-consumer',
+ network=self.network.name
+ )
+
+ self.containers[consumer.name] = consumer
+ self.containers[broker.name] = broker
def build_image(self, dockerfile, context_files):
conf_dockerfile_buffer = BytesIO()
@@ -227,6 +268,7 @@
custom_context=True,
rm=True,
forcerm=True)
+ logging.info('Created image with id: %s', configured_image[0].id)
self.images.append(configured_image)
finally:
@@ -247,7 +289,7 @@
"""
# Clean up containers
- for container in self.containers:
+ for container in self.containers.values():
logging.info('Cleaning up container: %s', container.name)
container.remove(v=True, force=True)
@@ -466,6 +508,14 @@
else:
return key
+class PublishKafka(Processor):
+ def __init__(self):
+ super(PublishKafka, self).__init__('PublishKafka',
+ properties={'Client Name': 'nghiaxlee', 'Known Brokers': '192.168.42.4:9092', 'Topic Name': 'test',
+ 'Batch Size': '10', 'Compress Codec': 'none', 'Delivery Guarantee': '1',
+ 'Request Timeout': '10 sec', 'Message Timeout': '5 sec'},
+ auto_terminate=['success'])
+
class InputPort(Connectable):
def __init__(self, name=None, remote_process_group=None):
diff --git a/docker/test/integration/minifi/test/__init__.py b/docker/test/integration/minifi/test/__init__.py
index 67affa9..71fd096 100644
--- a/docker/test/integration/minifi/test/__init__.py
+++ b/docker/test/integration/minifi/test/__init__.py
@@ -16,6 +16,8 @@
import logging
import shutil
import uuid
+import tarfile
+from io import BytesIO
from threading import Event
import os
@@ -67,6 +69,9 @@
super(DockerTestCluster, self).__init__()
+ if isinstance(output_validator, KafkaValidator):
+ output_validator.set_containers(self.containers)
+
def deploy_flow(self,
flow,
name=None,
@@ -89,6 +94,26 @@
name=name,
engine=engine)
+ def start_flow(self, name):
+ container = self.containers[name]
+ container.reload()
+ logging.info("Status before start: %s", container.status)
+ if container.status == 'exited':
+ logging.info("Start container: %s", name)
+ container.start()
+ return True
+ return False
+
+ def stop_flow(self, name):
+ container = self.containers[name]
+ container.reload()
+ logging.info("Status before stop: %s", container.status)
+ if container.status == 'running':
+ logging.info("Stop container: %s", name)
+ container.stop(timeout=0)
+ return True
+ return False
+
def put_test_data(self, contents):
"""
Creates a randomly-named file in the test input dir and writes
@@ -116,7 +141,7 @@
def log_nifi_output(self):
- for container in self.containers:
+ for container in self.containers.values():
container = self.client.containers.get(container.id)
logging.info('Container logs for container \'%s\':\n%s', container.name, container.logs().decode("utf-8"))
if b'Segmentation fault' in container.logs():
@@ -141,14 +166,20 @@
stats = container.stats(stream=False)
logging.info('Container stats:\n%s', stats)
- def check_output(self, timeout=5):
+ def check_output(self, timeout=5, **kwargs):
"""
Wait for flow output, validate it, and log minifi output.
"""
self.wait_for_output(timeout)
self.log_nifi_output()
-
- return self.output_validator.validate() and not self.segfault
+ if self.segfault:
+ return false
+ if isinstance(self.output_validator, FileOutputValidator):
+ return self.output_validator.validate(dir=kwargs.get('dir', ''))
+ return self.output_validator.validate()
+ def rm_out_child(self, dir):
+ logging.info('Removing %s from output folder', self.tmp_test_output_dir + dir)
+ shutil.rmtree(self.tmp_test_output_dir + dir)
def __exit__(self, exc_type, exc_val, exc_tb):
"""
@@ -204,6 +235,9 @@
def set_output_dir(self, output_dir):
self.output_dir = output_dir
+ def validate(self, dir=''):
+ pass
+
class SingleFileOutputValidator(FileOutputValidator):
"""
Validates the content of a single file in the given directory.
@@ -213,22 +247,70 @@
self.valid = False
self.expected_content = expected_content
+ def validate(self, dir=''):
+
+ self.valid = False
+
+ full_dir = self.output_dir + dir
+ logging.info("Output folder: %s", full_dir)
+
+ listing = listdir(full_dir)
+
+ if listing:
+ for l in listing:
+ logging.info("name:: %s", l)
+ out_file_name = listing[0]
+
+ with open(join(full_dir, out_file_name), 'r') as out_file:
+ contents = out_file.read()
+ logging.info("dir %s -- name %s", full_dir, out_file_name)
+ logging.info("expected %s -- content %s", self.expected_content, contents)
+
+ if self.expected_content in contents:
+ self.valid = True
+
+ return self.valid
+
+class KafkaValidator(OutputValidator):
+ """
+ Validates PublishKafka
+ """
+
+ def __init__(self, expected_content):
+ self.valid = False
+ self.expected_content = expected_content
+ self.containers = None
+
+ def set_containers(self, containers):
+ self.containers = containers
+
def validate(self):
if self.valid:
return True
+ if self.containers is None:
+ return self.valid
- listing = listdir(self.output_dir)
+ if 'kafka-consumer' not in self.containers:
+ logging.info('Not found kafka container.')
+ return False
+ else:
+ kafka_container = self.containers['kafka-consumer']
- if listing:
- out_file_name = listing[0]
+ output, stat = kafka_container.get_archive('/heaven_signal.txt')
+ file_obj = BytesIO()
+ for i in output:
+ file_obj.write(i)
+ file_obj.seek(0)
+ tar = tarfile.open(mode='r', fileobj=file_obj)
+ contents = tar.extractfile('heaven_signal.txt').read()
+ logging.info("expected %s -- content %s", self.expected_content, contents)
- with open(join(self.output_dir, out_file_name), 'r') as out_file:
- contents = out_file.read()
+ contents = contents.decode("utf-8")
+ if self.expected_content in contents:
+ self.valid = True
- if contents == self.expected_content:
- self.valid = True
-
+ logging.info("expected %s -- content %s", self.expected_content, contents)
return self.valid
class EmptyFilesOutPutValidator(FileOutputValidator):
@@ -238,14 +320,17 @@
def __init__(self):
self.valid = False
- def validate(self):
+ def validate(self, dir=''):
if self.valid:
return True
- listing = listdir(self.output_dir)
+ full_dir = self.output_dir + dir
+ logging.info("Output folder: %s", full_dir)
+
+ listing = listdir(full_dir)
if listing:
- self.valid = all(os.path.getsize(os.path.join(self.output_dir,x)) == 0 for x in listing)
+ self.valid = all(os.path.getsize(os.path.join(full_dir,x)) == 0 for x in listing)
return self.valid
@@ -256,12 +341,17 @@
def __init__(self):
self.valid = False
- def validate(self):
+ def validate(self, dir=''):
if self.valid:
return True
- self.valid = not bool(listdir(self.output_dir))
+ full_dir = self.output_dir + dir
+ logging.info("Output folder: %s", full_dir)
+
+ listing = listdir(full_dir)
+
+ self.valid = not bool(listing)
return self.valid
diff --git a/docker/test/integration/test_rdkafka.py b/docker/test/integration/test_rdkafka.py
new file mode 100644
index 0000000..5a5c696
--- /dev/null
+++ b/docker/test/integration/test_rdkafka.py
@@ -0,0 +1,76 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the \"License\"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an \"AS IS\" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from minifi import *
+from minifi.test import *
+
+
+def test_publish_kafka():
+ """
+ Verify delivery of message to kafka broker
+ """
+ producer_flow = GetFile('/tmp/input') >> PublishKafka() >> ('success', LogAttribute())
+
+ with DockerTestCluster(KafkaValidator('test')) as cluster:
+ cluster.put_test_data('test')
+ cluster.deploy_flow(None, engine='kafka-broker')
+ cluster.deploy_flow(producer_flow, name='minifi-producer', engine='minifi-cpp')
+
+ assert cluster.check_output(10)
+
+def test_no_broker():
+ """
+ Verify failure case when broker is down
+ """
+ producer_flow = (GetFile('/tmp/input') >> PublishKafka()
+ >> (('failure', PutFile('/tmp/output')),
+ ('success', LogAttribute())))
+
+ with DockerTestCluster(SingleFileOutputValidator('no broker')) as cluster:
+ cluster.put_test_data('no broker')
+ cluster.deploy_flow(producer_flow, name='minifi-producer', engine='minifi-cpp')
+
+ assert cluster.check_output(30)
+
+def test_broker_on_off():
+ """
+ Verify delivery of message when broker is unstable
+ """
+ producer_flow = (GetFile('/tmp/input') >> PublishKafka()
+ >> (('success', PutFile('/tmp/output/success')),
+ ('failure', PutFile('/tmp/output/failure'))))
+
+ with DockerTestCluster(SingleFileOutputValidator('test')) as cluster:
+ cluster.put_test_data('test')
+ cluster.deploy_flow(None, engine='kafka-broker')
+ cluster.deploy_flow(producer_flow, name='minifi-producer', engine='minifi-cpp')
+
+ def start_kafka():
+ assert cluster.start_flow('kafka-broker')
+ assert cluster.start_flow('kafka-consumer')
+ def stop_kafka():
+ assert cluster.stop_flow('kafka-consumer')
+ assert cluster.stop_flow('kafka-broker')
+
+ assert cluster.check_output(10, dir='/success')
+ stop_kafka()
+ assert cluster.check_output(30, dir='/failure')
+ start_kafka()
+ cluster.rm_out_child('/success')
+ assert cluster.check_output(30, dir='/success')
+ stop_kafka()
+ cluster.rm_out_child('/failure')
+ assert cluster.check_output(30, dir='/failure')
+