MINIFICPP-1445 - Move docker integration tests to python behave

Signed-off-by: Arpad Boda <aboda@apache.org>

This closes #995
diff --git a/docker/DockerVerify.sh b/docker/DockerVerify.sh
index f4efbc9..dc897c2 100755
--- a/docker/DockerVerify.sh
+++ b/docker/DockerVerify.sh
@@ -58,7 +58,8 @@
 fi
 
 pip install --upgrade \
-            pytest \
+            behave \
+            pytimeparse \
             docker \
             PyYAML \
             m2crypto \
@@ -71,4 +72,24 @@
 PYTHONPATH="${PYTHONPATH}:${docker_dir}/test/integration"
 export PYTHONPATH
 
-exec pytest -s -v "${docker_dir}"/test/integration
+BEHAVE_OPTS="-f pretty --logging-level INFO --no-capture"
+
+cd "${docker_dir}/test/integration"
+  behave $BEHAVE_OPTS "features/file_system_operations.feature" -n "Get and put operations run in a simple flow" &&
+  behave $BEHAVE_OPTS "features/file_system_operations.feature" -n "PutFile does not overwrite a file that already exists" &&
+  behave $BEHAVE_OPTS "features/s2s.feature" -n "A MiNiFi instance produces and transfers data to a NiFi instance via s2s" &&
+  behave $BEHAVE_OPTS "features/s2s.feature" -n "Zero length files are transfered between via s2s if the \"drop empty\" connection property is false" &&
+  behave $BEHAVE_OPTS "features/s2s.feature" -n "Zero length files are not transfered between via s2s if the \"drop empty\" connection property is true" &&
+  behave $BEHAVE_OPTS "features/http.feature" -n "A MiNiFi instance transfers data to another MiNiFi instance" &&
+  behave $BEHAVE_OPTS "features/http.feature" -n "A MiNiFi instance sends data through a HTTP proxy and another one listens" &&
+  behave $BEHAVE_OPTS "features/http.feature" -n "A MiNiFi instance and transfers hashed data to another MiNiFi instance" &&
+  behave $BEHAVE_OPTS "features/kafka.feature" -n "A MiNiFi instance transfers data to a kafka broker" &&
+  behave $BEHAVE_OPTS "features/kafka.feature" -n "PublishKafka sends flowfiles to failure when the broker is not available" &&
+  behave $BEHAVE_OPTS "features/kafka.feature" -n "PublishKafka sends can use SSL connect" &&
+  behave $BEHAVE_OPTS "features/s3.feature" -n "A MiNiFi instance transfers encoded data to s3" &&
+  behave $BEHAVE_OPTS "features/s3.feature" -n "A MiNiFi instance transfers encoded data through a http proxy to s3" &&
+  behave $BEHAVE_OPTS "features/s3.feature" -n "A MiNiFi instance can remove s3 bucket objects" &&
+  behave $BEHAVE_OPTS "features/s3.feature" -n "Deletion of a s3 object through a proxy-server succeeds" &&
+  behave $BEHAVE_OPTS "features/s3.feature" -n "A MiNiFi instance can download s3 bucket objects directly" &&
+  behave $BEHAVE_OPTS "features/s3.feature" -n "A MiNiFi instance can download s3 bucket objects via a http-proxy"
diff --git a/docker/Dockerfile b/docker/Dockerfile
index 14ea617..ea6049f 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -36,6 +36,7 @@
   openjdk8-jre-base \
   openjdk8 \
   autoconf \
+  automake \
   libtool \
   wget \
   gdb \
diff --git a/docker/test/integration/MiNiFi_integration_test_driver.py b/docker/test/integration/MiNiFi_integration_test_driver.py
new file mode 100644
index 0000000..a1c63a0
--- /dev/null
+++ b/docker/test/integration/MiNiFi_integration_test_driver.py
@@ -0,0 +1,210 @@
+from subprocess import Popen, PIPE, STDOUT
+
+import docker
+import logging
+import os
+import shutil
+import threading
+import time
+import uuid
+
+from pydoc import locate
+
+from minifi.core.InputPort import InputPort
+
+from minifi.core.DockerTestCluster import DockerTestCluster
+from minifi.core.SingleNodeDockerCluster import SingleNodeDockerCluster
+from minifi.core.DockerTestDirectoryBindings import DockerTestDirectoryBindings
+
+from minifi.validators.EmptyFilesOutPutValidator import EmptyFilesOutPutValidator
+from minifi.validators.NoFileOutPutValidator import NoFileOutPutValidator
+from minifi.validators.SingleFileOutputValidator import SingleFileOutputValidator
+
+class MiNiFi_integration_test():
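+    """Drives a single integration test scenario: owns the clusters, connectable nodes and docker directory bindings used by it."""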
+    def __init__(self, context):
+        logging.info("MiNiFi_integration_test init")
+        self.test_id = str(uuid.uuid4())
+        self.clusters = {}
+
+        self.connectable_nodes = []
+        # Remote process groups are not connectable
+        self.remote_process_groups = []
+        self.file_system_observer = None
+
+        self.docker_network = None
+
+        self.docker_directory_bindings = DockerTestDirectoryBindings()
+        self.docker_directory_bindings.create_new_data_directories(self.test_id)
+
+    def __del__(self):
+        logging.info("MiNiFi_integration_test cleanup")
+
+        # Clean up the network; for some reason only this order of events works for cleanup
+        if self.docker_network is not None:
+            logging.info('Cleaning up network: %s', self.docker_network.name)
+            while len(self.docker_network.containers) != 0:
+                for container in self.docker_network.containers:
+                    self.docker_network.disconnect(container, force=True)
+                self.docker_network.reload()
+            self.docker_network.remove()
+
+        container_ids = []
+        for cluster in self.clusters.values():
+            for container in cluster.containers.values():
+                container_ids.append(container.id)
+            del cluster
+
+        # The cluster destructor is not reliable for cleanup, so remove the containers explicitly as well
+        for container_id in container_ids:
+            self.delete_docker_container_by_id(container_id)
+
+        del self.docker_directory_bindings
+
+    def delete_docker_container_by_id(self, container_id):
+        docker_client = docker.from_env()
+        try:
+            container = docker_client.containers.get(container_id)
+            container.remove(v=True, force=True)
+        except docker.errors.NotFound:
+            logging.warn("Contaner '%s' is already cleaned up before.", container_id)
+            return
+        wait_start_time = time.perf_counter()
+        while (time.perf_counter() - wait_start_time) < 35:
+            try:
+                docker_client.containers.get(container_id)
+                logging.error("Docker container '%s' still exists after removal attempt. Waiting for docker daemon to update...", container_id)
+                time.sleep(5)
+            except docker.errors.NotFound:
+                logging.info("Docker container cleanup successful for '%s'.", container_id)
+                return
+        logging.error("Failed to clean up docker container '%s'.", container_id)
+
+    def docker_path_to_local_path(self, docker_path):
+        return self.docker_directory_bindings.docker_path_to_local_path(self.test_id, docker_path)
+
+    def get_test_id(self):
+        return self.test_id
+
+    def acquire_cluster(self, name):
+        return self.clusters.setdefault(name, DockerTestCluster())
+
+    def set_up_cluster_network(self):
+        self.docker_network = SingleNodeDockerCluster.create_docker_network()
+        for cluster in self.clusters.values():
+            cluster.set_network(self.docker_network)
+
+    def start(self):
+        logging.info("MiNiFi_integration_test start")
+        self.set_up_cluster_network()
+        for cluster in self.clusters.values():
+            logging.info("Starting cluster %s with an engine of %s", cluster.get_name(), cluster.get_engine())
+            cluster.set_directory_bindings(self.docker_directory_bindings.get_directory_bindings(self.test_id))
+            cluster.deploy_flow()
+        for cluster_name, cluster in self.clusters.items():
+            startup_success = True
+            logging.info("Engine: %s", cluster.get_engine())
+            if cluster.get_engine() == "minifi-cpp":
+                startup_success = cluster.wait_for_app_logs("Starting Flow Controller", 120)
+            elif cluster.get_engine() == "nifi":
+                startup_success = cluster.wait_for_app_logs("Starting Flow Controller...", 120)
+            elif cluster.get_engine() == "kafka-broker":
+                startup_success = cluster.wait_for_app_logs("Startup complete.", 120)
+            if not startup_success:
+                cluster.log_nifi_output()
+            assert startup_success
+
+    def add_node(self, processor):
+        if processor.get_name() in (elem.get_name() for elem in self.connectable_nodes):
+            raise Exception("Trying to register processor with an already registered name: \"%s\"" % processor.get_name())
+        self.connectable_nodes.append(processor)
+
+    def get_or_create_node_by_name(self, node_name):
+        # get_node_by_name() raises for unknown nodes, so look the node up directly to allow creating new ones
+        node = next((elem for elem in self.connectable_nodes if elem.get_name() == node_name), None)
+        if node is None:
+            if node_name == "RemoteProcessGroup":
+                raise Exception("Trying to register RemoteProcessGroup without an input port or address.")
+            node = locate("minifi.processors." + node_name + "." + node_name)()
+            node.set_name(node_name)
+            self.add_node(node)
+        return node
+
+    def get_node_by_name(self, name):
+        for node in self.connectable_nodes:
+            if name == node.get_name():
+                return node
+        raise Exception("Trying to fetch unknow node: \"%s\"" % name)
+
+    def add_remote_process_group(self, remote_process_group):
+        if remote_process_group.get_name() in (elem.get_name() for elem in self.remote_process_groups):
+            raise Exception("Trying to register remote_process_group with an already registered name: \"%s\"" % remote_process_group.get_name())
+        self.remote_process_groups.append(remote_process_group)
+
+    def get_remote_process_group_by_name(self, name):
+        for node in self.remote_process_groups:
+            if name == node.get_name():
+                return node
+        raise Exception("Trying to fetch unknow node: \"%s\"" % name)
+
+    @staticmethod
+    def generate_input_port_for_remote_process_group(remote_process_group, name):
+        input_port_node = InputPort(name, remote_process_group)
+        # Generate an MD5 hash unique to the remote process group id
+        input_port_node.set_uuid(uuid.uuid3(remote_process_group.get_uuid(), "input_port"))
+        return input_port_node
+
+    def add_test_data(self, path, test_data):
+        file_name = str(uuid.uuid4())
+        self.docker_directory_bindings.put_file_to_docker_path(self.test_id, path, file_name, test_data.encode('utf-8'))
+
+    def put_test_resource(self, file_name, contents):
+        self.docker_directory_bindings.put_test_resource(self.test_id, file_name, contents)
+
+    def get_out_subdir(self, subdir):
+        return self.docker_directory_bindings.get_out_subdir(self.test_id, subdir)
+
+    def rm_out_child(self, subdir):
+        self.docker_directory_bindings.rm_out_child(self.test_id, subdir)
+
+    def add_file_system_observer(self, file_system_observer):
+        self.file_system_observer = file_system_observer
+
+    def check_for_no_files_generated(self, timeout_seconds, subdir=''):
+        output_validator = NoFileOutPutValidator()
+        output_validator.set_output_dir(self.file_system_observer.get_output_dir())
+        self.check_output(timeout_seconds, output_validator, 1, subdir)
+
+    def check_for_file_with_content_generated(self, content, timeout_seconds, subdir=''):
+        output_validator = SingleFileOutputValidator(content)
+        output_validator.set_output_dir(self.file_system_observer.get_output_dir())
+        self.check_output(timeout_seconds, output_validator, 1, subdir)
+
+    def check_for_multiple_empty_files_generated(self, timeout_seconds, subdir=''):
+        output_validator = EmptyFilesOutPutValidator()
+        output_validator.set_output_dir(self.file_system_observer.get_output_dir())
+        self.check_output(timeout_seconds, output_validator, 2, subdir)
+
+    def check_output(self, timeout_seconds, output_validator, max_files, subdir):
+        if subdir:
+            output_validator.subdir = subdir
+        self.file_system_observer.wait_for_output(timeout_seconds, max_files)
+        for cluster in self.clusters.values():
+            # Logs for both nifi and minifi, but not other engines
+            cluster.log_nifi_output()
+            assert not cluster.segfault_happened()
+        assert output_validator.validate()
+
+    def check_s3_server_object_data(self, cluster_name, object_data):
+        cluster = self.acquire_cluster(cluster_name)
+        cluster.check_s3_server_object_data(object_data)
+
+    def check_s3_server_object_metadata(self, cluster_name, content_type):
+        cluster = self.acquire_cluster(cluster_name)
+        cluster.check_s3_server_object_metadata(content_type)
+
+    def check_empty_s3_bucket(self, cluster_name):
+        cluster = self.acquire_cluster(cluster_name)
+        assert cluster.is_s3_bucket_empty()
+
+    def check_http_proxy_access(self, cluster_name, url):
+        self.clusters[cluster_name].check_http_proxy_access(url)
diff --git a/docker/test/integration/README.md b/docker/test/integration/README.md
index e4fb628..421cfca 100644
--- a/docker/test/integration/README.md
+++ b/docker/test/integration/README.md
@@ -19,180 +19,19 @@
 tests are designed to test the integration between distinct MiNiFi instances as
 well as other systems which are available in docker, such as Apache NiFi.
 
-* Currently test_https.py does not work due to the upgrade to NiFi 1.7. This will be resolved as
-  soon as possible.
+* Currently there is an extra, unused test mockup for testing TLS with invoke_http.
+* The HashContent tests do not actually seem to test what they advertise.
+* There is a test requirement for PublishKafka confirming that it can handle broker outages. It will be reintroduced when ConsumeKafka lands on master and its similar testing requirements are implemented.
 
 ## Test environment
 
-The test framework is written in Python 3 and uses pip3 to add required packages.
+The test framework is written in Python 3 and uses pip3 to install the required packages. It is built on python-behave, a BDD (behavior-driven development) testing framework. The feature specifications are written in human-readable form in the features directory. Please refer to the behave documentation for details on how the framework executes the tests.
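+
+As an illustration of how the feature files map to Python code, behave looks up step implementations in a `steps` directory next to the feature files. A step such as `Given a GetFile processor with the "Input Directory" property set to "/tmp/input"` could be backed by a definition along these lines (a sketch under assumptions rather than the exact implementation in this change; the property-setting helper is assumed):
+
+```python
+# Hypothetical sketch of a behave step definition for the processor-creation steps
+from behave import given
+
+@given('a {processor_type} processor with the "{property_name}" property set to "{property_value}"')
+def step_impl(context, processor_type, property_name, property_value):
+    # context.test is the MiNiFi_integration_test driver created in environment.py
+    processor = context.test.get_or_create_node_by_name(processor_type)
+    processor.set_property(property_name, property_value)  # assumed helper on the processor node
+```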
 
 The tests use docker containers so docker engine should be installed on your system. Check the [get docker](https://docs.docker.com/get-docker/) page for further information.
 
 One of the required python packages is the `m2crypto` package which depends on `swig` for compilation,
 so `swig` should also be installed on your system (e.g. `sudo apt install swig` on debian based systems).
 
-## Test Execution Lifecycle
-
-Each test involves the following stages as part of its execution lifecycle:
-
-### Definition of flows/Flow DSL
-
-Flows are defined using a python-native domain specific language (DSL). The DSL
-supports the standard primitives which make up a NiFi/MiNiFi flow, such as
-processors, connections, and controller services. Several processors defined in
-the DSL have optional, named parameters enabling concise flow expression.
-
-By default, all relationships are set to auto-terminate. If a relationship is
-used, it is automatically taken out of the auto\_terminate list.
-
-**Example Trivial Flow:**
-
-```python
-flow = GetFile('/tmp/input') >> LogAttribute() >> PutFile('/tmp/output')
-```
-
-#### Supported Processors
-
-The following processors/parameters are supported:
-
-**GetFile**
-
-- input\_dir
-
-**PutFile**
-
-- output\_dir
-
-**LogAttribute**
-
-**ListenHTTP**
-
-- port
-- cert=None
-
-**InvokeHTTP**
-
-- url
-- method='GET'
-- ssl\_context\_service=None
-
-#### Remote Process Groups
-
-Remote process groups and input ports are supported.
-
-**Example InputPort/RemoteProcessGroup:**
-
-```python
-port = InputPort('from-minifi', RemoteProcessGroup('http://nifi:8080/nifi'))
-```
-
-InputPorts may be used as inputs or outputs in the flow DSL:
-
-```python
-recv_flow = (port
-             >> LogAttribute()
-             >> PutFile('/tmp/output'))
-
-send_flow = (GetFile('/tmp/input')
-             >> LogAttribute()
-             >> port)
-```
-
-These example flows could be deployed as separate NiFi/MiNiFi instances where
-the send\_flow would send data to the recv\_flow using the site-to-site
-protocol.
-
-### Definition of an output validator
-
-The output validator is responsible for checking the state of a cluster for
-valid output conditions. Currently, the only supported output validator is the
-SingleFileOutputValidator, which looks for a single file to be written to
-/tmp/output by a flow having a given string as its contents.
-
-**Example SingleFileOutputValidator:**
-
-```python
-SingleFileOutputValidator('example output')
-```
-
-This example SingleFileOutputValidator would validate that a single file is
-written with the contents 'example output.'
-
-### Creation of a DockerTestCluster
-
-DockerTestCluster instances are used to deploy one or more flow to a simulated
-or actual multi-host docker cluster. This enables testing of interactions
-between multiple system components, such as MiNiFi flows. Before the test
-cluster is destroyed, an assertion may be performed on the results of the
-*check\_output()* method of the cluster. This invokes the validator supplied at
-construction against the output state of the system.
-
-Creation of a DockerTestCluster is simple:
-
-**Example DockerTestCluster Instantiation:**
-
-```python
-with DockerTestCluster(SingleFileOutputValidator('test')) as cluster:
-  ...
-  # Perform test operations
-  ...
-  assert cluster.check_output()
-```
-
-Note that a docker cluster must be created inside of a *with* structure to
-ensure that all resources are created and destroyed cleanly.
-
-### Insertion of test input data
-
-Although arbitrary NiFi flows can ingest data from a multitude of sources, a
-MiNiFi system integration test is expected to receive input via deterministed,
-controlled channels. The primary supported method of providing input to a
-MiNiFi system integration test is to insert data into the filesystem at
-/tmp/input.
-
-To write a string to the contents of a file in /tmp/input, use the
-*put\_test\_data()* method.
-
-**Example put\_test\_data() Usage:**
-
-```python
-cluster.put_test_data('test')
-```
-
-This writes a file with a random name to /tmp/input, with the contents 'test.'
-
-To provide a resource to a container, such as a TLS certificate, use the
-*put\_test\_resource()* method to write a resource file to /tmp/resources.
-
-**Example put\_test\_resource() Usage:**
-
-```python
-cluster.put_test_resource('test-resource', 'resource contents')
-```
-
-This writes a file to /tmp/resources/test-resource with the contents 'resource
-contents.'
-
-### Deployment of one or more flows
-
-Deployment of flows to a test cluster is performed using the *deploy\_flow()*
-method of a cluster. Each flow is deployed as a separate docker service having
-its own DNS name. If a name is not provided upon deployment, a random name will
-be used.
-
-**Example deploy\_flow() Usage:**
-
-```python
-cluster.deploy_flow(flow, name='test-flow')
-```
-
-The deploy\_flow function defaults to a MiNiFi - C++ engine, but other engines,
-such as NiFi may be used:
-
-```python
-cluster.deploy_flow(flow, engine='nifi')
-```
-
 ### Execution of one or more flows
 
 Flows are executed immediately upon deployment and according to schedule
@@ -202,27 +41,3 @@
 flows are executed immediately upon input availability and output is validated
 immediately after it is written to disk.
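+
+When a `Then` step about the monitored output directory runs, it delegates to the output validators through the same test driver. A rough sketch of such a step definition (not necessarily the exact code shipped with this change) could be:
+
+```python
+# Hypothetical behave step definition; check_for_file_with_content_generated()
+# is the driver method shown in MiNiFi_integration_test_driver.py
+from behave import then
+
+@then('a flowfile with the content "{content}" is placed in the monitored directory in less than {duration:d} seconds')
+def step_impl(context, content, duration):
+    # context.test is the MiNiFi_integration_test instance created in environment.py
+    context.test.check_for_file_with_content_generated(content, duration)
+```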
 
-### Output validation
-
-As soon as data is written to /tmp/output, the OutputValidator (defined
-according to the documentation above) is executed on the output. The
-*check\_output()* cluster method waits for up to 5 seconds for valid output.
-
-### Cluster teardown/cleanup
-
-The deployment of a test cluster involves creating one or more docker
-containers and networks, as well as temporary files/directories on the host
-system. All resources are cleaned up automatically as long as clusters are
-created within a *with* block.
-
-```python
-
-# Using the with block ensures that all cluster resources are cleaned up after
-# the test cluster is no longer needed.
-
-with DockerTestCluster(SingleFileOutputValidator('test')) as cluster:
-  ...
-  # Perform test operations
-  ...
-  assert cluster.check_output()
-```
diff --git a/docker/test/integration/__init__.py b/docker/test/integration/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/docker/test/integration/__init__.py
diff --git a/docker/test/integration/environment.py b/docker/test/integration/environment.py
new file mode 100644
index 0000000..68ddd42
--- /dev/null
+++ b/docker/test/integration/environment.py
@@ -0,0 +1,27 @@
+from behave import fixture, use_fixture
+import sys
+sys.path.append('../minifi')
+import logging
+
+from MiNiFi_integration_test_driver import MiNiFi_integration_test
+from minifi import *
+
+def raise_exception(exception):
+    raise exception
+
+@fixture
+def test_driver_fixture(context):
+    logging.info("Integration test setup")
+    context.test = MiNiFi_integration_test(context)
+    yield context.test
+    logging.info("Integration test teardown...")
+    del context.test
+
+def before_scenario(context, scenario):
+    use_fixture(test_driver_fixture, context)
+
+def after_scenario(context, scenario):
+    pass
+
+def before_all(context):
+    context.config.setup_logging()
diff --git a/docker/test/integration/features/file_system_operations.feature b/docker/test/integration/features/file_system_operations.feature
new file mode 100644
index 0000000..a11e398
--- /dev/null
+++ b/docker/test/integration/features/file_system_operations.feature
@@ -0,0 +1,40 @@
+Feature: File system operations are handled by the GetFile and PutFile processors
+  In order to store and access data on the local file system
+  As a user of MiNiFi
+  I need to have GetFile and PutFile processors
+
+  Background:
+    Given the content of "/tmp/output" is monitored
+
+  Scenario: Get and put operations run in a simple flow
+    Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And a file with the content "test" is present in "/tmp/input"
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the "success" relationship of the GetFile processor is connected to the PutFile
+    When the MiNiFi instance starts up
+    Then a flowfile with the content "test" is placed in the monitored directory in less than 10 seconds
+
+  Scenario: PutFile does not overwrite a file that already exists
+    Given a set of processors:
+      | type    | name      | uuid                                 |
+      | GetFile | GetFile   | 66259995-11da-41df-bff7-e262d5f6d7c9 |
+      | PutFile | PutFile_1 | 694423a0-26f3-4e95-9f9f-c03b6d6c189d |
+      | PutFile | PutFile_2 | f37e51e9-ad67-4e16-9dc6-ad853b0933e3 |
+      | PutFile | PutFile_3 | f37e51e9-ad67-4e16-9dc6-ad853b0933e4 |
+
+    And these processor properties are set:
+      | processor name | property name   | property value |
+      | GetFile        | Input Directory | /tmp/input     |
+      | PutFile_1      | Directory       | /tmp           |
+      | PutFile_2      | Directory       | /tmp           |
+      | PutFile_3      | Directory       | /tmp/output    |
+
+    And the processors are connected up as described here:
+      | source name | relationship name | destination name |
+      | GetFile     | success           | PutFile_1        |
+      | PutFile_1   | success           | PutFile_2        |
+      | PutFile_2   | failure           | PutFile_3        |
+
+    And a file with the content "test" is present in "/tmp/input"
+    When the MiNiFi instance starts up
+    Then a flowfile with the content "test" is placed in the monitored directory in less than 10 seconds
diff --git a/docker/test/integration/features/http.feature b/docker/test/integration/features/http.feature
new file mode 100644
index 0000000..0652fd9
--- /dev/null
+++ b/docker/test/integration/features/http.feature
@@ -0,0 +1,60 @@
+Feature: Sending data using InvokeHTTP to a receiver using ListenHTTP
+  In order to send and receive data via HTTP
+  As a user of MiNiFi
+  I need to have ListenHTTP and InvokeHTTP processors
+
+  Background:
+    Given the content of "/tmp/output" is monitored
+
+  Scenario: A MiNiFi instance transfers data to another MiNiFi instance
+    Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And a file with the content "test" is present in "/tmp/input"
+    And a InvokeHTTP processor with the "Remote URL" property set to "http://secondary:8080/contentListener"
+    And the "HTTP Method" of the InvokeHTTP processor is set to "POST"
+    And the "success" relationship of the GetFile processor is connected to the InvokeHTTP
+
+    And a ListenHTTP processor with the "Listening Port" property set to "8080" in a "secondary" flow
+    And a PutFile processor with the "Directory" property set to "/tmp/output" in the "secondary" flow
+    And the "success" relationship of the ListenHTTP processor is connected to the PutFile
+
+    When both instances start up
+    Then a flowfile with the content "test" is placed in the monitored directory in less than 30 seconds
+
+  Scenario: A MiNiFi instance sends data through a HTTP proxy and another one listens
+    Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And a file with the content "test" is present in "/tmp/input"
+    And a InvokeHTTP processor with the "Remote URL" property set to "http://minifi-listen:8080/contentListener"
+    And these processor properties are set to match the http proxy:
+      | processor name | property name             | property value |
+      | InvokeHTTP     | HTTP Method               | POST           |
+      | InvokeHTTP     | Proxy Host                | http-proxy     |
+      | InvokeHTTP     | Proxy Port                | 3128           |
+      | InvokeHTTP     | invokehttp-proxy-username | admin          |
+      | InvokeHTTP     | invokehttp-proxy-password | test101        |
+    And the "success" relationship of the GetFile processor is connected to the InvokeHTTP
+
+    And a http proxy server "http-proxy" is set up accordingly 
+
+    And a ListenHTTP processor with the "Listening Port" property set to "8080" in a "minifi-listen" flow
+    And a PutFile processor with the "Directory" property set to "/tmp/output" in the "minifi-listen" flow
+    And the "success" relationship of the ListenHTTP processor is connected to the PutFile
+
+    When all instances start up
+    Then a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds
+    And no errors were generated on the "http-proxy" regarding "http://minifi-listen:8080/contentListener"
+
+  Scenario: A MiNiFi instance transfers hashed data to another MiNiFi instance
+    Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And a file with the content "test" is present in "/tmp/input"
+    And a HashContent processor with the "Hash Attribute" property set to "hash"
+    And a InvokeHTTP processor with the "Remote URL" property set to "http://secondary:8080/contentListener"
+    And the "HTTP Method" of the InvokeHTTP processor is set to "POST"
+    And the "success" relationship of the GetFile processor is connected to the HashContent
+    And the "success" relationship of the HashContent processor is connected to the InvokeHTTP
+
+    And a ListenHTTP processor with the "Listening Port" property set to "8080" in a "secondary" flow
+    And a PutFile processor with the "Directory" property set to "/tmp/output" in the "secondary" flow
+    And the "success" relationship of the ListenHTTP processor is connected to the PutFile
+
+    When both instances start up
+    Then a flowfile with the content "test" is placed in the monitored directory in less than 30 seconds
diff --git a/docker/test/integration/features/https.feature b/docker/test/integration/features/https.feature
new file mode 100644
index 0000000..de13b05
--- /dev/null
+++ b/docker/test/integration/features/https.feature
@@ -0,0 +1,23 @@
+Feature: Using SSL context service to send data with TLS
+  In order to send data via HTTPS
+  As a user of MiNiFi
+  I need to have access to the SSLContextService
+
+  Background:
+    Given the content of "/tmp/output" is monitored
+
+  Scenario: A MiNiFi instance sends data using InvokeHTTP to a receiver using ListenHTTP with TLS
+    Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And a file with the content "test" is present in "/tmp/input"
+    And a InvokeHTTP processor with the "Remote URL" property set to "https://secondary:4430/contentListener"
+    And the "HTTP Method" of the InvokeHTTP processor is set to "POST"
+
+    And the "success" relationship of the GetFile processor is connected to the InvokeHTTP
+
+    And a ListenHTTP processor with the "Listening Port" property set to "4430" in a "secondary" flow
+    And a PutFile processor with the "Directory" property set to "/tmp/output" in the "secondary" flow
+    And the "success" relationship of the ListenHTTP processor is connected to the PutFile
+
+    And an ssl context service set up for InvokeHTTP and ListenHTTP
+    When both instances start up
+    Then a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds
diff --git a/docker/test/integration/features/kafka.feature b/docker/test/integration/features/kafka.feature
new file mode 100644
index 0000000..2395115
--- /dev/null
+++ b/docker/test/integration/features/kafka.feature
@@ -0,0 +1,59 @@
+Feature: Sending data to a Kafka streaming platform using PublishKafka
+  In order to send data to a Kafka stream
+  As a user of MiNiFi
+  I need to have PublishKafka processor
+
+  Background:
+    Given the content of "/tmp/output" is monitored
+
+  Scenario: A MiNiFi instance transfers data to a kafka broker
+    Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And a file with the content "test" is present in "/tmp/input"
+    And a PublishKafka processor set up to communicate with a kafka broker instance
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the "success" relationship of the GetFile processor is connected to the PublishKafka
+    And the "success" relationship of the PublishKafka processor is connected to the PutFile
+
+    And a kafka broker "broker" is set up in correspondence with the PublishKafka
+
+    When both instances start up
+    Then a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds
+
+  Scenario: PublishKafka sends flowfiles to failure when the broker is not available
+    Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And a file with the content "no broker" is present in "/tmp/input"
+    And a PublishKafka processor set up to communicate with a kafka broker instance
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the "success" relationship of the GetFile processor is connected to the PublishKafka
+    And the "failure" relationship of the PublishKafka processor is connected to the PutFile
+
+    When the MiNiFi instance starts up
+    Then a flowfile with the content "no broker" is placed in the monitored directory in less than 60 seconds
+
+  Scenario: PublishKafka can use SSL to connect
+    Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And a file with the content "test" is present in "/tmp/input"
+    And a PublishKafka processor set up to communicate with a kafka broker instance
+    And these processor properties are set:
+      | processor name | property name          | property value                             |
+      | PublishKafka   | Client Name            | LMN                                        |
+      | PublishKafka   | Known Brokers          | kafka-broker:9093                          |
+      | PublishKafka   | Topic Name             | test                                       |
+      | PublishKafka   | Batch Size             | 10                                         |
+      | PublishKafka   | Compress Codec         | none                                       |
+      | PublishKafka   | Delivery Guarantee     | 1                                          |
+      | PublishKafka   | Request Timeout        | 10 sec                                     |
+      | PublishKafka   | Message Timeout Phrase | 12 sec                                     |
+      | PublishKafka   | Security CA Key        | /tmp/resources/certs/ca-cert               |
+      | PublishKafka   | Security Cert          | /tmp/resources/certs/client_LMN_client.pem |
+      | PublishKafka   | Security Pass Phrase   | abcdefgh                                   |
+      | PublishKafka   | Security Private Key   | /tmp/resources/certs/client_LMN_client.key |
+      | PublishKafka   | Security Protocol      | ssl                                        |
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the "success" relationship of the GetFile processor is connected to the PublishKafka
+    And the "success" relationship of the GetFile processor is connected to the PutFile
+
+    And a kafka broker "broker" is set up in correspondence with the PublishKafka
+
+    When both instances start up
+    Then a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds
diff --git a/docker/test/integration/features/s2s.feature b/docker/test/integration/features/s2s.feature
new file mode 100644
index 0000000..5b8c20a
--- /dev/null
+++ b/docker/test/integration/features/s2s.feature
@@ -0,0 +1,45 @@
+Feature: Sending data from MiNiFi-C++ to NiFi using S2S protocol
+  In order to transfer data between NiFi and MiNiFi flows
+  As a user of MiNiFi
+  I need to have RemoteProcessGroup flow nodes
+
+  Background:
+    Given the content of "/tmp/output" is monitored
+
+  Scenario: A MiNiFi instance produces and transfers data to a NiFi instance via s2s
+    Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And a file with the content "test" is present in "/tmp/input"
+    And a RemoteProcessGroup node opened on "http://nifi:8080/nifi"
+    And the "success" relationship of the GetFile processor is connected to the input port on the RemoteProcessGroup
+
+    And a NiFi flow "nifi" receiving data from a RemoteProcessGroup "from-minifi" on port 8080
+    And a PutFile processor with the "Directory" property set to "/tmp/output" in the "nifi" flow
+    And the "success" relationship of the from-minifi is connected to the PutFile
+
+    When both instances start up
+    Then a flowfile with the content "test" is placed in the monitored directory in less than 90 seconds
+
+  Scenario: Zero length files are transferred via s2s if the "drop empty" connection property is false
+    Given a GenerateFlowFile processor with the "File Size" property set to "0B"
+    And a RemoteProcessGroup node opened on "http://nifi:8080/nifi"
+    And the "success" relationship of the GenerateFlowFile processor is connected to the input port on the RemoteProcessGroup
+
+    And a NiFi flow "nifi" receiving data from a RemoteProcessGroup "from-minifi" on port 8080
+    And a PutFile processor with the "Directory" property set to "/tmp/output" in the "nifi" flow
+    And the "success" relationship of the from-minifi is connected to the PutFile
+
+    When both instances start up
+    Then at least one empty flowfile is placed in the monitored directory in less than 90 seconds
+
+  Scenario: Zero length files are not transferred via s2s if the "drop empty" connection property is true
+    Given a GenerateFlowFile processor with the "File Size" property set to "0B"
+    And a RemoteProcessGroup node opened on "http://nifi:8080/nifi"
+    And the "success" relationship of the GenerateFlowFile processor is connected to the input port on the RemoteProcessGroup
+    And the connection going to the RemoteProcessGroup has "drop empty" set
+
+    And a NiFi flow "nifi" receiving data from a RemoteProcessGroup "from-minifi" on port 8080
+    And a PutFile processor with the "Directory" property set to "/tmp/output" in the "nifi" flow
+    And the "success" relationship of the from-minifi is connected to the PutFile
+
+    When both instances start up
+    Then no files are placed in the monitored directory in 90 seconds of running time
diff --git a/docker/test/integration/features/s3.feature b/docker/test/integration/features/s3.feature
new file mode 100644
index 0000000..897bf18
--- /dev/null
+++ b/docker/test/integration/features/s3.feature
@@ -0,0 +1,155 @@
+Feature: Sending data from MiNiFi-C++ to an AWS server
+  In order to transfer data to and from AWS servers
+  As a user of MiNiFi
+  I need to have PutS3Object and DeleteS3Object processors
+
+  Background:
+    Given the content of "/tmp/output" is monitored
+
+  Scenario: A MiNiFi instance transfers encoded data to s3
+    Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And a file with the content "LH_O#L|FD<FASD{FO#@$#$%^ \"#\"$L%:\"@#$L\":test_data#$#%#$%?{\"F{" is present in "/tmp/input"
+    And a PutS3Object processor set up to communicate with an s3 server
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the "success" relationship of the GetFile processor is connected to the PutS3Object
+    And the "success" relationship of the PutS3Object processor is connected to the PutFile
+
+    And a s3 server "s3" is set up in correspondence with the PutS3Object
+
+    When both instances start up
+
+    Then a flowfile with the content "test" is placed in the monitored directory in less than 60 seconds
+    And the object on the "s3" s3 server is "LH_O#L|FD<FASD{FO#@$#$%^ \"#\"$L%:\"@#$L\":test_data#$#%#$%?{\"F{"
+    And the object content type on the "s3" s3 server is "application/octet-stream" and the object metadata matches use metadata
+
+  Scenario: A MiNiFi instance transfers encoded data through a http proxy to s3
+    Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And a file with the content "LH_O#L|FD<FASD{FO#@$#$%^ \"#\"$L%:\"@#$L\":test_data#$#%#$%?{\"F{" is present in "/tmp/input"
+    And a PutS3Object processor set up to communicate with an s3 server
+    And these processor properties are set to match the http proxy:
+    | processor name  | property name  | property value |
+    | PutS3Object     | Proxy Host     | http-proxy     |
+    | PutS3Object     | Proxy Port     | 3128           |
+    | PutS3Object     | Proxy Username | admin          |
+    | PutS3Object     | Proxy Password | test101        |
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the "success" relationship of the GetFile processor is connected to the PutS3Object
+    And the "success" relationship of the PutS3Object processor is connected to the PutFile
+
+    And a s3 server "s3" is set up in correspondence with the PutS3Object
+    And the http proxy server "http-proxy" is set up 
+    When all instances start up
+
+    Then a flowfile with the content "test" is placed in the monitored directory in less than 90 seconds
+    And the object on the "s3" s3 server is "LH_O#L|FD<FASD{FO#@$#$%^ \"#\"$L%:\"@#$L\":test_data#$#%#$%?{\"F{"
+    And the object content type on the "s3" s3 server is "application/octet-stream" and the object metadata matches use metadata
+    And no errors were generated on the "http-proxy" regarding "http://s3-server:9090/test_bucket/test_object_key"
+
+  Scenario: A MiNiFi instance can remove s3 bucket objects
+    Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And a file with the content "LH_O#L|FD<FASD{FO#@$#$%^ \"#\"$L%:\"@#$L\":test_data#$#%#$%?{\"F{" is present in "/tmp/input"
+    And a PutS3Object processor set up to communicate with an s3 server
+    And a DeleteS3Object processor set up to communicate with the same s3 server
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the processors are connected up as described here:
+      | source name    | relationship name | destination name |
+      | GetFile        | success           | PutS3Object      |
+      | PutS3Object    | success           | DeleteS3Object   |
+      | DeleteS3Object | success           | PutFile          |
+
+    And a s3 server "s3" is set up in correspondence with the PutS3Object
+
+    When both instances start up
+
+    Then a flowfile with the content "test" is placed in the monitored directory in less than 120 seconds
+    And the object bucket on the "s3" s3 server is empty
+
+  Scenario: Deletion of a non-existent s3 object succeeds
+    Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And a file with the content "test" is present in "/tmp/input"
+    And a DeleteS3Object processor set up to communicate with an s3 server
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the "success" relationship of the GetFile processor is connected to the DeleteS3Object
+    And the "success" relationship of the DeleteS3Object processor is connected to the PutFile
+
+    And a s3 server "s3" is set up in correspondence with the DeleteS3Object
+
+    When both instances start up
+
+    Then a flowfile with the content "test" is placed in the monitored directory in less than 120 seconds
+    And the object bucket on the "s3" s3 server is empty
+
+  Scenario: Deletion of a s3 object through a proxy-server succeeds
+    Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And a file with the content "LH_O#L|FD<FASD{FO#@$#$%^ \"#\"$L%:\"@#$L\":test_data#$#%#$%?{\"F{" is present in "/tmp/input"
+    And a PutS3Object processor set up to communicate with an s3 server
+    And a DeleteS3Object processor set up to communicate with the same s3 server
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And these processor properties are set to match the http proxy:
+      | processor name | property name  | property value |
+      | DeleteS3Object | Proxy Host     | http-proxy     |
+      | DeleteS3Object | Proxy Port     | 3128           |
+      | DeleteS3Object | Proxy Username | admin          |
+      | DeleteS3Object | Proxy Password | test101        |
+    And the processors are connected up as described here:
+      | source name    | relationship name | destination name |
+      | GetFile        | success           | PutS3Object      |
+      | PutS3Object    | success           | DeleteS3Object   |
+      | DeleteS3Object | success           | PutFile          |
+
+    And a s3 server "s3" is set up in correspondence with the PutS3Object
+    And the http proxy server "http-proxy" is set up 
+
+    When all instances start up
+
+    Then a flowfile with the content "test" is placed in the monitored directory in less than 120 seconds
+    And the object bucket on the "s3" s3 server is empty
+    And no errors were generated on the "http-proxy" regarding "http://s3-server:9090/test_bucket/test_object_key"
+
+  Scenario: A MiNiFi instance can download s3 bucket objects directly
+    Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And a file with the content "test" is present in "/tmp/input"
+    And a PutS3Object processor set up to communicate with an s3 server
+    And the "success" relationship of the GetFile processor is connected to the PutS3Object
+
+    Given a GenerateFlowFile processor with the "File Size" property set to "1 kB" in a "secondary" flow
+    And a FetchS3Object processor set up to communicate with the same s3 server
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the processors are connected up as described here:
+      | source name      | relationship name | destination name |
+      | GenerateFlowFile | success           | FetchS3Object    |
+      | FetchS3Object    | success           | PutFile          |
+
+    And a s3 server "s3" is set up in correspondence with the PutS3Object
+
+    When all instances start up
+
+    Then a flowfile with the content "test" is placed in the monitored directory in less than 120 seconds
+
+  Scenario: A MiNiFi instance can download s3 bucket objects via a http-proxy
+    Given a GetFile processor with the "Input Directory" property set to "/tmp/input"
+    And a file with the content "test" is present in "/tmp/input"
+    And a PutS3Object processor set up to communicate with an s3 server
+    And the "success" relationship of the GetFile processor is connected to the PutS3Object
+
+    Given a GenerateFlowFile processor with the "File Size" property set to "1 kB" in a "secondary" flow
+    And a FetchS3Object processor set up to communicate with the same s3 server
+    And these processor properties are set to match the http proxy:
+      | processor name | property name  | property value |
+      | FetchS3Object  | Proxy Host     | http-proxy     |
+      | FetchS3Object  | Proxy Port     | 3128           |
+      | FetchS3Object  | Proxy Username | admin          |
+      | FetchS3Object  | Proxy Password | test101        |
+    And a PutFile processor with the "Directory" property set to "/tmp/output"
+    And the processors are connected up as described here:
+      | source name      | relationship name | destination name |
+      | GenerateFlowFile | success           | FetchS3Object    |
+      | FetchS3Object    | success           | PutFile          |
+
+    And a s3 server "s3" is set up in correspondence with the PutS3Object
+    And a http proxy server "http-proxy" is set up accordingly 
+
+    When all instances start up
+
+    Then a flowfile with the content "test" is placed in the monitored directory in less than 120 seconds
+    And no errors were generated on the "http-proxy" regarding "http://s3-server:9090/test_bucket/test_object_key"
diff --git a/docker/test/integration/minifi/__init__.py b/docker/test/integration/minifi/__init__.py
index 68d136a..e69de29 100644
--- a/docker/test/integration/minifi/__init__.py
+++ b/docker/test/integration/minifi/__init__.py
@@ -1,69 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the \"License\"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an \"AS IS\" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-import gzip
-import logging
-import tarfile
-import uuid
-import xml.etree.cElementTree as elementTree
-from xml.etree.cElementTree import Element
-from io import StringIO
-from io import BytesIO
-from textwrap import dedent
-
-import docker
-import os
-import yaml
-from copy import copy
-
-import time
-from collections import OrderedDict
-
-from .core.Connectable import Connectable
-from .core.Cluster import Cluster
-from .core.Connectable import Connectable
-from .core.ControllerService import ControllerService
-from .core.InputPort import InputPort
-from .core.Processor import Processor
-from .core.RemoteProcessGroup import RemoteProcessGroup
-from .core.SingleNodeDockerCluster import SingleNodeDockerCluster
-from .core.SSLContextService import SSLContextService
-from .core.DockerTestCluster import DockerTestCluster
-from .core.OutputEventHandler import OutputEventHandler
-
-from .flow_serialization.Minifi_flow_yaml_serializer import Minifi_flow_yaml_serializer
-from .flow_serialization.Nifi_flow_xml_serializer import Nifi_flow_xml_serializer
-
-from .processors.GenerateFlowFile import GenerateFlowFile
-from .processors.GetFile import GetFile
-from .processors.InvokeHTTP import InvokeHTTP
-from .processors.ListenHTTP import ListenHTTP
-from .processors.LogAttribute import LogAttribute
-from .processors.PublishKafka import PublishKafka
-from .processors.PublishKafkaSSL import PublishKafkaSSL
-from .processors.PutFile import PutFile
-from .processors.PutS3Object import PutS3Object
-from .processors.DeleteS3Object import DeleteS3Object
-from .processors.FetchS3Object import FetchS3Object
-
-from .validators.OutputValidator import OutputValidator
-from .validators.EmptyFilesOutPutValidator import EmptyFilesOutPutValidator
-from .validators.SegfaultValidator import SegfaultValidator
-from .validators.NoFileOutPutValidator import NoFileOutPutValidator
-from .validators.SingleFileOutputValidator import SingleFileOutputValidator
-from .validators.FileOutputValidator import FileOutputValidator
-
-logging.basicConfig(level=logging.DEBUG)
-
-
diff --git a/docker/test/integration/minifi/core/Cluster.py b/docker/test/integration/minifi/core/Cluster.py
index 0818f37..db69ec9 100644
--- a/docker/test/integration/minifi/core/Cluster.py
+++ b/docker/test/integration/minifi/core/Cluster.py
@@ -5,7 +5,7 @@
     Docker swarms, or cloud compute/container services.
     """
 
-    def deploy_flow(self, flow, name=None, vols=None):
+    def deploy_flow(self, name=None):
         """
         Deploys a flow to the cluster.
         """
diff --git a/docker/test/integration/minifi/core/Connectable.py b/docker/test/integration/minifi/core/Connectable.py
index c3472a7..6af3a7d 100644
--- a/docker/test/integration/minifi/core/Connectable.py
+++ b/docker/test/integration/minifi/core/Connectable.py
@@ -9,7 +9,7 @@
         self.uuid = uuid.uuid4()
 
         if name is None:
-            self.name = str(self.uuid)
+            self.name = "node_of_" + str(self.uuid)
         else:
             self.name = name
 
@@ -37,41 +37,14 @@
 
         return self
 
-    def __rshift__(self, other):
-        """
-        Right shift operator to support flow DSL, for example:
+    def get_name(self):
+        return self.name
 
-            GetFile('/input') >> LogAttribute() >> PutFile('/output')
+    def set_name(self, name):
+        self.name = name
 
-        """
+    def get_uuid(self):
+        return self.uuid
 
-        connected = copy(self)
-        connected.connections = copy(self.connections)
-
-        if self.out_proc is self:
-            connected.out_proc = connected
-        else:
-            connected.out_proc = copy(connected.out_proc)
-
-        if isinstance(other, tuple):
-            if isinstance(other[0], tuple):
-                for rel_tuple in other:
-                    rel = {rel_tuple[0]: rel_tuple[1]}
-                    connected.out_proc.connect(rel)
-            else:
-                rel = {other[0]: other[1]}
-                connected.out_proc.connect(rel)
-        else:
-            connected.out_proc.connect({'success': other})
-            connected.out_proc = other
-
-        return connected
-
-    def __invert__(self):
-        """
-        Invert operation to set empty file filtering on incoming connections
-        GetFile('/input') >> ~LogAttribute()
-        """
-        self.drop_empty_flowfiles = True
-
-        return self
+    def set_uuid(self, uuid):
+        self.uuid = uuid
diff --git a/docker/test/integration/minifi/core/ControllerService.py b/docker/test/integration/minifi/core/ControllerService.py
index d8b4e17..6263ff5 100644
--- a/docker/test/integration/minifi/core/ControllerService.py
+++ b/docker/test/integration/minifi/core/ControllerService.py
@@ -1,4 +1,5 @@
 import uuid
+import logging
 
 class ControllerService(object):
     def __init__(self, name=None, properties=None):
diff --git a/docker/test/integration/minifi/core/DockerTestCluster.py b/docker/test/integration/minifi/core/DockerTestCluster.py
index 34b4bbc..772558a 100644
--- a/docker/test/integration/minifi/core/DockerTestCluster.py
+++ b/docker/test/integration/minifi/core/DockerTestCluster.py
@@ -7,80 +7,19 @@
 import time
 import uuid
 
-from os.path import join
-from threading import Event
-from watchdog.events import FileSystemEventHandler
-from watchdog.observers import Observer
-
-from .OutputEventHandler import OutputEventHandler
 from .SingleNodeDockerCluster import SingleNodeDockerCluster
-from ..validators.FileOutputValidator import FileOutputValidator
 from .utils import retry_check
-
+from .FileSystemObserver import FileSystemObserver
 
 class DockerTestCluster(SingleNodeDockerCluster):
-    def __init__(self, output_validator):
-
-        # Create test input/output directories
-        test_cluster_id = str(uuid.uuid4())
-
+    def __init__(self):
         self.segfault = False
 
-        self.tmp_test_output_dir = '/tmp/.nifi-test-output.' + test_cluster_id
-        self.tmp_test_input_dir = '/tmp/.nifi-test-input.' + test_cluster_id
-        self.tmp_test_resources_dir = '/tmp/.nifi-test-resources.' + test_cluster_id
-
-        logging.info('Creating tmp test input dir: %s', self.tmp_test_input_dir)
-        os.makedirs(self.tmp_test_input_dir)
-        logging.info('Creating tmp test output dir: %s', self.tmp_test_output_dir)
-        os.makedirs(self.tmp_test_output_dir)
-        logging.info('Creating tmp test resource dir: %s', self.tmp_test_resources_dir)
-        os.makedirs(self.tmp_test_resources_dir)
-        os.chmod(self.tmp_test_output_dir, 0o777)
-        os.chmod(self.tmp_test_input_dir, 0o777)
-        os.chmod(self.tmp_test_resources_dir, 0o777)
-
-        # Add resources
-        test_dir = os.environ['PYTHONPATH'].split(':')[-1] # Based on DockerVerify.sh
-        shutil.copytree(test_dir + "/resources/kafka_broker/conf/certs", self.tmp_test_resources_dir + "/certs")
-
-        # Point output validator to ephemeral output dir
-        self.output_validator = output_validator
-        if isinstance(output_validator, FileOutputValidator):
-            output_validator.set_output_dir(self.tmp_test_output_dir)
-
-        # Start observing output dir
-        self.done_event = Event()
-        self.event_handler = OutputEventHandler(self.output_validator, self.done_event)
-        self.observer = Observer()
-        self.observer.schedule(self.event_handler, self.tmp_test_output_dir)
-        self.observer.start()
-
         super(DockerTestCluster, self).__init__()
 
+    def deploy_flow(self):
 
-
-    def deploy_flow(self,
-                    flow,
-                    name=None,
-                    vols=None,
-                    engine='minifi-cpp'):
-        """
-        Performs a standard container flow deployment with the addition
-        of volumes supporting test input/output directories.
-        """
-
-        if vols is None:
-            vols = {}
-
-        vols[self.tmp_test_input_dir] = {'bind': '/tmp/input', 'mode': 'rw'}
-        vols[self.tmp_test_output_dir] = {'bind': '/tmp/output', 'mode': 'rw'}
-        vols[self.tmp_test_resources_dir] = {'bind': '/tmp/resources', 'mode': 'rw'}
-
-        super(DockerTestCluster, self).deploy_flow(flow,
-                                                   vols=vols,
-                                                   name=name,
-                                                   engine=engine)
+        super(DockerTestCluster, self).deploy_flow()
 
     def start_flow(self, name):
         container = self.containers[name]
@@ -102,80 +41,54 @@
             return True
         return False
 
-    def put_test_data(self, contents):
-        """
-        Creates a randomly-named file in the test input dir and writes
-        the given content to it.
-        """
-
-        self.test_data = contents
-        file_name = str(uuid.uuid4())
-        file_abs_path = join(self.tmp_test_input_dir, file_name)
-        self.put_file_contents(contents.encode('utf-8'), file_abs_path)
-
-    def put_test_resource(self, file_name, contents):
-        """
-        Creates a resource file in the test resource dir and writes
-        the given content to it.
-        """
-
-        file_abs_path = join(self.tmp_test_resources_dir, file_name)
-        self.put_file_contents(contents, file_abs_path)
-
-    def restart_observer_if_needed(self):
-        if self.observer.is_alive():
-            return
-
-        self.observer = Observer()
-        self.done_event.clear()
-        self.observer.schedule(self.event_handler, self.tmp_test_output_dir)
-        self.observer.start()
-
-    def wait_for_output(self, timeout_seconds):
-        logging.info('Waiting up to %d seconds for test output...', timeout_seconds)
-        self.restart_observer_if_needed()
-        self.done_event.wait(timeout_seconds)
-        self.observer.stop()
-        self.observer.join()
-
-    def log_nifi_output(self):
-
+    def get_app_log(self):
         for container in self.containers.values():
             container = self.client.containers.get(container.id)
-            logging.info('Container logs for container \'%s\':\n%s', container.name, container.logs().decode("utf-8"))
             if b'Segmentation fault' in container.logs():
                 logging.warn('Container segfaulted: %s', container.name)
                 self.segfault=True
             if container.status == 'running':
-                apps = [("MiNiFi", self.minifi_root + '/logs/minifi-app.log'), ("NiFi", self.nifi_root + '/logs/nifi-app.log')]
+                apps = [("MiNiFi", self.minifi_root + '/logs/minifi-app.log'), ("NiFi", self.nifi_root + '/logs/nifi-app.log'), ("Kafka", self.kafka_broker_root + '/logs/server.log')]
                 for app in apps:
                     app_log_status, app_log = container.exec_run('/bin/sh -c \'cat ' + app[1] + '\'')
                     if app_log_status == 0:
                         logging.info('%s app logs for container \'%s\':\n', app[0], container.name)
-                        for line in app_log.decode("utf-8").splitlines():
-                            logging.info(line)
+                        return app_log
-                        break
                 else:
                     logging.warning("The container is running, but none of %s logs were found", " or ".join([x[0] for x in apps]))
-
             else:
                 logging.info(container.status)
-                logging.info('Could not cat app logs for container \'%s\' because it is not running',
-                             container.name)
-            stats = container.stats(stream=False)
-            logging.info('Container stats:\n%s', stats)
+                logging.info('Could not cat app logs for container \'%s\' because it is not running', container.name)
+        return None
 
-    def check_output(self, timeout=10, subdir=''):
-        """
-        Wait for flow output, validate it, and log minifi output.
-        """
-        if subdir:
-            self.output_validator.subdir = subdir
-        self.wait_for_output(timeout)
-        self.log_nifi_output()
-        if self.segfault:
-            return False
-        return self.output_validator.validate()
+    def wait_for_app_logs(self, log, timeout_seconds, count=1):
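+        # Poll the app log once per second until 'log' appears at least 'count' times or the timeout expires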
+        wait_start_time = time.perf_counter()
+        for container_name, container in self.containers.items():
+            logging.info('Waiting for app-logs `%s` in container `%s`', log, container_name)
+            while (time.perf_counter() - wait_start_time) < timeout_seconds:
+                logs = self.get_app_log()
+                if logs is not None and count <= logs.decode("utf-8").count(log):
+                    return True
+                if logs is not None:
+                    for line in logs.decode("utf-8").splitlines():
+                        logging.info("App-log: %s", line)
+                time.sleep(1)
+        return False
+
+    def log_nifi_output(self):
+        app_log = self.get_app_log()
+        if app_log is None:
+            return
+        for line in app_log.decode("utf-8").splitlines():
+            logging.info(line)
+
+    def check_minifi_container_started(self):
+        for container in self.containers.values():
+            container = self.client.containers.get(container.id)
+            if b'Segmentation fault' in container.logs():
+                logging.warn('Container segfaulted: %s', container.name)
+                raise Exception("Container failed to start up.")
 
     def check_http_proxy_access(self, url):
         output = subprocess.check_output(["docker", "exec", "http-proxy", "cat", "/var/log/squid/access.log"]).decode(sys.stdout.encoding)
@@ -185,10 +98,10 @@
              output.count("TCP_DENIED/407") == 0 and "TCP_MISS" in output)
 
     @retry_check()
-    def check_s3_server_object_data(self):
+    def check_s3_server_object_data(self, test_data):
         s3_mock_dir = subprocess.check_output(["docker", "exec", "s3-server", "find", "/tmp/", "-type", "d", "-name", "s3mock*"]).decode(sys.stdout.encoding).strip()
         file_data = subprocess.check_output(["docker", "exec", "s3-server", "cat", s3_mock_dir + "/test_bucket/test_object_key/fileData"]).decode(sys.stdout.encoding)
-        return file_data == self.test_data
+        return file_data == test_data
 
     @retry_check()
     def check_s3_server_object_metadata(self, content_type="application/octet-stream", metadata=dict()):
@@ -203,38 +116,17 @@
         ls_result = subprocess.check_output(["docker", "exec", "s3-server", "ls", s3_mock_dir + "/test_bucket/"]).decode(sys.stdout.encoding)
         return not ls_result
 
-    def rm_out_child(self, dir):
-        logging.info('Removing %s from output folder', os.path.join(self.tmp_test_output_dir, dir))
-        shutil.rmtree(os.path.join(self.tmp_test_output_dir, dir))
-
     def wait_for_container_logs(self, container_name, log, timeout, count=1):
         logging.info('Waiting for logs `%s` in container `%s`', log, container_name)
         container = self.containers[container_name]
         check_count = 0
         while check_count <= timeout:
-            if container.logs().decode("utf-8").count(log) == count:
+            if count <= container.logs().decode("utf-8").count(log):
                 return True
             else:
                 check_count += 1
                 time.sleep(1)
         return False
 
-    def put_file_contents(self, contents, file_abs_path):
-        logging.info('Writing %d bytes of content to file: %s', len(contents), file_abs_path)
-        with open(file_abs_path, 'wb') as test_input_file:
-            test_input_file.write(contents)
-
-
-    def __exit__(self, exc_type, exc_val, exc_tb):
-        """
-        Clean up ephemeral test resources.
-        """
-
-        logging.info('Removing tmp test input dir: %s', self.tmp_test_input_dir)
-        shutil.rmtree(self.tmp_test_input_dir)
-        logging.info('Removing tmp test output dir: %s', self.tmp_test_output_dir)
-        shutil.rmtree(self.tmp_test_output_dir)
-        logging.info('Removing tmp test resources dir: %s', self.tmp_test_output_dir)
-        shutil.rmtree(self.tmp_test_resources_dir)
-
-        super(DockerTestCluster, self).__exit__(exc_type, exc_val, exc_tb)
+    def segfault_happened(self):
+        return self.segfault
diff --git a/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py b/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py
new file mode 100644
index 0000000..d9b6a2b
--- /dev/null
+++ b/docker/test/integration/minifi/core/DockerTestDirectoryBindings.py
@@ -0,0 +1,104 @@
+import logging
+import os
+import shutil
+
+class DockerTestDirectoryBindings:
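+    """
+    Manages the per-test temporary input/output/resources directories on the
+    host and exposes their docker volume bindings for the test containers.
+    """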
+    def __init__(self):
+        self.data_directories = {}
+
+    def __del__(self):
+        self.delete_data_directories()
+
+    def create_new_data_directories(self, test_id):
+        self.data_directories[test_id] = {
+            "input_dir": "/tmp/.nifi-test-input." + test_id,
+            "output_dir": "/tmp/.nifi-test-output." + test_id,
+            "resources_dir": "/tmp/.nifi-test-resources." + test_id
+        }
+
+        for directory in self.data_directories[test_id].values():
+            self.create_directory(directory)
+
+        # Add resources
+        test_dir = os.environ['PYTHONPATH'].split(':')[-1] # Based on DockerVerify.sh
+        shutil.copytree(test_dir + "/resources/kafka_broker/conf/certs", self.data_directories[test_id]["resources_dir"] + "/certs")
+
+    def get_data_directories(self, test_id):
+        return self.data_directories[test_id]
+
+    def docker_path_to_local_path(self, test_id, docker_path):
+        # Docker paths are currently hard-coded
+        if docker_path == "/tmp/input":
+            return self.data_directories[test_id]["input_dir"]
+        if docker_path == "/tmp/output":
+            return self.data_directories[test_id]["output_dir"]
+        if docker_path == "/tmp/resources":
+            return self.data_directories[test_id]["resources_dir"]
+        # Might be worth reworking these
+        if docker_path == "/tmp/output/success":
+            self.create_directory(self.data_directories[test_id]["output_dir"] + "/success")
+            return self.data_directories[test_id]["output_dir"] + "/success"
+        if docker_path == "/tmp/output/failure":
+            self.create_directory(self.data_directories[test_id]["output_dir"] + "/failure")
+            return self.data_directories[test_id]["output_dir"] + "/failure"
+        raise Exception("Docker directory \"%s\" has no preset bindings." % docker_path)
+
+    def get_directory_bindings(self, test_id):
+        """
+        Performs a standard container flow deployment with the addition
+        of volumes supporting test input/output directories.
+        """
+        vols = {}
+        vols[self.data_directories[test_id]["input_dir"]] = {"bind": "/tmp/input", "mode": "rw"}
+        vols[self.data_directories[test_id]["output_dir"]] = {"bind": "/tmp/output", "mode": "rw"}
+        vols[self.data_directories[test_id]["resources_dir"]] = {"bind": "/tmp/resources", "mode": "rw"}
+        return vols
+
+    @staticmethod
+    def create_directory(dir):
+        logging.info("Creating tmp dir: %s", dir)
+        os.makedirs(dir)
+        os.chmod(dir, 0o777)
+
+    @staticmethod
+    def delete_tmp_directory(dir):
+        assert dir.startswith("/tmp/")
+        if not dir.endswith("/"):
+            dir = dir + "/"
+        # Sometimes rmtree does not clean up as expected; setting ignore_errors does not help either
+        shutil.rmtree(dir, ignore_errors=True)
+
+    def delete_data_directories(self):
+        for directories in self.data_directories.values():
+            for directory in directories.values():
+                self.delete_tmp_directory(directory)
+
+    @staticmethod
+    def put_file_contents(file_abs_path, contents):
+        logging.info('Writing %d bytes of content to file: %s', len(contents), file_abs_path)
+        with open(file_abs_path, 'wb') as test_input_file:
+            test_input_file.write(contents)
+
+    def put_test_resource(self, test_id, file_name, contents):
+        """
+        Creates a resource file in the test resource dir and writes
+        the given content to it.
+        """
+
+        file_abs_path = os.path.join(self.data_directories[test_id]["resources_dir"], file_name)
+        self.put_file_contents(file_abs_path, contents)
+
+    def put_test_input(self, test_id, file_name, contents):
+        file_abs_path = os.path.join(self.data_directories[test_id]["input_dir"], file_name)
+        self.put_file_contents(file_abs_path, contents)
+
+    def put_file_to_docker_path(self, test_id, path, file_name, contents):
+        file_abs_path = os.path.join(self.docker_path_to_local_path(test_id, path), file_name)
+        self.put_file_contents(file_abs_path, contents)
+
+    def get_out_subdir(self, test_id, dir):
+        return os.path.join(self.data_directories[test_id]["output_dir"], dir)
+
+    def rm_out_child(self, test_id, dir):
+        child = os.path.join(self.data_directories[test_id]["output_dir"], dir)
+        logging.info('Removing %s from output folder', child)
+        shutil.rmtree(child)
diff --git a/docker/test/integration/minifi/core/FileSystemObserver.py b/docker/test/integration/minifi/core/FileSystemObserver.py
new file mode 100644
index 0000000..b4c0f7f
--- /dev/null
+++ b/docker/test/integration/minifi/core/FileSystemObserver.py
@@ -0,0 +1,46 @@
+import logging
+import time
+from threading import Event
+
+from watchdog.events import FileSystemEventHandler
+from watchdog.observers import Observer
+
+from .OutputEventHandler import OutputEventHandler
+from ..validators.FileOutputValidator import FileOutputValidator
+
+class FileSystemObserver(object):
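+    """
+    Watches a test output directory with a watchdog observer and signals a done
+    event whenever an output file is created.
+    """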
+    def __init__(self, test_output_dir):
+
+        self.test_output_dir = test_output_dir
+
+        # Start observing output dir
+        self.done_event = Event()
+        self.event_handler = OutputEventHandler(self.done_event)
+        self.observer = Observer()
+        self.observer.schedule(self.event_handler, self.test_output_dir)
+        self.observer.start()
+
+    def get_output_dir(self):
+        return self.test_output_dir
+
+    def restart_observer_if_needed(self):
+        if self.observer.is_alive():
+            return
+
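+        # A stopped watchdog Observer cannot be restarted, so create and schedule a fresh one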
+        self.observer = Observer()
+        self.done_event.clear()
+        self.observer.schedule(self.event_handler, self.test_output_dir)
+        self.observer.start()
+
+    def wait_for_output(self, timeout_seconds, max_files):
+        logging.info('Waiting up to %d seconds for test output...', timeout_seconds)
+        self.restart_observer_if_needed()
+        wait_start_time = time.perf_counter()
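+        # Wait on the output event up to max_files times, breaking out once timeout_seconds have elapsed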
+        for i in range(0, max_files):
+            # Note: The timing on Event.wait() is inaccurate
+            self.done_event.wait(timeout_seconds)
+            current_time = time.perf_counter()
+            if timeout_seconds < (current_time - wait_start_time):
+                break
+        self.observer.stop()
+        self.observer.join()
diff --git a/docker/test/integration/minifi/core/OutputEventHandler.py b/docker/test/integration/minifi/core/OutputEventHandler.py
index 3d4c984..f505695 100644
--- a/docker/test/integration/minifi/core/OutputEventHandler.py
+++ b/docker/test/integration/minifi/core/OutputEventHandler.py
@@ -3,22 +3,15 @@
 from watchdog.events import FileSystemEventHandler
 
 class OutputEventHandler(FileSystemEventHandler):
-    def __init__(self, validator, done_event):
-        self.validator = validator
+    def __init__(self, done_event):
         self.done_event = done_event
 
     def on_created(self, event):
         logging.info('Output file created: ' + event.src_path)
-        self.check(event)
+        self.done_event.set()
 
     def on_modified(self, event):
         logging.info('Output file modified: ' + event.src_path)
-        self.check(event)
 
-    def check(self, event):
-        if self.validator.validate():
-            logging.info('Output file is valid')
-            self.done_event.set()
-        else:
-            logging.info('Output file is invalid')
-
+    def on_deleted(self, event):
+        logging.info('Output file deleted: ' + event.src_path)
diff --git a/docker/test/integration/minifi/core/Processor.py b/docker/test/integration/minifi/core/Processor.py
index e7e41e5..ddc2a9d 100644
--- a/docker/test/integration/minifi/core/Processor.py
+++ b/docker/test/integration/minifi/core/Processor.py
@@ -1,5 +1,7 @@
 from .Connectable import Connectable
 
+import logging
+
 class Processor(Connectable):
     def __init__(self,
                  clazz,
@@ -37,6 +39,15 @@
         }
         self.schedule.update(schedule)
 
+    def set_property(self, key, value):
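+        # Values typically arrive as strings from the scenario steps; store purely numeric ones as integers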
+        if value.isdigit():
+            self.properties[key] = int(value)
+        else:
+            self.properties[key] = value
+
+    def set_scheduling_period(self, value):
+        self.schedule['scheduling period'] = value
+
     def nifi_property_key(self, key):
         """
         Returns the Apache NiFi-equivalent property key for the given key. This is often, but not always, the same as
diff --git a/docker/test/integration/minifi/core/RemoteProcessGroup.py b/docker/test/integration/minifi/core/RemoteProcessGroup.py
index 6901fb6..6132ad4 100644
--- a/docker/test/integration/minifi/core/RemoteProcessGroup.py
+++ b/docker/test/integration/minifi/core/RemoteProcessGroup.py
@@ -1,8 +1,7 @@
 import uuid
 
 class RemoteProcessGroup(object):
-    def __init__(self, url,
-                 name=None):
+    def __init__(self, url, name=None):
         self.uuid = uuid.uuid4()
 
         if name is None:
@@ -11,3 +10,10 @@
             self.name = name
 
         self.url = url
+
+
+    def get_name(self):
+        return self.name
+
+    def get_uuid(self):
+        return self.uuid
diff --git a/docker/test/integration/minifi/core/SSL_cert_utils.py b/docker/test/integration/minifi/core/SSL_cert_utils.py
new file mode 100644
index 0000000..c2461c3
--- /dev/null
+++ b/docker/test/integration/minifi/core/SSL_cert_utils.py
@@ -0,0 +1,54 @@
+import time
+import logging
+
+from M2Crypto import X509, EVP, RSA, ASN1
+
+def gen_cert():
+    """
+    Generate a self-signed TLS certificate and key for testing
+    """
+
+    req, key = gen_req()
+    pub_key = req.get_pubkey()
+    subject = req.get_subject()
+    cert = X509.X509()
+    # noinspection PyTypeChecker
+    cert.set_serial_number(1)
+    cert.set_version(2)
+    cert.set_subject(subject)
+    t = int(time.time())
+    now = ASN1.ASN1_UTCTIME()
+    now.set_time(t)
+    now_plus_year = ASN1.ASN1_UTCTIME()
+    now_plus_year.set_time(t + 60 * 60 * 24 * 365)
+    cert.set_not_before(now)
+    cert.set_not_after(now_plus_year)
+    issuer = X509.X509_Name()
+    issuer.C = 'US'
+    issuer.CN = 'minifi-listen'
+    cert.set_issuer(issuer)
+    cert.set_pubkey(pub_key)
+    cert.sign(key, 'sha256')
+
+    return cert, key
+
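+# No-op callback for RSA.gen_key(); suppresses M2Crypto's default progress output during key generation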
+def rsa_gen_key_callback():
+    pass
+
+def gen_req():
+    """
+    Generate TLS certificate request for testing
+    """
+
+    logging.info('Generating test certificate request')
+    key = EVP.PKey()
+    req = X509.Request()
+    rsa = RSA.gen_key(1024, 65537, rsa_gen_key_callback)
+    key.assign_rsa(rsa)
+    req.set_pubkey(key)
+    name = req.get_subject()
+    name.C = 'US'
+    name.CN = 'minifi-listen'
+    req.sign(key, 'sha256')
+
+    return req, key
diff --git a/docker/test/integration/minifi/core/SingleNodeDockerCluster.py b/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
index f48c288..b4dede7 100644
--- a/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
+++ b/docker/test/integration/minifi/core/SingleNodeDockerCluster.py
@@ -3,6 +3,7 @@
 import logging
 import os
 import tarfile
+import time
 import uuid
 
 from collections import OrderedDict
@@ -22,8 +23,13 @@
     def __init__(self):
         self.minifi_version = os.environ['MINIFI_VERSION']
         self.nifi_version = '1.7.0'
+        self.engine = 'minifi-cpp'
+        self.flow = None
+        self.name = None
+        self.vols = {}
         self.minifi_root = '/opt/minifi/nifi-minifi-cpp-' + self.minifi_version
         self.nifi_root = '/opt/nifi/nifi-' + self.nifi_version
+        self.kafka_broker_root = '/opt/kafka'
         self.network = None
         self.containers = OrderedDict()
         self.images = []
@@ -32,44 +38,84 @@
         # Get docker client
         self.client = docker.from_env()
 
-    def deploy_flow(self,
-                    flow,
-                    name=None,
-                    vols=None,
-                    engine='minifi-cpp'):
+    def __del__(self):
+        """
+        Clean up ephemeral cluster resources
+        """
+
+        # Containers and networks are expected to be freed outside of this function
+
+        # Clean up images
+        for image in reversed(self.images):
+            logging.info('Cleaning up image: %s', image[0].id)
+            self.client.images.remove(image[0].id, force=True)
+
+        # Clean up tmp files
+        for tmp_file in self.tmp_files:
+            os.remove(tmp_file)
+
+    def set_name(self, name):
+        self.name = name
+
+    def get_name(self):
+        return self.name
+
+    def set_engine(self, engine):
+        self.engine = engine
+
+    def get_engine(self):
+        return self.engine
+
+    def get_flow(self):
+        return self.flow
+
+    def set_flow(self, flow):
+        self.flow = flow
+
+    def set_directory_bindings(self, bindings):
+        self.vols = bindings
+
+    @staticmethod
+    def create_docker_network():
+        net_name = 'minifi_integration_test_network-' + str(uuid.uuid4())
+        logging.info('Creating network: %s', net_name)
+        return docker.from_env().networks.create(net_name)
+
+    def set_network(self, network):
+        self.network = network
+
+    def deploy_flow(self):
         """
         Compiles the flow to a valid config file and overlays it into a new image.
         """
 
-        if vols is None:
-            vols = {}
+        if self.vols is None:
+            self.vols = {}
 
-        logging.info('Deploying %s flow...%s', engine,name)
+        if self.name is None:
+            self.name = self.engine + '-' + str(uuid.uuid4())
+            logging.info('Flow name was not provided; using generated name \'%s\'', self.name)
 
-        if name is None:
-            name = engine + '-' + str(uuid.uuid4())
-            logging.info('Flow name was not provided; using generated name \'%s\'', name)
+        logging.info('Deploying %s flow \"%s\"...', self.engine, self.name)
 
         # Create network if necessary
         if self.network is None:
-            net_name = 'nifi-' + str(uuid.uuid4())
-            logging.info('Creating network: %s', net_name)
-            self.network = self.client.networks.create(net_name)
+            self.set_network(self.create_docker_network())
 
-        if engine == 'nifi':
-            self.deploy_nifi_flow(flow, name, vols)
-        elif engine == 'minifi-cpp':
-            self.deploy_minifi_cpp_flow(flow, name, vols)
-        elif engine == 'kafka-broker':
-            self.deploy_kafka_broker(name)
-        elif engine == 'http-proxy':
+        if self.engine == 'nifi':
+            self.deploy_nifi_flow()
+        elif self.engine == 'minifi-cpp':
+            self.deploy_minifi_cpp_flow()
+        elif self.engine == 'kafka-broker':
+            self.deploy_kafka_broker()
+        elif self.engine == 'http-proxy':
             self.deploy_http_proxy()
-        elif engine == 's3-server':
+        elif self.engine == 's3-server':
             self.deploy_s3_server()
         else:
-            raise Exception('invalid flow engine: \'%s\'' % engine)
+            raise Exception('invalid flow engine: \'%s\'' % self.engine)
 
-    def deploy_minifi_cpp_flow(self, flow, name, vols):
+    def deploy_minifi_cpp_flow(self):
 
         # Build configured image
         dockerfile = dedent("""FROM {base_image}
@@ -77,12 +123,12 @@
                 ADD config.yml {minifi_root}/conf/config.yml
                 RUN chown minificpp:minificpp {minifi_root}/conf/config.yml
                 USER minificpp
-                """.format(name=name,hostname=name,
+                """.format(name=self.name,hostname=self.name,
                            base_image='apacheminificpp:' + self.minifi_version,
                            minifi_root=self.minifi_root))
 
         serializer = Minifi_flow_yaml_serializer()
-        test_flow_yaml = serializer.serialize(flow)
+        test_flow_yaml = serializer.serialize(self.flow)
         logging.info('Using generated flow config yml:\n%s', test_flow_yaml)
 
         conf_file_buffer = BytesIO()
@@ -105,20 +151,19 @@
         finally:
             conf_file_buffer.close()
 
-        logging.info('Creating and running docker container for flow...')
-
         container = self.client.containers.run(
                 configured_image[0],
                 detach=True,
-                name=name,
+                name=self.name,
                 network=self.network.name,
-                volumes=vols)
-
+                volumes=self.vols)
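+        # Refresh the network object so the newly attached container is visible in its attributes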
+        self.network.reload()
+
         logging.info('Started container \'%s\'', container.name)
 
         self.containers[container.name] = container
 
-    def deploy_nifi_flow(self, flow, name, vols):
+    def deploy_nifi_flow(self):
         dockerfile = dedent(r"""FROM {base_image}
                 USER root
                 ADD flow.xml.gz {nifi_root}/conf/flow.xml.gz
@@ -126,12 +171,12 @@
                 RUN sed -i -e 's/^\(nifi.remote.input.host\)=.*/\1={name}/' {nifi_root}/conf/nifi.properties
                 RUN sed -i -e 's/^\(nifi.remote.input.socket.port\)=.*/\1=5000/' {nifi_root}/conf/nifi.properties
                 USER nifi
-                """.format(name=name,
+                """.format(name=self.name,
                            base_image='apache/nifi:' + self.nifi_version,
                            nifi_root=self.nifi_root))
 
         serializer = Nifi_flow_xml_serializer()
-        test_flow_xml = serializer.serialize(flow, self.nifi_version)
+        test_flow_xml = serializer.serialize(self.flow, self.nifi_version)
         logging.info('Using generated flow config xml:\n%s', test_flow_xml)
 
         conf_file_buffer = BytesIO()
@@ -160,16 +205,16 @@
         container = self.client.containers.run(
                 configured_image[0],
                 detach=True,
-                name=name,
-                hostname=name,
+                name=self.name,
+                hostname=self.name,
                 network=self.network.name,
-                volumes=vols)
+                volumes=self.vols)
 
         logging.info('Started container \'%s\'', container.name)
 
         self.containers[container.name] = container
 
-    def deploy_kafka_broker(self, name):
+    def deploy_kafka_broker(self):
         logging.info('Creating and running docker containers for kafka broker...')
         zookeeper = self.client.containers.run(
                     self.client.images.pull("wurstmeister/zookeeper:latest"),
@@ -287,33 +332,3 @@
         except Exception as e:
             logging.info(e)
             raise
-
-    def __enter__(self):
-        """
-        Allocate ephemeral cluster resources.
-        """
-        return self
-
-    def __exit__(self, exc_type, exc_val, exc_tb):
-        """
-        Clean up ephemeral cluster resources
-        """
-
-        # Clean up containers
-        for container in self.containers.values():
-            logging.info('Cleaning up container: %s', container.name)
-            container.remove(v=True, force=True)
-
-        # Clean up images
-        for image in reversed(self.images):
-            logging.info('Cleaning up image: %s', image[0].id)
-            self.client.images.remove(image[0].id, force=True)
-
-        # Clean up network
-        if self.network is not None:
-            logging.info('Cleaning up network network: %s', self.network.name)
-            self.network.remove()
-
-        # Clean up tmp files
-        for tmp_file in self.tmp_files:
-            os.remove(tmp_file)
diff --git a/docker/test/integration/minifi/processors/GenerateFlowFile.py b/docker/test/integration/minifi/processors/GenerateFlowFile.py
index 93d42ca..65af6cf 100644
--- a/docker/test/integration/minifi/processors/GenerateFlowFile.py
+++ b/docker/test/integration/minifi/processors/GenerateFlowFile.py
@@ -1,8 +1,7 @@
 from ..core.Processor import Processor
 
 class GenerateFlowFile(Processor):
-    def __init__(self, file_size, schedule={'scheduling period': '0 sec'}):
+    def __init__(self, schedule={'scheduling period': '2 sec'}):
         super(GenerateFlowFile, self).__init__('GenerateFlowFile',
-			properties={'File Size': file_size},
 			schedule=schedule,
 			auto_terminate=['success'])
diff --git a/docker/test/integration/minifi/processors/GetFile.py b/docker/test/integration/minifi/processors/GetFile.py
index 29b8180..23a575d 100644
--- a/docker/test/integration/minifi/processors/GetFile.py
+++ b/docker/test/integration/minifi/processors/GetFile.py
@@ -1,7 +1,7 @@
 from ..core.Processor import Processor
 
 class GetFile(Processor):
-	def __init__(self, input_dir, schedule={'scheduling period': '2 sec'}):
+	def __init__(self, input_dir="/tmp/input", schedule={'scheduling period': '2 sec'}):
 		super(GetFile, self).__init__('GetFile',
 			properties={'Input Directory': input_dir, 'Keep Source File': 'true'},
 			schedule=schedule,
diff --git a/docker/test/integration/minifi/processors/HashContent.py b/docker/test/integration/minifi/processors/HashContent.py
new file mode 100644
index 0000000..f88f05f
--- /dev/null
+++ b/docker/test/integration/minifi/processors/HashContent.py
@@ -0,0 +1,8 @@
+from ..core.Processor import Processor
+
+class HashContent(Processor):
+	def __init__(self, schedule={"scheduling period": "2 sec"}):
+		super(HashContent, self).__init__("HashContent",
+			properties={"Hash Attribute": "hash"},
+			schedule=schedule,
+			auto_terminate=["success", "failure"])
diff --git a/docker/test/integration/minifi/processors/InvokeHTTP.py b/docker/test/integration/minifi/processors/InvokeHTTP.py
index 135c8d5..a19688e 100644
--- a/docker/test/integration/minifi/processors/InvokeHTTP.py
+++ b/docker/test/integration/minifi/processors/InvokeHTTP.py
@@ -1,21 +1,14 @@
 from ..core.Processor import Processor
 
 class InvokeHTTP(Processor):
-    def __init__(self, url,
-        method='GET',
-        proxy_host='',
-        proxy_port='',
-        proxy_username='',
-        proxy_password='',
+    def __init__(self,
         ssl_context_service=None,
         schedule={'scheduling strategy': 'EVENT_DRIVEN'}):
             properties = {
-                'Remote URL': url,
-                'HTTP Method': method,
-                'Proxy Host': proxy_host,
-                'Proxy Port': proxy_port,
-                'invokehttp-proxy-username': proxy_username,
-                'invokehttp-proxy-password': proxy_password }
+                "Proxy Host": "",
+                "Proxy Port": "",
+                "invokehttp-proxy-username": "",
+                "invokehttp-proxy-password": "" }
 
             controller_services = []
 
@@ -28,3 +21,4 @@
                 controller_services = controller_services,
                 auto_terminate = ['success', 'response', 'retry', 'failure', 'no retry'],
                 schedule = schedule)
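+            # Route the failure relationship back to this processor so failed requests are retried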
+            self.out_proc.connect({"failure": self})
diff --git a/docker/test/integration/minifi/processors/ListenHTTP.py b/docker/test/integration/minifi/processors/ListenHTTP.py
index 6f5bca1..7eadc92 100644
--- a/docker/test/integration/minifi/processors/ListenHTTP.py
+++ b/docker/test/integration/minifi/processors/ListenHTTP.py
@@ -1,8 +1,8 @@
 from ..core.Processor import Processor
 
 class ListenHTTP(Processor):
-    def __init__(self, port, cert=None, schedule=None):
-        properties = {'Listening Port': port}
+    def __init__(self, cert=None, schedule=None):
+        properties = {}
 
         if cert is not None:
             properties['SSL Certificate'] = cert
diff --git a/docker/test/integration/minifi/processors/PublishKafkaSSL.py b/docker/test/integration/minifi/processors/PublishKafkaSSL.py
deleted file mode 100644
index 82c33f6..0000000
--- a/docker/test/integration/minifi/processors/PublishKafkaSSL.py
+++ /dev/null
@@ -1,16 +0,0 @@
-from ..core.Processor import Processor
-
-class PublishKafkaSSL(Processor):
-    def __init__(self, schedule={'scheduling strategy': 'EVENT_DRIVEN'}):
-        super(PublishKafkaSSL, self).__init__('PublishKafka',
-            properties={'Client Name': 'LMN', 'Known Brokers': 'kafka-broker:9093',
-                'Topic Name': 'test', 'Batch Size': '10',
-                'Compress Codec': 'none', 'Delivery Guarantee': '1',
-                'Request Timeout': '10 sec', 'Message Timeout': '12 sec',
-                'Security CA': '/tmp/resources/certs/ca-cert',
-                'Security Cert': '/tmp/resources/certs/client_LMN_client.pem',
-                'Security Pass Phrase': 'abcdefgh',
-                'Security Private Key': '/tmp/resources/certs/client_LMN_client.key',
-                'Security Protocol': 'ssl'},
-            auto_terminate=['success'],
-            schedule=schedule)
diff --git a/docker/test/integration/minifi/processors/PutFile.py b/docker/test/integration/minifi/processors/PutFile.py
index 047d32d..db307dd 100644
--- a/docker/test/integration/minifi/processors/PutFile.py
+++ b/docker/test/integration/minifi/processors/PutFile.py
@@ -1,7 +1,7 @@
 from ..core.Processor import Processor
 
 class PutFile(Processor):
-    def __init__(self, output_dir, schedule={'scheduling strategy': 'EVENT_DRIVEN'}):
+    def __init__(self, output_dir="/tmp/output", schedule={'scheduling strategy': 'EVENT_DRIVEN'}):
         super(PutFile, self).__init__('PutFile',
             properties={'Directory': output_dir, 'Directory Permissions': '777', 'Permissions': '777'},
             auto_terminate=['success', 'failure'],
diff --git a/docker/test/integration/minifi/processors/PutS3Object.py b/docker/test/integration/minifi/processors/PutS3Object.py
index 74fb4a8..ab8b1be 100644
--- a/docker/test/integration/minifi/processors/PutS3Object.py
+++ b/docker/test/integration/minifi/processors/PutS3Object.py
@@ -17,4 +17,4 @@
                 'Proxy Port': proxy_port,
                 'Proxy Username': proxy_username,
                 'Proxy Password': proxy_password },
-            auto_terminate = ['success'])
+            auto_terminate = ["success", "failure"])
diff --git a/docker/test/integration/steps/steps.py b/docker/test/integration/steps/steps.py
new file mode 100644
index 0000000..e376c2f
--- /dev/null
+++ b/docker/test/integration/steps/steps.py
@@ -0,0 +1,256 @@
+from MiNiFi_integration_test_driver import MiNiFi_integration_test
+
+from minifi.core.DockerTestCluster import DockerTestCluster
+from minifi.core.FileSystemObserver import FileSystemObserver
+from minifi.core.RemoteProcessGroup import RemoteProcessGroup
+from minifi.core.InputPort import InputPort
+from minifi.core.SSLContextService import SSLContextService
+from minifi.core.SSL_cert_utils import gen_cert, gen_req, rsa_gen_key_callback
+
+from minifi.processors.PublishKafka import PublishKafka
+from minifi.processors.PutS3Object import PutS3Object
+from minifi.processors.DeleteS3Object import DeleteS3Object
+from minifi.processors.FetchS3Object import FetchS3Object
+
+
+from behave import given, then, when
+from behave.model_describe import ModelDescriptor
+from copy import copy
+from copy import deepcopy
+from pydoc import locate
+from pytimeparse.timeparse import timeparse
+
+import os
+import logging
+import re
+import time
+import uuid
+
+# Background
+
+@given("the content of \"{directory}\" is monitored")
+def step_impl(context, directory):
+    context.test.add_file_system_observer(FileSystemObserver(context.test.docker_path_to_local_path(directory)))
+
+# MiNiFi cluster setups
+
+@given("a {processor_type} processor with the \"{property}\" property set to \"{property_value}\" in a \"{cluster_name}\" flow")
+@given("a {processor_type} processor with the \"{property}\" property set to \"{property_value}\" in the \"{cluster_name}\" flow")
+def step_impl(context, processor_type, property, property_value, cluster_name):
+    logging.info("Acquiring " + cluster_name)
+    cluster = context.test.acquire_cluster(cluster_name)
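+    # Resolve and instantiate the processor class named in the scenario (e.g. minifi.processors.GetFile.GetFile)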
+    processor = locate("minifi.processors." + processor_type + "." + processor_type)()
+    processor.set_property(property, property_value)
+    processor.set_name(processor_type)
+    context.test.add_node(processor)
+    # Assume that the first node declared is primary unless specified otherwise
+    if cluster.get_flow() is None:
+        cluster.set_name(cluster_name)
+        cluster.set_flow(processor)
+
+
+@given("a {processor_type} processor with the \"{property}\" property set to \"{property_value}\"")
+def step_impl(context, processor_type, property, property_value):
+    context.execute_steps("given a {processor_type} processor with the \"{property}\" property set to \"{property_value}\" in the \"{cluster_name}\" flow".
+        format(processor_type=processor_type, property=property, property_value=property_value, cluster_name="primary_cluster"))
+
+@given("a set of processors in the \"{cluster_name}\" flow")
+def step_impl(context, cluster_name):
+    cluster = context.test.acquire_cluster(cluster_name)
+    logging.info(context.table)
+    for row in context.table:
+        processor = locate("minifi.processors." + row["type"] + "." + row["type"])()
+        processor.set_name(row["name"])
+        processor.set_uuid(row["uuid"])
+        context.test.add_node(processor)
+        # Assume that the first node declared is primary unless specified otherwise
+        if cluster.get_flow() is None:
+            cluster.set_flow(processor)
+
+@given("a set of processors")
+def step_impl(context):
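+    # Re-render the scenario table as text so it can be embedded in the delegated flow-specific step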
+    rendered_table = ModelDescriptor.describe_table(context.table, "    ")
+    context.execute_steps("""given a set of processors in the \"{cluster_name}\" flow
+        {table}
+        """.format(cluster_name="primary_cluster", table=rendered_table))
+
+@given("a RemoteProcessGroup node opened on \"{address}\"")
+def step_impl(context, address):
+    remote_process_group = RemoteProcessGroup(address, "RemoteProcessGroup")
+    context.test.add_remote_process_group(remote_process_group)
+
+@given("a PutS3Object processor set up to communicate with an s3 server")
+def step_impl(context):
+    # PutS3Object is never the first node of a flow, so the potential cluster-flow setup is omitted
+    put_s3 = PutS3Object()
+    put_s3.set_name("PutS3Object")
+    context.test.add_node(put_s3)
+
+@given("a DeleteS3Object processor set up to communicate with the same s3 server")
+@given("a DeleteS3Object processor set up to communicate with an s3 server")
+def step_impl(context):
+    delete_s3 = DeleteS3Object()
+    delete_s3.set_name("DeleteS3Object")
+    context.test.add_node(delete_s3)
+
+@given("a FetchS3Object processor set up to communicate with the same s3 server")
+@given("a FetchS3Object processor set up to communicate with an s3 server")
+def step_impl(context):
+    fetch_s3 = FetchS3Object()
+    fetch_s3.set_name("FetchS3Object")
+    context.test.add_node(fetch_s3)
+
+@given("a PublishKafka processor set up to communicate with a kafka broker instance")
+def step_impl(context):
+    # PublishKafka is never the first node of a flow, so the potential cluster-flow setup is omitted
+    publish_kafka = PublishKafka()
+    publish_kafka.set_name("PublishKafka")
+    context.test.add_node(publish_kafka)
+
+@given("the \"{property_name}\" of the {processor_name} processor is set to \"{property_value}\"")
+def step_impl(context, property_name, processor_name, property_value):
+    processor = context.test.get_node_by_name(processor_name)
+    processor.set_property(property_name, property_value)
+
+
+@given("the scheduling period of the {processor_name} processor is set to \"{sceduling_period}\"")
+def step_impl(context, processor_name, sceduling_period):
+    processor = context.test.get_node_by_name(processor_name)
+    processor.set_scheduling_period(sceduling_period)
+
+@given("these processor properties are set")
+@given("these processor properties are set to match the http proxy")
+def step_impl(context):
+    for row in context.table:
+        context.test.get_node_by_name(row["processor name"]).set_property(row["property name"], row["property value"])
+
+@given("the \"{relationship}\" relationship of the {source_name} processor is connected to the input port on the {remote_process_group_name}")
+def step_impl(context, relationship, source_name, remote_process_group_name):
+    source = context.test.get_node_by_name(source_name)
+    remote_process_group = context.test.get_remote_process_group_by_name(remote_process_group_name)
+    input_port_node = context.test.generate_input_port_for_remote_process_group(remote_process_group, "to_nifi")
+    context.test.add_node(input_port_node)
+    source.out_proc.connect({relationship: input_port_node})
+
+@given("the \"{relationship}\" relationship of the {source_name} is connected to the {destination_name}")
+@given("the \"{relationship}\" relationship of the {source_name} processor is connected to the {destination_name}")
+def step_impl(context, relationship, source_name, destination_name):
+    source = context.test.get_node_by_name(source_name)
+    destination = context.test.get_node_by_name(destination_name)
+    source.out_proc.connect({relationship: destination})
+
+@given("the processors are connected up as described here")
+def step_impl(context):
+    for row in context.table:
+        context.execute_steps(
+            "given the \"" + row["relationship name"] + "\" relationship of the " + row["source name"] + " processor is connected to the " + row["destination name"])
+
+@given("the connection going to the RemoteProcessGroup has \"drop empty\" set")
+def step_impl(context):
+    input_port = context.test.get_node_by_name("to_nifi")
+    input_port.drop_empty_flowfiles = True
+
+@given("a file with the content \"{content}\" is present in \"{path}\"")
+def step_impl(context, content, path):
+    context.test.add_test_data(path, content)
+
+# NiFi setups
+
+@given("a NiFi flow \"{cluster_name}\" receiving data from a RemoteProcessGroup \"{source_name}\" on port {port}")
+def step_impl(context, cluster_name, source_name, port):
+    remote_process_group = context.test.get_remote_process_group_by_name("RemoteProcessGroup")
+    source = context.test.generate_input_port_for_remote_process_group(remote_process_group, "from-minifi")
+    context.test.add_node(source)
+    cluster = context.test.acquire_cluster(cluster_name)
+    cluster.set_name('nifi')
+    cluster.set_engine('nifi')
+    # Assume that the first node declared is primary unless specified otherwise
+    if cluster.get_flow() is None:
+        cluster.set_flow(source)
+
+@given("in the \"{cluster_name}\" flow the \"{relationship}\" relationship of the {source_name} processor is connected to the {destination_name}")
+def step_impl(context, cluster_name, relationship, source_name, destination_name):
+    cluster = context.test.acquire_cluster(cluster_name)
+    source = context.test.get_or_create_node_by_name(source_name)
+    destination = context.test.get_or_create_node_by_name(destination_name)
+    source.out_proc.connect({relationship: destination})
+    if cluster.get_flow() is None:
+        cluster.set_flow(source)
+
+# HTTP proxy setup
+
+@given("the http proxy server \"{cluster_name}\" is set up")
+@given("a http proxy server \"{cluster_name}\" is set up accordingly")
+def step_impl(context, cluster_name):
+    cluster = context.test.acquire_cluster(cluster_name)
+    cluster.set_name(cluster_name)
+    cluster.set_engine("http-proxy")
+    cluster.set_flow(None)
+
+# TLS
+@given("an ssl context service set up for {producer_name} and {consumer_name}")
+def step_impl(context, producer_name, consumer_name):
+    cert, key = gen_cert()
+    crt_file = '/tmp/resources/test-crt.pem'
+    ssl_context_service = SSLContextService(cert=crt_file, ca_cert=crt_file)
+    context.test.put_test_resource('test-crt.pem', cert.as_pem() + key.as_pem(None, rsa_gen_key_callback))
+    producer = context.test.get_node_by_name(producer_name)
+    producer.controller_services.append(ssl_context_service)
+    producer.set_property("SSL Context Service", ssl_context_service.name)
+    consumer = context.test.get_node_by_name(consumer_name)
+    consumer.set_property("SSL Certificate", crt_file)
+    consumer.set_property("SSL Verify Peer", "no")
+
+# Kafka setup
+
+@given("a kafka broker \"{cluster_name}\" is set up in correspondence with the PublishKafka")
+def step_impl(context, cluster_name):
+    cluster = context.test.acquire_cluster(cluster_name)
+    cluster.set_name(cluster_name)
+    cluster.set_engine("kafka-broker")
+    cluster.set_flow(None)
+
+# s3 setup
+
+@given("a s3 server \"{cluster_name}\" is set up in correspondence with the PutS3Object")
+@given("a s3 server \"{cluster_name}\" is set up in correspondence with the DeleteS3Object")
+def step_impl(context, cluster_name):
+    cluster = context.test.acquire_cluster(cluster_name)
+    cluster.set_name(cluster_name)
+    cluster.set_engine("s3-server")
+    cluster.set_flow(None)
+
+@when("the MiNiFi instance starts up")
+@when("both instances start up")
+@when("all instances start up")
+def step_impl(context):
+    context.test.start()
+
+@then("a flowfile with the content \"{content}\" is placed in the monitored directory in less than {duration}")
+def step_impl(context, content, duration):
+    context.test.check_for_file_with_content_generated(content, timeparse(duration))
+
+@then("at least one empty flowfile is placed in the monitored directory in less than {duration}")
+def step_impl(context, duration):
+    context.test.check_for_multiple_empty_files_generated(timeparse(duration))
+
+@then("no files are placed in the monitored directory in {duration} of running time")
+def step_impl(context, duration):
+    context.test.check_for_no_files_generated(timeparse(duration))
+
+@then("no errors were generated on the \"{cluster_name}\" regarding \"{url}\"")
+def step_impl(context, cluster_name, url):
+    context.test.check_http_proxy_access(cluster_name, url)
+
+@then("the object on the \"{cluster_name}\" s3 server is \"{object_data}\"")
+def step_impl(context, cluster_name, object_data):
+    context.test.check_s3_server_object_data(cluster_name, object_data)
+
+@then("the object content type on the \"{cluster_name}\" s3 server is \"{content_type}\" and the object metadata matches use metadata")
+def step_impl(context, cluster_name, content_type):
+    context.test.check_s3_server_object_metadata(cluster_name, content_type)
+
+@then("the object bucket on the \"{cluster_name}\" s3 server is empty")
+def step_impl(context, cluster_name):
+    context.test.check_empty_s3_bucket(cluster_name)
diff --git a/docker/test/integration/test_filesystem_ops.py b/docker/test/integration/test_filesystem_ops.py
deleted file mode 100644
index 5b48e6e..0000000
--- a/docker/test/integration/test_filesystem_ops.py
+++ /dev/null
@@ -1,51 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the \"License\"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an \"AS IS\" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from minifi import *
-
-def test_get_put():
-    """
-    Verify basic file get/put operations.
-    """
-
-    flow = GetFile('/tmp/input') >> LogAttribute() >> PutFile('/tmp/output')
-
-    with DockerTestCluster(SingleFileOutputValidator('test')) as cluster:
-        cluster.put_test_data('test')
-        cluster.deploy_flow(flow)
-
-        assert cluster.check_output()
-
-
-def test_file_exists_failure():
-    """
-    Verify that putting to a file that already exists fails.
-    """
-
-    flow = (GetFile('/tmp/input')
-
-            # First put should succeed
-            >> PutFile('/tmp')
-
-            # Second put should fail (file exists)
-            >> PutFile('/tmp')
-            >> (('success', LogAttribute()),
-                ('failure', LogAttribute() >> PutFile('/tmp/output'))))
-
-    with DockerTestCluster(SingleFileOutputValidator('test')) as cluster:
-        cluster.put_test_data('test')
-        cluster.deploy_flow(flow)
-
-        assert cluster.check_output()
diff --git a/docker/test/integration/test_filter_zero_file.py b/docker/test/integration/test_filter_zero_file.py
deleted file mode 100644
index 918582b..0000000
--- a/docker/test/integration/test_filter_zero_file.py
+++ /dev/null
@@ -1,36 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the \"License\"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an \"AS IS\" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from minifi import *
-
-def test_filter_zero_file():
-    """
-    Verify sending data from a MiNiFi - C++ to NiFi using S2S protocol.
-    """
-
-    port = InputPort('from-minifi', RemoteProcessGroup('http://nifi:8080/nifi'))
-
-    recv_flow = (port
-                 >> LogAttribute()
-                 >> PutFile('/tmp/output'))
-
-    send_flow = (GenerateFlowFile('0B')
-                 >> LogAttribute()
-                 >> ~port)
-
-    with DockerTestCluster(NoFileOutPutValidator()) as cluster:
-        cluster.deploy_flow(recv_flow, name='nifi', engine='nifi')
-        cluster.deploy_flow(send_flow)
-        assert cluster.check_output(120)
diff --git a/docker/test/integration/test_hash_content.py b/docker/test/integration/test_hash_content.py
deleted file mode 100644
index e60d3a3..0000000
--- a/docker/test/integration/test_hash_content.py
+++ /dev/null
@@ -1,32 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the \"License\"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an \"AS IS\" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from minifi import *
-
-def test_hash_invoke():
-    """
-    Verify sending using InvokeHTTP to a receiver using ListenHTTP.
-    """
-    invoke_flow = (GetFile('/tmp/input') >> Processor(name='HashContent',clazz='HashContent',properties={'Hash Attribute': 'hash'},auto_terminate=['failure']) 
-                   >> InvokeHTTP('http://minifi-listen:8080/contentListener', method='POST'))
-
-    listen_flow = ListenHTTP(8080)  >> LogAttribute() >>  PutFile('/tmp/output')
-
-    with DockerTestCluster(SingleFileOutputValidator('test')) as cluster:
-        cluster.put_test_data('test')
-        cluster.deploy_flow(listen_flow, name='minifi-listen')
-        cluster.deploy_flow(invoke_flow, name='minifi-invoke')
-
-        assert cluster.check_output()
diff --git a/docker/test/integration/test_http.py b/docker/test/integration/test_http.py
deleted file mode 100644
index f9431d4..0000000
--- a/docker/test/integration/test_http.py
+++ /dev/null
@@ -1,57 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the \"License\"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an \"AS IS\" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from minifi import *
-
-def test_invoke_listen():
-    """
-    Verify sending using InvokeHTTP to a receiver using ListenHTTP.
-    """
-    invoke_flow = (GetFile('/tmp/input')
-                   >> LogAttribute()
-                   >> InvokeHTTP('http://minifi-listen:8080/contentListener', method='POST'))
-
-    listen_flow = ListenHTTP(8080) >> LogAttribute() >> PutFile('/tmp/output')
-
-    with DockerTestCluster(SingleFileOutputValidator('test')) as cluster:
-        cluster.put_test_data('test')
-        cluster.deploy_flow(listen_flow, name='minifi-listen')
-        cluster.deploy_flow(invoke_flow, name='minifi-invoke')
-
-        assert cluster.check_output()
-
-def test_invoke_listen_with_proxy():
-    """
-    Verify sending through a proxy using InvokeHTTP to a receiver using ListenHTTP.
-    """
-    invoke_flow = (GetFile('/tmp/input')
-                   >> LogAttribute()
-                   >> InvokeHTTP('http://minifi-listen:8080/contentListener',
-                                 method='POST',
-                                 proxy_host='http-proxy',
-                                 proxy_port='3128',
-                                 proxy_username='admin',
-                                 proxy_password='test101'))
-
-    listen_flow = ListenHTTP(8080) >> LogAttribute() >> PutFile('/tmp/output')
-
-    with DockerTestCluster(SingleFileOutputValidator('test')) as cluster:
-        cluster.put_test_data('test')
-        cluster.deploy_flow(None, engine='http-proxy')
-        cluster.deploy_flow(listen_flow, name='minifi-listen')
-        cluster.deploy_flow(invoke_flow, name='minifi-invoke')
-
-        assert cluster.check_output()
-        assert cluster.check_http_proxy_access("http://minifi-listen:8080/contentListener")
diff --git a/docker/test/integration/test_rdkafka.py b/docker/test/integration/test_rdkafka.py
deleted file mode 100644
index bea36db..0000000
--- a/docker/test/integration/test_rdkafka.py
+++ /dev/null
@@ -1,98 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the \"License\"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an \"AS IS\" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from minifi import *
-
-def test_publish_kafka():
-    """
-    Verify delivery of message to kafka broker
-    """
-    producer_flow = GetFile('/tmp/input') >> PublishKafka() \
-                        >> (('failure', LogAttribute()),
-                            ('success', PutFile('/tmp/output/success')))
-
-    with DockerTestCluster(SingleFileOutputValidator('test', subdir='success')) as cluster:
-        cluster.put_test_data('test')
-        cluster.deploy_flow(None, engine='kafka-broker')
-        cluster.deploy_flow(producer_flow, name='minifi-producer', engine='minifi-cpp')
-
-        assert cluster.check_output(30)
-
-def test_no_broker():
-    """
-    Verify failure case when broker is down
-    """
-    producer_flow = (GetFile('/tmp/input') >> PublishKafka()
-                        >> (('failure', PutFile('/tmp/output')),
-                            ('success', LogAttribute())))
-
-    with DockerTestCluster(SingleFileOutputValidator('no broker')) as cluster:
-        cluster.put_test_data('no broker')
-        cluster.deploy_flow(producer_flow, name='minifi-producer', engine='minifi-cpp')
-
-        assert cluster.check_output(60)
-
-def test_broker_on_off():
-    """
-    Verify delivery of message when broker is unstable
-    """
-    producer_flow = (GetFile('/tmp/input') >> PublishKafka()
-                     >> (('success', PutFile('/tmp/output/success')),
-                         ('failure', PutFile('/tmp/output/failure'))))
-
-    with DockerTestCluster(SingleFileOutputValidator('test')) as cluster:
-        cluster.put_test_data('test')
-        cluster.deploy_flow(None, engine='kafka-broker')
-        cluster.deploy_flow(producer_flow, name='minifi-producer', engine='minifi-cpp')
-        start_count = 1
-        stop_count = 0
-
-        def start_kafka():
-            nonlocal start_count
-            assert cluster.start_flow('kafka-broker')
-            assert cluster.start_flow('kafka-consumer')
-            start_count += 1
-            assert cluster.wait_for_container_logs('zookeeper', 'Established session', 30, start_count)
-        def stop_kafka():
-            nonlocal stop_count
-            assert cluster.stop_flow('kafka-consumer')
-            assert cluster.stop_flow('kafka-broker')
-            stop_count += 1
-            assert cluster.wait_for_container_logs('zookeeper', 'Processed session termination for sessionid', 30, stop_count)
-
-        assert cluster.check_output(30, subdir='success')
-        stop_kafka()
-        assert cluster.check_output(60, subdir='failure')
-        start_kafka()
-        cluster.rm_out_child('success')
-        assert cluster.check_output(60, subdir='success')
-        stop_kafka()
-        cluster.rm_out_child('failure')
-        assert cluster.check_output(60, subdir='failure')
-
-def test_ssl():
-    """
-    Verify a secure (SSL) connection to the Kafka broker
-    """
-    producer_flow = GetFile('/tmp/input') >> PublishKafkaSSL() \
-                    >> (('failure', LogAttribute()),
-                        ('success', PutFile('/tmp/output/ssl')))
-
-    with DockerTestCluster(SingleFileOutputValidator('test', subdir='ssl')) as cluster:
-        cluster.put_test_data('test')
-        cluster.deploy_flow(None, engine='kafka-broker')
-        cluster.deploy_flow(producer_flow, name='minifi-producer', engine='minifi-cpp')
-
-        assert cluster.check_output(30)
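The failure/success relationship routing used in the Kafka tests above (for example, routing 'failure' to the output directory when no broker is running) carries over conceptually to the behave scenarios. A hedged sketch of a step that rebuilds the "no broker" flow, assuming the >> flow DSL and processor names from the removed tests are still importable under the same names:

from behave import given

# Hypothetical step; GetFile, PublishKafka, PutFile, LogAttribute and the >> DSL are
# the names used in the removed pytest tests and are assumed to still exist.
@given('a GetFile/PublishKafka flow that routes failed flow files to the output directory')
def step_impl(context):
    from minifi import GetFile, PublishKafka, PutFile, LogAttribute  # assumed module layout
    context.flow = (GetFile('/tmp/input') >> PublishKafka()
                    >> (('failure', PutFile('/tmp/output')),
                        ('success', LogAttribute())))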
diff --git a/docker/test/integration/test_s2s.py b/docker/test/integration/test_s2s.py
deleted file mode 100644
index ff1a312..0000000
--- a/docker/test/integration/test_s2s.py
+++ /dev/null
@@ -1,38 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the \"License\"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from minifi import *
-
-def test_minifi_to_nifi():
-    """
-    Verify sending data from a MiNiFi - C++ instance to NiFi using the S2S protocol.
-    """
-
-    port = InputPort('from-minifi', RemoteProcessGroup('http://nifi:8080/nifi'))
-
-    recv_flow = (port
-                 >> LogAttribute()
-                 >> PutFile('/tmp/output'))
-
-    send_flow = (GetFile('/tmp/input')
-                 >> LogAttribute()
-                 >> port)
-
-    with DockerTestCluster(SingleFileOutputValidator('test')) as cluster:
-        cluster.put_test_data('test')
-        cluster.deploy_flow(recv_flow, name='nifi', engine='nifi')
-        cluster.deploy_flow(send_flow)
-
-        assert cluster.check_output(120)
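The site-to-site test above hinges on both flows sharing a single InputPort bound to the NiFi RemoteProcessGroup. A minimal sketch of the same wiring expressed as a behave step, assuming the removed DSL names are still available (the step text and module layout are assumptions):

from behave import given

# Hypothetical step; InputPort, RemoteProcessGroup, GetFile and LogAttribute come from
# the removed pytest DSL and are assumed to keep their names in the new driver.
@given('a MiNiFi flow sending files to NiFi over site-to-site')
def step_impl(context):
    from minifi import InputPort, RemoteProcessGroup, GetFile, LogAttribute  # assumed layout
    port = InputPort('from-minifi', RemoteProcessGroup('http://nifi:8080/nifi'))
    context.send_flow = GetFile('/tmp/input') >> LogAttribute() >> port
    context.port = port  # the receiving NiFi flow reuses this same port object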
diff --git a/docker/test/integration/test_s3.py b/docker/test/integration/test_s3.py
deleted file mode 100644
index a4bcebe..0000000
--- a/docker/test/integration/test_s3.py
+++ /dev/null
@@ -1,144 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the \"License\"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from minifi import *
-
-def test_put_s3_object():
-    """
-    Verify delivery of S3 object to AWS server
-    """
-    flow = (GetFile('/tmp/input') >> PutS3Object() \
-                 >> LogAttribute() \
-                 >> PutFile('/tmp/output/success'))
-
-    with DockerTestCluster(SingleFileOutputValidator('test', subdir='success')) as cluster:
-        cluster.put_test_data('LH_O#L|FD<FASD{FO#@$#$%^ "#"$L%:"@#$L":test_data#$#%#$%?{"F{')
-        cluster.deploy_flow(None, engine='s3-server')
-        cluster.deploy_flow(flow, engine='minifi-cpp', name='minifi-cpp')
-
-        assert cluster.check_output(60)
-
-        assert cluster.check_s3_server_object_data()
-        assert cluster.check_s3_server_object_metadata()
-
-def test_put_s3_object_proxy():
-    """
-    Verify delivery of S3 object to AWS server through proxy server
-    """
-    flow = (GetFile('/tmp/input') \
-            >> PutS3Object(proxy_host='http-proxy',
-                           proxy_port='3128',
-                           proxy_username='admin',
-                           proxy_password='test101') \
-            >> LogAttribute() \
-            >> PutFile('/tmp/output/success'))
-
-    with DockerTestCluster(SingleFileOutputValidator('test', subdir='success')) as cluster:
-        cluster.put_test_data('LH_O#L|FD<FASD{FO#@$#$%^ "#"$L%:"@#$L":test_data#$#%#$%?{"F{')
-        cluster.deploy_flow(None, engine='http-proxy')
-        cluster.deploy_flow(None, engine='s3-server')
-        cluster.deploy_flow(flow, engine='minifi-cpp', name='minifi-cpp')
-
-        assert cluster.check_output(60)
-
-        assert cluster.check_http_proxy_access("http://s3-server:9090/test_bucket/test_object_key")
-        assert cluster.check_s3_server_object_data()
-        assert cluster.check_s3_server_object_metadata()
-
-def test_delete_s3_object():
-    """
-    Verify deletion of S3 object
-    """
-    flow = (GetFile('/tmp/input') >> PutS3Object() \
-            >> LogAttribute() \
-            >> DeleteS3Object() \
-            >> PutFile('/tmp/output/success'))
-
-    with DockerTestCluster(SingleFileOutputValidator('test', subdir='success')) as cluster:
-        cluster.put_test_data('test_data')
-        cluster.deploy_flow(None, engine='s3-server')
-        cluster.deploy_flow(flow, engine='minifi-cpp', name='minifi-cpp')
-        assert cluster.check_output(60)
-        assert cluster.is_s3_bucket_empty()
-
-def test_delete_s3_non_existing_object():
-    """
-    Verify that deletion of a non-existing S3 object succeeds
-    """
-    flow = (GetFile('/tmp/input')
-            >> DeleteS3Object() \
-            >> PutFile('/tmp/output/success'))
-
-    with DockerTestCluster(SingleFileOutputValidator('test', subdir='success')) as cluster:
-        cluster.put_test_data('test_data')
-        cluster.deploy_flow(None, engine='s3-server')
-        cluster.deploy_flow(flow, engine='minifi-cpp', name='minifi-cpp')
-        assert cluster.check_output(60)
-
-def test_delete_s3_object_proxy():
-    """
-    Verify deletion of S3 object through proxy server
-    """
-    flow = (GetFile('/tmp/input') >> PutS3Object() \
-            >> LogAttribute() \
-            >> DeleteS3Object(proxy_host='http-proxy',
-                              proxy_port='3128',
-                              proxy_username='admin',
-                              proxy_password='test101') \
-            >> PutFile('/tmp/output/success'))
-
-    with DockerTestCluster(SingleFileOutputValidator('test', subdir='success')) as cluster:
-        cluster.put_test_data('test_data')
-        cluster.deploy_flow(None, engine='s3-server')
-        cluster.deploy_flow(None, engine='http-proxy')
-        cluster.deploy_flow(flow, engine='minifi-cpp', name='minifi-cpp')
-        assert cluster.check_output(60)
-        assert cluster.is_s3_bucket_empty()
-        assert cluster.check_http_proxy_access("http://s3-server:9090/test_bucket/test_object_key")
-
-def test_fetch_s3_object():
-    """
-    Verify fetch of S3 object
-    """
-    put_flow = (GetFile('/tmp/input') >> PutS3Object())
-    fetch_flow = (GenerateFlowFile("1 kB", schedule={'scheduling period': '5 sec'}) >> FetchS3Object() >> LogAttribute() >> PutFile('/tmp/output/success'))
-
-    with DockerTestCluster(SingleFileOutputValidator('test', subdir='success')) as cluster:
-        cluster.put_test_data('test_data')
-        cluster.deploy_flow(None, engine='s3-server')
-        cluster.deploy_flow(put_flow, engine='minifi-cpp', name='minifi-cpp-put')
-        cluster.deploy_flow(fetch_flow, engine='minifi-cpp', name='minifi-cpp-fetch')
-        assert cluster.check_output(60)
-
-def test_fetch_s3_object_proxy():
-    """
-    Verify fetch of S3 object through proxy server
-    """
-    put_flow = (GetFile('/tmp/input') >> PutS3Object() >> LogAttribute())
-    fetch_flow = (GenerateFlowFile("1 kB", schedule={'scheduling period': '5 sec'}) \
-                  >> FetchS3Object(proxy_host='http-proxy',
-                                   proxy_port='3128',
-                                   proxy_username='admin',
-                                   proxy_password='test101') \
-                  >> LogAttribute() >> PutFile('/tmp/output/success'))
-
-    with DockerTestCluster(SingleFileOutputValidator('test', subdir='success')) as cluster:
-        cluster.put_test_data('test_data')
-        cluster.deploy_flow(None, engine='s3-server')
-        cluster.deploy_flow(None, engine='http-proxy')
-        cluster.deploy_flow(put_flow, engine='minifi-cpp', name='minifi-cpp-put')
-        cluster.deploy_flow(fetch_flow, engine='minifi-cpp', name='minifi-cpp-fetch')
-        assert cluster.check_output(60)
-        assert cluster.check_http_proxy_access("http://s3-server:9090/test_bucket/test_object_key")
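Each proxied S3 test above ends with the same combination of assertions: the output file is written, the proxy saw the request, and the S3 server holds (or no longer holds) the object. A hedged sketch of a combined behave assertion step, with context.cluster and the step text as assumptions:

from behave import then

# Hypothetical assertion step; the cluster methods mirror the removed pytest helpers above.
@then('the object was uploaded through the proxy and is present on the S3 server')
def step_impl(context):
    assert context.cluster.check_output(60)
    assert context.cluster.check_http_proxy_access('http://s3-server:9090/test_bucket/test_object_key')
    assert context.cluster.check_s3_server_object_data()
    assert context.cluster.check_s3_server_object_metadata()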
diff --git a/docker/test/integration/test_zero_file.py b/docker/test/integration/test_zero_file.py
deleted file mode 100644
index 1a0cf05..0000000
--- a/docker/test/integration/test_zero_file.py
+++ /dev/null
@@ -1,36 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the \"License\"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from minifi import *
-
-def test_zero_file():
-    """
-    Verify sending a zero-length flow file from a MiNiFi - C++ instance to NiFi using the S2S protocol.
-    """
-
-    port = InputPort('from-minifi', RemoteProcessGroup('http://nifi:8080/nifi'))
-
-    recv_flow = (port
-                 >> LogAttribute(schedule={'scheduling strategy': 'TIMER_DRIVEN'})
-                 >> PutFile('/tmp/output'))
-
-    send_flow = (GenerateFlowFile('0B')
-                 >> LogAttribute()
-                 >> port)
-
-    with DockerTestCluster(EmptyFilesOutPutValidator()) as cluster:
-        cluster.deploy_flow(recv_flow, name='nifi', engine='nifi')
-        cluster.deploy_flow(send_flow)
-        assert cluster.check_output(120)
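The zero-length case above differs from the plain site-to-site test only in the generator (GenerateFlowFile('0B')) and the validator (EmptyFilesOutPutValidator). A hedged behave sketch of that setup, reusing the names from the removed test (the step text and module layout are assumptions):

from behave import given

# Hypothetical step; GenerateFlowFile, LogAttribute and EmptyFilesOutPutValidator are
# the names used in the removed test and are assumed to still be importable.
@given('a MiNiFi flow producing zero-byte flow files for the NiFi input port')
def step_impl(context):
    from minifi import GenerateFlowFile, LogAttribute, EmptyFilesOutPutValidator  # assumed layout
    context.validator = EmptyFilesOutPutValidator()
    context.send_flow = GenerateFlowFile('0B') >> LogAttribute()  # the s2s port is attached by a later step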
diff --git a/docker/test/test_https.py b/docker/test/test_https.py
deleted file mode 100644
index 2ea1bed..0000000
--- a/docker/test/test_https.py
+++ /dev/null
@@ -1,100 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the \"License\"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import time
-
-from M2Crypto import X509, EVP, RSA, ASN1
-
-from minifi import *
-
-def callback():
-    pass
-
-
-def test_invoke_listen_https_one_way():
-    """
-    Verify sending using InvokeHTTP to a receiver using ListenHTTP (with TLS).
-    """
-
-    cert, key = gen_cert()
-
-    # TODO define SSLContextService class & generate config yml for services
-    crt_file = '/tmp/resources/test-crt.pem'
-
-    invoke_flow = (GetFile('/tmp/input')
-                   >> InvokeHTTP('https://minifi-listen:4430/contentListener',
-                                 method='POST',
-                                 ssl_context_service=SSLContextService(cert=crt_file, ca_cert=crt_file)))
-
-    listen_flow = (ListenHTTP(4430, cert=crt_file)
-                   >> LogAttribute()
-                   >> PutFile('/tmp/output'))
-
-    with DockerTestCluster(SingleFileOutputValidator('test')) as cluster:
-        cluster.put_test_resource('test-crt.pem', cert.as_pem() + key.as_pem(None, callback))
-        cluster.put_test_data('test')
-        cluster.deploy_flow(listen_flow, name='minifi-listen')
-        cluster.deploy_flow(invoke_flow, name='minifi-invoke')
-
-        assert cluster.check_output()
-
-
-def gen_cert():
-    """
-    Generate a self-signed TLS certificate for testing
-    """
-
-    req, key = gen_req()
-    pub_key = req.get_pubkey()
-    subject = req.get_subject()
-    cert = X509.X509()
-    # noinspection PyTypeChecker
-    cert.set_serial_number(1)
-    cert.set_version(2)
-    cert.set_subject(subject)
-    t = int(time.time())
-    now = ASN1.ASN1_UTCTIME()
-    now.set_time(t)
-    now_plus_year = ASN1.ASN1_UTCTIME()
-    now_plus_year.set_time(t + 60 * 60 * 24 * 365)
-    cert.set_not_before(now)
-    cert.set_not_after(now_plus_year)
-    issuer = X509.X509_Name()
-    issuer.C = 'US'
-    issuer.CN = 'minifi-listen'
-    cert.set_issuer(issuer)
-    cert.set_pubkey(pub_key)
-    cert.sign(key, 'sha256')
-
-    return cert, key
-
-
-def gen_req():
-    """
-    Generate TLS certificate request for testing
-    """
-
-    logging.info('Generating test certificate request')
-    key = EVP.PKey()
-    req = X509.Request()
-    rsa = RSA.gen_key(1024, 65537, callback)
-    key.assign_rsa(rsa)
-    req.set_pubkey(key)
-    name = req.get_subject()
-    name.C = 'US'
-    name.CN = 'minifi-listen'
-    req.sign(key, 'sha256')
-
-    return req, key
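For reference, the two helpers above are used together by concatenating the self-signed certificate and the unencrypted private key into a single PEM bundle that is pushed to the cluster as the test resource. A minimal usage sketch under that assumption (the local output path is hypothetical; gen_cert and callback are the definitions above):

# Usage sketch of the helpers above; writes the PEM bundle to a hypothetical local path
# instead of uploading it as a cluster test resource.
cert, key = gen_cert()
pem_bundle = cert.as_pem() + key.as_pem(None, callback)  # cipher=None keeps the key unencrypted
with open('/tmp/resources/test-crt.pem', 'wb') as pem_file:
    pem_file.write(pem_bundle)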