
Apache MiNiFi Docker System Integration Tests

Apache MiNiFi includes a suite of docker-based system integration tests. These tests are designed to test the integration between distinct MiNiFi instances as well as other systems which are available in docker, such as Apache NiFi.

Test Execution Lifecycle

Each test involves the following stages as part of its execution lifecycle:

Definition of flows/Flow DSL

Flows are defined using a python-native domain specific language (DSL). The DSL supports the standard primitives which make up a NiFi/MiNiFi flow, such as processors, connections, and controller services. Several processors defined in the DSL have optional, named parameters enabling concise flow expression.

By default, all relationships are set to auto-terminate. If a relationship is used, it is automatically taken out of the auto_terminate list.

Example Trivial Flow:

flow = GetFile('/tmp/input') >> LogAttribute() >> PutFile('/tmp/output')
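The chaining syntax can be pictured with a minimal, self-contained sketch of how `>>` might link processors via operator overloading. This is illustrative only, not the actual DSL implementation:

```python
# Illustrative sketch, not the real DSL: shows how '>>' chaining can build
# a linked sequence of processors using Python's __rshift__ operator.
class Proc:
    def __init__(self, name):
        self.name = name
        self.next = []  # downstream processors

    def __rshift__(self, other):
        # Connect this processor to the next one and return the tail,
        # so chains like a >> b >> c read left to right.
        self.next.append(other)
        return other

head = Proc('GetFile')
tail = head >> Proc('LogAttribute') >> Proc('PutFile')
# head is now connected to LogAttribute, which is connected to PutFile
```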

Supported Processors

The following processors/parameters are supported:

GetFile

  • input_dir

PutFile

  • output_dir

LogAttribute

ListenHTTP

  • port
  • cert=None

InvokeHTTP

  • url
  • method='GET'
  • ssl_context_service=None
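Using the parameters above, a pair of HTTP flows might be sketched as follows. The port, hostname, and URL path here are illustrative assumptions, not values mandated by the framework:

```python
# Receiving flow: listen for HTTP posts and write received content to disk
listen_flow = ListenHTTP(8080) >> PutFile('/tmp/output')

# Sending flow: POST each input file to the listening instance
# (the hostname 'minifi-listen' and the URL path are hypothetical)
send_flow = (GetFile('/tmp/input')
             >> InvokeHTTP('http://minifi-listen:8080/contentListener',
                           method='POST'))
```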

Remote Process Groups

Remote process groups and input ports are supported.

Example InputPort/RemoteProcessGroup:

port = InputPort('from-minifi', RemoteProcessGroup('http://nifi:8080/nifi'))

InputPorts may be used as inputs or outputs in the flow DSL:

recv_flow = (port
             >> LogAttribute()
             >> PutFile('/tmp/output'))

send_flow = (GetFile('/tmp/input')
             >> LogAttribute()
             >> port)

These example flows could be deployed as separate NiFi/MiNiFi instances where the send_flow would send data to the recv_flow using the site-to-site protocol.

Definition of an output validator

The output validator is responsible for checking the state of a cluster for valid output conditions. Currently, the only supported output validator is the SingleFileOutputValidator, which checks that a single file is written to /tmp/output by a flow and that its contents match a given string.

Example SingleFileOutputValidator:

SingleFileOutputValidator('example output')

This example SingleFileOutputValidator would validate that a single file is written with the contents 'example output'.

Creation of a DockerTestCluster

DockerTestCluster instances are used to deploy one or more flows to a simulated or actual multi-host docker cluster. This enables testing of interactions between multiple system components, such as MiNiFi flows. Before the test cluster is destroyed, an assertion may be performed on the result of the cluster's check_output() method, which invokes the validator supplied at construction against the output state of the system.

Creation of a DockerTestCluster is simple:

Example DockerTestCluster Instantiation:

with DockerTestCluster(SingleFileOutputValidator('test')) as cluster:
  ...
  # Perform test operations
  ...
  assert cluster.check_output()

Note that a docker cluster must be created inside a with block to ensure that all resources are created and destroyed cleanly.

Insertion of test input data

Although arbitrary NiFi flows can ingest data from a multitude of sources, a MiNiFi system integration test is expected to receive input via deterministic, controlled channels. The primary supported method of providing input to a MiNiFi system integration test is to insert data into the filesystem at /tmp/input.

To write a string to the contents of a file in /tmp/input, use the put_test_data() method.

Example put_test_data() Usage:

cluster.put_test_data('test')

This writes a file with a random name to /tmp/input, with the contents 'test'.

To provide a resource to a container, such as a TLS certificate, use the put_test_resource() method to write a resource file to /tmp/resources.

Example put_test_resource() Usage:

cluster.put_test_resource('test-resource', 'resource contents')

This writes a file to /tmp/resources/test-resource with the contents 'resource contents'.

Deployment of one or more flows

Deployment of flows to a test cluster is performed using the deploy_flow() method of a cluster. Each flow is deployed as a separate docker service having its own DNS name. If a name is not provided upon deployment, a random name will be used.

Example deploy_flow() Usage:

cluster.deploy_flow(flow, name='test-flow')

The deploy_flow function defaults to the MiNiFi C++ engine, but other engines, such as NiFi, may be used:

cluster.deploy_flow(flow, engine='nifi')

Execution of one or more flows

Flows are executed immediately upon deployment and according to schedule properties defined in the flow.yml. As such, to minimize test latency it is important to ensure that test inputs are added to the test cluster before flows are deployed. Filesystem events are monitored using event APIs, ensuring that flows are executed immediately upon input availability and output is validated immediately after it is written to disk.

Output validation

As soon as data is written to /tmp/output, the OutputValidator (defined according to the documentation above) is executed on the output. The check_output() cluster method waits for up to 5 seconds for valid output.
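Putting the stages above together, a complete test body might look like the following sketch, which uses only the cluster methods documented in this README:

```python
flow = GetFile('/tmp/input') >> LogAttribute() >> PutFile('/tmp/output')

with DockerTestCluster(SingleFileOutputValidator('test')) as cluster:
    # Insert input before deployment to minimize test latency
    cluster.put_test_data('test')
    cluster.deploy_flow(flow, name='test-flow')
    # check_output() waits for up to 5 seconds for valid output
    assert cluster.check_output()
```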

Cluster teardown/cleanup

The deployment of a test cluster involves creating one or more docker containers and networks, as well as temporary files/directories on the host system. All resources are cleaned up automatically as long as clusters are created within a with block.


# Using the with block ensures that all cluster resources are cleaned up
# after the test cluster is no longer needed.
with DockerTestCluster(SingleFileOutputValidator('test')) as cluster:
  ...
  # Perform test operations
  ...
  assert cluster.check_output()