# blob: 87285e409e1d0b9df6848d18c8d8c2b700531cbc [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from filesystem_validation.FileSystemObserver import FileSystemObserver
from minifi.core.RemoteProcessGroup import RemoteProcessGroup
from ssl_utils.SSL_cert_utils import make_server_cert
from minifi.core.Funnel import Funnel
from minifi.controllers.SSLContextService import SSLContextService
from minifi.controllers.GCPCredentialsControllerService import GCPCredentialsControllerService
from minifi.controllers.ElasticsearchCredentialsService import ElasticsearchCredentialsService
from minifi.controllers.ODBCService import ODBCService
from minifi.controllers.KubernetesControllerService import KubernetesControllerService
from minifi.controllers.JsonRecordSetWriter import JsonRecordSetWriter
from minifi.controllers.JsonTreeReader import JsonTreeReader
from minifi.controllers.XMLReader import XMLReader
from minifi.controllers.XMLRecordSetWriter import XMLRecordSetWriter
from minifi.controllers.CouchbaseClusterService import CouchbaseClusterService
from minifi.controllers.XMLReader import XMLReader
from behave import given, then, when
from behave.model_describe import ModelDescriptor
from pydoc import locate
import logging
import time
import uuid
import humanfriendly
import OpenSSL.crypto
import os
# Background
@given("the content of \"{directory}\" is monitored")
def step_impl(context, directory):
    """Register a file system observer on the local path bound to the given docker directory."""
    local_path = context.directory_bindings.docker_path_to_local_path(directory)
    observer = FileSystemObserver(local_path)
    context.test.add_file_system_observer(observer)
@given("there is a \"{subdir}\" subdirectory in the monitored directory")
def step_impl(context, subdir):
    """Create ``subdir`` under the file system observer's output directory."""
    monitored_root = context.test.file_system_observer.get_output_dir()
    subdir_path = monitored_root + "/" + subdir
    os.mkdir(subdir_path)
    # Grant read/write/execute to everyone (mode 0o777).
    os.chmod(subdir_path, 0o777)
def __create_processor(context, processor_type, processor_name, property_name, property_value, container_name, engine='minifi-cpp'):
    """Instantiate a processor by type name and register it with the test.

    The processor class is resolved dynamically as
    ``minifi.processors.<processor_type>.<processor_type>``.

    Args:
        context: behave context; ``context.test`` is used to acquire the
            container and to add the new node.
        processor_type: module and class name of the processor under
            ``minifi.processors``.
        processor_name: name assigned to the new processor.
        property_name: property to set, or ``None`` to set no property.
        property_value: value for ``property_name``; unused when
            ``property_name`` is ``None``.
        container_name: name of the container the processor is declared for.
        engine: engine passed on when acquiring the container.

    Raises:
        TypeError: if no class can be located for ``processor_type``. The
            exception type matches the previous failure mode (calling
            ``None``), but the message now names the offending type.
    """
    container = context.test.acquire_container(context=context, name=container_name, engine=engine)

    class_path = "minifi.processors." + processor_type + "." + processor_type
    processor_class = locate(class_path)
    if processor_class is None:
        # locate() returns None for an unknown path; fail with a message that
        # points at the misspelled or missing processor type instead of an
        # opaque "'NoneType' object is not callable".
        raise TypeError(f"Unknown processor type '{processor_type}': could not locate {class_path}")

    processor = processor_class(context=context)
    processor.set_name(processor_name)
    if property_name is not None:
        processor.set_property(property_name, property_value)
    context.test.add_node(processor)

    # Assume that the first node declared is primary unless specified otherwise
    if not container.get_start_nodes():
        container.add_start_node(processor)
# MiNiFi cluster setups
@given("a {processor_type} processor with the name \"{processor_name}\" and the \"{property_name}\" property set to \"{property_value}\" in a \"{minifi_container_name}\" flow")
@given("a {processor_type} processor with the name \"{processor_name}\" and the \"{property_name}\" property set to \"{property_value}\" in the \"{minifi_container_name}\" flow")
def step_impl(context, processor_type, processor_name, property_name, property_value, minifi_container_name):
    """Create a named processor with one property set in the given flow."""
    __create_processor(
        context,
        processor_type=processor_type,
        processor_name=processor_name,
        property_name=property_name,
        property_value=property_value,
        container_name=minifi_container_name,
    )
@given(
    "a {processor_type} processor with the name \"{processor_name}\" and the \"{property_name}\" property set to \"{property_value}\" in the \"{minifi_container_name}\" flow using the \"{engine_name}\" engine")
def step_impl(context, processor_type, processor_name, property_name, property_value, minifi_container_name, engine_name):
    """Create a named processor with one property set in the given flow, using an explicit engine."""
    __create_processor(
        context,
        processor_type=processor_type,
        processor_name=processor_name,
        property_name=property_name,
        property_value=property_value,
        container_name=minifi_container_name,
        engine=engine_name,
    )
@given("a {processor_type} processor with the \"{property_name}\" property set to \"{property_value}\" in a \"{minifi_container_name}\" flow")
@given("a {processor_type} processor with the \"{property_name}\" property set to \"{property_value}\" in the \"{minifi_container_name}\" flow")
def step_impl(context, processor_type, property_name, property_value, minifi_container_name):
    """Create a processor named after its type, with one property set, in the given flow."""
    __create_processor(
        context,
        processor_type=processor_type,
        processor_name=processor_type,
        property_name=property_name,
        property_value=property_value,
        container_name=minifi_container_name,
    )
@given("a {processor_type} processor with the \"{property_name}\" property set to \"{property_value}\" in the \"{minifi_container_name}\" flow using the \"{engine_name}\" engine")
def step_impl(context, processor_type, property_name, property_value, minifi_container_name, engine_name):
    """Create a processor named after its type, with one property set, using an explicit engine."""
    __create_processor(
        context,
        processor_type=processor_type,
        processor_name=processor_type,
        property_name=property_name,
        property_value=property_value,
        container_name=minifi_container_name,
        engine=engine_name,
    )
@given("a {processor_type} processor with the \"{property_name}\" property set to \"{property_value}\"")
def step_impl(context, processor_type, property_name, property_value):
    """Create a processor named after its type, with one property set, in the "minifi-cpp-flow" flow."""
    __create_processor(
        context,
        processor_type=processor_type,
        processor_name=processor_type,
        property_name=property_name,
        property_value=property_value,
        container_name="minifi-cpp-flow",
    )
@given("a {processor_type} processor with the name \"{processor_name}\" and the \"{property_name}\" property set to \"{property_value}\"")
def step_impl(context, processor_type, property_name, property_value, processor_name):
    """Create a named processor with one property set in the "minifi-cpp-flow" flow."""
    __create_processor(
        context,
        processor_type=processor_type,
        processor_name=processor_name,
        property_name=property_name,
        property_value=property_value,
        container_name="minifi-cpp-flow",
    )
@given("a {processor_type} processor with the name \"{processor_name}\" in the \"{minifi_container_name}\" flow")
def step_impl(context, processor_type, processor_name, minifi_container_name):
    """Create a named processor without any property in the given flow."""
    __create_processor(
        context,
        processor_type=processor_type,
        processor_name=processor_name,
        property_name=None,
        property_value=None,
        container_name=minifi_container_name,
    )
@given("a {processor_type} processor with the name \"{processor_name}\" in the \"{minifi_container_name}\" flow using the \"{engine_name}\" engine")
def step_impl(context, processor_type, processor_name, minifi_container_name, engine_name):
    """Create a named processor without any property in the given flow, using an explicit engine."""
    __create_processor(
        context,
        processor_type=processor_type,
        processor_name=processor_name,
        property_name=None,
        property_value=None,
        container_name=minifi_container_name,
        engine=engine_name,
    )
@given("a {processor_type} processor with the name \"{processor_name}\"")
def step_impl(context, processor_type, processor_name):
    """Create a named processor without any property in the "minifi-cpp-flow" flow."""
    __create_processor(
        context,
        processor_type=processor_type,
        processor_name=processor_name,
        property_name=None,
        property_value=None,
        container_name="minifi-cpp-flow",
    )
@given("a {processor_type} processor in the \"{minifi_container_name}\" flow")
@given("a {processor_type} processor in a \"{minifi_container_name}\" flow")
@given("a {processor_type} processor set up in a \"{minifi_container_name}\" flow")
def step_impl(context, processor_type, minifi_container_name):
    """Create a processor named after its type, without any property, in the given flow."""
    __create_processor(
        context,
        processor_type=processor_type,
        processor_name=processor_type,
        property_name=None,
        property_value=None,
        container_name=minifi_container_name,
    )
@given("a {processor_type} processor")
@given("a {processor_type} processor set up to communicate with an s3 server")
@given("a {processor_type} processor set up to communicate with the same s3 server")
@given("a {processor_type} processor set up to communicate with an Azure blob storage")
@given("a {processor_type} processor set up to communicate with a kafka broker instance")
@given("a {processor_type} processor set up to communicate with an MQTT broker instance")
@given("a {processor_type} processor set up to communicate with the Splunk HEC instance")
@given("a {processor_type} processor set up to communicate with the kinesis server")
def step_impl(context, processor_type):
    """Create a processor named after its type, without any property, in the "minifi-cpp-flow" flow.

    All step phrasings above map to the same action; the wording about the
    peer service does not change what is created here.
    """
    __create_processor(
        context,
        processor_type=processor_type,
        processor_name=processor_type,
        property_name=None,
        property_value=None,
        container_name="minifi-cpp-flow",
    )
@given("a {processor_type} processor in a Kubernetes cluster")
@given("a {processor_type} processor in the Kubernetes cluster")
def step_impl(context, processor_type):
    """Create a processor named after its type in the "kubernetes" container with the "kubernetes" engine."""
    __create_processor(
        context,
        processor_type=processor_type,
        processor_name=processor_type,
        property_name=None,
        property_value=None,
        container_name="kubernetes",
        engine="kubernetes",
    )
@given("a set of processors in the \"{minifi_container_name}\" flow")
def step_impl(context, minifi_container_name):
    """Create one processor per row of the step's table in the given flow.

    Each table row must provide the columns ``type``, ``name`` and ``uuid``.
    The class for a row is resolved as ``minifi.processors.<type>.<type>``.

    Raises:
        TypeError: if no class can be located for a row's ``type``. The
            exception type matches the previous failure mode (calling
            ``None``), but the message now names the offending type.
    """
    container = context.test.acquire_container(context=context, name=minifi_container_name)
    logging.info(context.table)
    for row in context.table:
        processor_type = row["type"]
        class_path = "minifi.processors." + processor_type + "." + processor_type
        processor_class = locate(class_path)
        if processor_class is None:
            # locate() returns None for an unknown path; report which table
            # entry is wrong instead of "'NoneType' object is not callable".
            raise TypeError(f"Unknown processor type '{processor_type}': could not locate {class_path}")

        processor = processor_class(context=context)
        processor.set_name(row["name"])
        processor.set_uuid(row["uuid"])
        context.test.add_node(processor)

        # Assume that the first node declared is primary unless specified otherwise
        if not container.get_start_nodes():
            container.add_start_node(processor)
@given("a set of processors")
def step_impl(context):
    """Re-run the flow-specific "set of processors" step for the "minifi-cpp-flow" flow.

    The current step's table is rendered back to text and appended to the
    delegated step so that it receives the same rows.
    """
    table_text = ModelDescriptor.describe_table(context.table, " ")
    step_template = """given a set of processors in the \"{minifi_container_name}\" flow
{table}
"""
    context.execute_steps(step_template.format(table=table_text, minifi_container_name="minifi-cpp-flow"))
@given("a RemoteProcessGroup node with name \"{rpg_name}\" is opened on \"{address}\" with transport protocol set to \"{transport_protocol}\"")
def step_impl(context, rpg_name, address, transport_protocol):
    """Build a RemoteProcessGroup from the step arguments and add it to the test."""
    context.test.add_remote_process_group(RemoteProcessGroup(address, rpg_name, transport_protocol))
@given("a RemoteProcessGroup node with name \"{rpg_name}\" is opened on \"{address}\"")
def step_impl(context, rpg_name, address):
    """Delegate to the transport-protocol variant of this step with the protocol set to "RAW"."""
    delegated_step = f"given a RemoteProcessGroup node with name \"{rpg_name}\" is opened on \"{address}\" with transport protocol set to \"RAW\""
    context.execute_steps(delegated_step)
@given("the \"{property_name}\" property of the {processor_name} processor is set to \"{property_value}\"")
def step_impl(context, property_name, processor_name, property_value):
    """Set a processor property, or unset it when the value is the literal "(not set)"."""
    node = context.test.get_node_by_name(processor_name)
    if property_value != "(not set)":
        node.set_property(property_name, property_value)
        return
    node.unset_property(property_name)
@given("the \"{property_name}\" property of the {controller_name} controller is set to \"{property_value}\"")
def step_impl(context, property_name, controller_name, property_value):
    """Set a property on a controller of the "minifi-cpp-flow" container."""
    flow = context.test.acquire_container(name="minifi-cpp-flow", context=context)
    controller = flow.get_controller(controller_name)
    controller.set_property(property_name, property_value)
@given("the \"{property_name}\" properties of the {processor_name_one} and {processor_name_two} processors are set to the same random guid")
def step_impl(context, property_name, processor_name_one, processor_name_two):
    """Set the same freshly generated UUID4 string as a property on both processors."""
    shared_guid = str(uuid.uuid4())
    for name in (processor_name_one, processor_name_two):
        context.test.get_node_by_name(name).set_property(property_name, shared_guid)
@given("the max concurrent tasks attribute of the {processor_name} processor is set to {max_concurrent_tasks:d}")
def step_impl(context, processor_name, max_concurrent_tasks):
    """Set the max concurrent tasks of the named processor to the integer parsed from the step."""
    context.test.get_node_by_name(processor_name).set_max_concurrent_tasks(max_concurrent_tasks)
@given("the \"{property_name}\" property of the {processor_name} processor is set to match the attribute \"{attribute_key}\" to \"{attribute_value}\"")
def step_impl(context, property_name, processor_name, attribute_key, attribute_value):
    """Set a property to an ``${key:equals('value')}`` expression built from the step arguments.

    When the attribute value is the literal "(not set)", the property is set
    to "true" instead and nothing is logged.
    """
    node = context.test.get_node_by_name(processor_name)
    if attribute_value != "(not set)":
        expression = f"${{{attribute_key}:equals('{attribute_value}')}}"
        logging.info("Filter: \"%s\"", expression)
        logging.info("Key: \"%s\", value: \"%s\"", attribute_key, attribute_value)
        node.set_property(property_name, expression)
    else:
        # Ignore filtering
        node.set_property(property_name, "true")
@given("the scheduling period of the {processor_name} processor is set to \"{scheduling_period}\"")
def step_impl(context, processor_name, scheduling_period):
    """Switch the named processor to "TIMER_DRIVEN" scheduling with the given period."""
    node = context.test.get_node_by_name(processor_name)
    node.set_scheduling_strategy("TIMER_DRIVEN")
    node.set_scheduling_period(scheduling_period)
@given("these processor properties are set")
@given("these processor properties are set to match the http proxy")
def step_impl(context):
    """Apply every table row (processor name, property name, property value) as a property."""
    for row in context.table:
        node = context.test.get_node_by_name(row["processor name"])
        node.set_property(row["property name"], row["property value"])
@given("an input port with name \"{port_name}\" is created on the RemoteProcessGroup named \"{rpg_name}\"")
def step_impl(context, port_name, rpg_name):
    """Generate an input port for the named RemoteProcessGroup and add it as a node."""
    rpg = context.test.get_remote_process_group_by_name(rpg_name)
    port = context.test.generate_input_port_for_remote_process_group(rpg, port_name)
    context.test.add_node(port)
@given("an input port using compression with name \"{port_name}\" is created on the RemoteProcessGroup named \"{rpg_name}\"")
def step_impl(context, port_name, rpg_name):
    """Generate an input port for the named RemoteProcessGroup, passing ``True`` as the extra flag, and add it."""
    rpg = context.test.get_remote_process_group_by_name(rpg_name)
    port = context.test.generate_input_port_for_remote_process_group(rpg, port_name, True)
    context.test.add_node(port)
@given("an output port with name \"{port_name}\" is created on the RemoteProcessGroup named \"{rpg_name}\"")
def step_impl(context, port_name, rpg_name):
    """Generate an output port for the named RemoteProcessGroup and add it as a node."""
    rpg = context.test.get_remote_process_group_by_name(rpg_name)
    output_port_node = context.test.generate_output_port_for_remote_process_group(rpg, port_name)
    context.test.add_node(output_port_node)
@given("an output port using compression with name \"{port_name}\" is created on the RemoteProcessGroup named \"{rpg_name}\"")
def step_impl(context, port_name, rpg_name):
    """Generate an output port for the named RemoteProcessGroup, passing ``True`` as the extra flag, and add it."""
    rpg = context.test.get_remote_process_group_by_name(rpg_name)
    output_port_node = context.test.generate_output_port_for_remote_process_group(rpg, port_name, True)
    context.test.add_node(output_port_node)
@given("the output port \"{port_name}\" is connected to the {destination_name} processor")
def step_impl(context, port_name, destination_name):
    """Connect the named output port to the destination node under the "undefined" relationship."""
    target = context.test.get_node_by_name(destination_name)
    port = context.test.get_node_by_name(port_name)
    connections = {"undefined": target}
    port.out_proc.connect(connections)
@given("the \"{relationship}\" relationship of the {source_name} is connected to the {destination_name}")
@given("the \"{relationship}\" relationship of the {source_name} processor is connected to the {destination_name}")
def step_impl(context, relationship, source_name, destination_name):
    """Connect the given relationship of the source node to the destination node."""
    origin = context.test.get_node_by_name(source_name)
    target = context.test.get_node_by_name(destination_name)
    connections = {relationship: target}
    origin.out_proc.connect(connections)
@given("the processors are connected up as described here")
def step_impl(context):
    """Run the single-connection step once per table row.

    Each row must provide the columns "relationship name", "source name" and
    "destination name".
    """
    step_template = "given the \"{}\" relationship of the {} processor is connected to the {}"
    for row in context.table:
        connection_step = step_template.format(row["relationship name"], row["source name"], row["destination name"])
        context.execute_steps(connection_step)
@given("the connection going to the RemoteProcessGroup has \"drop empty\" set")
def step_impl(context):
    """Set ``drop_empty_flowfiles`` on the node named "to_nifi"."""
    context.test.get_node_by_name("to_nifi").drop_empty_flowfiles = True
@given("a file with the content \"{content}\" is present in \"{path}\"")
@given("a file with the content '{content}' is present in '{path}'")
@then("a file with the content \"{content}\" is placed in \"{path}\"")
def step_impl(context, content, path):
    """Add test data with the given content at the given path."""
    harness = context.test
    harness.add_test_data(path, content)
@given("a file of size {size} is present in \"{path}\"")
def step_impl(context, size: str, path: str):
    """Add random test data at ``path``; ``size`` is parsed with ``humanfriendly.parse_size``."""
    size_in_bytes = humanfriendly.parse_size(size)
    context.test.add_random_test_data(path, size_in_bytes)
@given("{number_of_files:d} files with the content \"{content}\" are present in \"{path}\"")
def step_impl(context, number_of_files, content, path):
    """Add the same test data ``number_of_files`` times at the given path."""
    for _ in range(number_of_files):
        context.test.add_test_data(path, content)
@given("an empty file is present in \"{path}\"")
def step_impl(context, path):
    """Add test data with empty content at the given path."""
    empty_content = ""
    context.test.add_test_data(path, empty_content)
@given("a file with filename \"{file_name}\" and content \"{content}\" is present in \"{path}\"")
def step_impl(context, file_name, content, path):
    """Add test data with the given content and file name at the given path."""
    harness = context.test
    harness.add_test_data(path, content, file_name)
@given("a Funnel with the name \"{funnel_name}\" is set up")
def step_impl(context, funnel_name):
    """Create a Funnel with the given name and add it as a node."""
    context.test.add_node(Funnel(funnel_name))
@given("the Funnel with the name \"{source_name}\" is connected to the {destination_name}")
def step_impl(context, source_name, destination_name):
    """Connect the source node to the destination node under the 'success' relationship.

    Both nodes are fetched with ``get_or_create_node_by_name``.
    """
    origin = context.test.get_or_create_node_by_name(source_name)
    target = context.test.get_or_create_node_by_name(destination_name)
    connections = {'success': target}
    origin.out_proc.connect(connections)
@given("\"{processor_name}\" processor is a start node")
@given("\"{processor_name}\" port is a start node")
def step_impl(context, processor_name):
    """Add the named node as a start node of the "minifi-cpp-flow" container."""
    flow = context.test.acquire_container(name="minifi-cpp-flow", context=context)
    node = context.test.get_or_create_node_by_name(processor_name)
    flow.add_start_node(node)
# NiFi setups
@given("a NiFi flow is receiving data from the RemoteProcessGroup named \"{rpg_name}\" in an input port named \"{input_port_name}\" which has the same id as the port named \"{rpg_port_name}\"")
def step_impl(context, input_port_name, rpg_port_name, rpg_name):
    """Add an input port to the 'nifi' container that shares its instance id with an existing port.

    The port is generated for the named RemoteProcessGroup, takes over the
    ``instance_id`` of the node named ``rpg_port_name`` and is added to the
    test.
    """
    rpg = context.test.get_remote_process_group_by_name(rpg_name)
    input_port = context.test.generate_input_port_for_remote_process_group(rpg, input_port_name)
    input_port.instance_id = context.test.get_node_by_name(rpg_port_name).instance_id
    context.test.add_node(input_port)

    nifi = context.test.acquire_container(context=context, engine='nifi', name='nifi')
    # Assume that the first node declared is primary unless specified otherwise
    if nifi.get_start_nodes():
        return
    nifi.add_start_node(input_port)
@given("a NiFi flow is sending data to an output port named \"{port_name}\" with the id of the port named \"{rpg_port_name}\" from the RemoteProcessGroup named \"{rpg_name}\"")
def step_impl(context, port_name, rpg_port_name, rpg_name):
    """Add an output port that shares its instance id with the node named ``rpg_port_name``."""
    rpg = context.test.get_remote_process_group_by_name(rpg_name)
    output_port = context.test.generate_output_port_for_remote_process_group(rpg, port_name)
    existing_port = context.test.get_node_by_name(rpg_port_name)
    output_port.instance_id = existing_port.instance_id
    context.test.add_node(output_port)
@given("a NiFi flow with the name \"{flow_name}\" is set up")
def step_impl(context, flow_name):
    """Acquire a container with the given name using the 'nifi' engine."""
    context.test.acquire_container(name=flow_name, engine='nifi', context=context)
@given("SSL is enabled in NiFi flow")
def step_impl(context):
    """Call ``enable_ssl_in_nifi`` on the test object."""
    harness = context.test
    harness.enable_ssl_in_nifi()
@given("a transient MiNiFi flow with the name \"{flow_name}\" is set up")
def step_impl(context, flow_name):
    """Acquire a transient MiNiFi container with the given name."""
    context.test.acquire_transient_minifi(name=flow_name, context=context)
@given("the provenance repository is enabled in MiNiFi")
def step_impl(context):
    """Call ``enable_provenance_repository_in_minifi`` on the test object."""
    harness = context.test
    harness.enable_provenance_repository_in_minifi()
@given("C2 is enabled in MiNiFi")
def step_impl(context):
    """Call ``enable_c2_in_minifi`` on the test object."""
    harness = context.test
    harness.enable_c2_in_minifi()
@given("Prometheus is enabled in MiNiFi")
def step_impl(context):
    """Call ``enable_prometheus_in_minifi`` on the test object."""
    harness = context.test
    harness.enable_prometheus_in_minifi()
@given("log metrics publisher is enabled in MiNiFi")
def step_impl(context):
    """Call ``enable_log_metrics_publisher_in_minifi`` on the test object."""
    harness = context.test
    harness.enable_log_metrics_publisher_in_minifi()
@given("Prometheus with SSL is enabled in MiNiFi")
def step_impl(context):
    """Call ``enable_prometheus_with_ssl_in_minifi`` on the test object."""
    harness = context.test
    harness.enable_prometheus_with_ssl_in_minifi()
@given("OpenSSL FIPS mode is enabled in MiNiFi")
def step_impl(context):
    """Call ``enable_openssl_fips_mode_in_minifi`` on the test object."""
    harness = context.test
    harness.enable_openssl_fips_mode_in_minifi()
@given("OpenSSL FIPS mode is disabled in MiNiFi")
def step_impl(context):
    """Call ``disable_openssl_fips_mode_in_minifi`` on the test object."""
    harness = context.test
    harness.disable_openssl_fips_mode_in_minifi()
# HTTP proxy setup
@given("the http proxy server is set up")
@given("a http proxy server is set up accordingly")
def step_impl(context):
    """Acquire the "http-proxy" container using the "http-proxy" engine."""
    context.test.acquire_container(name="http-proxy", engine="http-proxy", context=context)
# TLS
@given("an ssl context service is set up for {processor_name}")
@given("an ssl context service with a manual CA cert file is set up for {processor_name}")
def step_impl(context, processor_name):
    """Attach an SSLContextService with client cert, key and root CA file to the named processor."""
    client_cert_path = '/tmp/resources/minifi_client.crt'
    client_key_path = '/tmp/resources/minifi_client.key'
    root_ca_path = '/tmp/resources/root_ca.crt'
    service = SSLContextService(cert=client_cert_path, key=client_key_path, ca_cert=root_ca_path)

    node = context.test.get_node_by_name(processor_name)
    node.controller_services.append(service)
    node.set_property('SSL Context Service', service.name)
@given("an ssl context service using the system CA cert store is set up for {processor_name}")
def step_impl(context, processor_name):
    """Attach an SSLContextService with ``use_system_cert_store='true'`` to the named processor."""
    client_cert_path = '/tmp/resources/minifi_client.crt'
    client_key_path = '/tmp/resources/minifi_client.key'
    service = SSLContextService(cert=client_cert_path, key=client_key_path, use_system_cert_store='true')

    node = context.test.get_node_by_name(processor_name)
    node.controller_services.append(service)
    node.set_property('SSL Context Service', service.name)
# Record set reader and writer
@given("a JsonRecordSetWriter controller service is set up with \"{}\" output grouping in the \"{minifi_container_name}\" flow")
def step_impl(context, output_grouping: str, minifi_container_name: str):
    """Add a JsonRecordSetWriter named "JsonRecordSetWriter" to the given flow's container."""
    writer = JsonRecordSetWriter(name="JsonRecordSetWriter", output_grouping=output_grouping)
    flow = context.test.acquire_container(name=minifi_container_name, context=context)
    flow.add_controller(writer)
@given("a JsonTreeReader controller service is set up in the \"{minifi_container_name}\" flow")
def step_impl(context, minifi_container_name: str):
    """Add a JsonTreeReader named "JsonTreeReader" to the given flow's container."""
    reader = JsonTreeReader("JsonTreeReader")
    flow = context.test.acquire_container(name=minifi_container_name, context=context)
    flow.add_controller(reader)
@given("a JsonRecordSetWriter controller service is set up with \"{}\" output grouping")
def step_impl(context, output_grouping: str):
    """Delegate to the flow-specific JsonRecordSetWriter step for the "minifi-cpp-flow" flow."""
    delegated_step = f"given a JsonRecordSetWriter controller service is set up with \"{output_grouping}\" output grouping in the \"minifi-cpp-flow\" flow"
    context.execute_steps(delegated_step)
@given("a JsonTreeReader controller service is set up")
def step_impl(context):
    """Delegate to the flow-specific JsonTreeReader step for the "minifi-cpp-flow" flow."""
    delegated_step = "given a JsonTreeReader controller service is set up in the \"minifi-cpp-flow\" flow"
    context.execute_steps(delegated_step)
@given("a XMLReader controller service is set up")
def step_impl(context):
    """Add an XMLReader named "XMLReader" to the "minifi-cpp-flow" container."""
    reader = XMLReader("XMLReader")
    flow = context.test.acquire_container(name="minifi-cpp-flow", context=context)
    flow.add_controller(reader)
@given("a XMLRecordSetWriter controller service is set up")
def step_impl(context):
    """Add an XMLRecordSetWriter named "XMLRecordSetWriter" to the "minifi-cpp-flow" container."""
    writer = XMLRecordSetWriter("XMLRecordSetWriter")
    flow = context.test.acquire_container(name="minifi-cpp-flow", context=context)
    flow.add_controller(writer)
# Kubernetes
def __set_up_the_kubernetes_controller_service(context, processor_name, service_property_name, properties):
    """Attach a KubernetesControllerService to a processor and reference it from a property.

    Args:
        context: behave context; ``context.test`` is used to look up the processor.
        processor_name: name of the processor that receives the service.
        service_property_name: processor property that is set to the service's name.
        properties: passed as the second argument to ``KubernetesControllerService``.
    """
    service = KubernetesControllerService("Kubernetes Controller Service", properties)
    node = context.test.get_node_by_name(processor_name)
    node.controller_services.append(service)
    node.set_property(service_property_name, service.name)
@given("the {processor_name} processor has a {service_property_name} which is a Kubernetes Controller Service")
@given("the {processor_name} processor has an {service_property_name} which is a Kubernetes Controller Service")
def step_impl(context, processor_name, service_property_name):
    """Attach a Kubernetes Controller Service with no extra properties to the processor."""
    no_properties = {}
    __set_up_the_kubernetes_controller_service(context, processor_name, service_property_name, no_properties)
@given("the {processor_name} processor has a {service_property_name} which is a Kubernetes Controller Service with the \"{property_name}\" property set to \"{property_value}\"")
@given("the {processor_name} processor has an {service_property_name} which is a Kubernetes Controller Service with the \"{property_name}\" property set to \"{property_value}\"")
def step_impl(context, processor_name, service_property_name, property_name, property_value):
    """Attach a Kubernetes Controller Service with a single property to the processor."""
    service_properties = {property_name: property_value}
    __set_up_the_kubernetes_controller_service(context, processor_name, service_property_name, service_properties)
# MQTT setup
@when("an MQTT broker is set up in correspondence with the PublishMQTT")
@given("an MQTT broker is set up in correspondence with the PublishMQTT")
@given("an MQTT broker is set up in correspondence with the ConsumeMQTT")
@given("an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT")
def step_impl(context):
    """Acquire the "mqtt-broker" container and start it right away."""
    harness = context.test
    harness.acquire_container(name="mqtt-broker", engine="mqtt-broker", context=context)
    harness.start('mqtt-broker')
# s3 setup
@given("a s3 server is set up in correspondence with the PutS3Object")
@given("a s3 server is set up in correspondence with the DeleteS3Object")
def step_impl(context):
    """Acquire the "s3-server" container using the "s3-server" engine."""
    context.test.acquire_container(name="s3-server", engine="s3-server", context=context)
@given("a kinesis server is set up in correspondence with the PutKinesisStream")
def step_impl(context):
    """Acquire the "kinesis-server" container using the "kinesis-server" engine."""
    context.test.acquire_container(name="kinesis-server", engine="kinesis-server", context=context)
# azure storage setup
@given("an Azure storage server is set up")
def step_impl(context):
    """Acquire the "azure-storage-server" container using the "azure-storage-server" engine."""
    context.test.acquire_container(name="azure-storage-server", engine="azure-storage-server", context=context)
# syslog client
@given(u'a Syslog client with {protocol} protocol is setup to send logs to minifi')
def step_impl(context, protocol):
    """Acquire the syslog client container for the protocol.

    The container name and the engine are both
    ``syslog-<lowercased protocol>-client``.
    """
    syslog_client = "syslog-" + protocol.lower() + "-client"
    context.test.acquire_container(name=syslog_client, engine=syslog_client, context=context)
# google cloud storage setup
@given("a Google Cloud storage server is set up")
@given("a Google Cloud storage server is set up with some test data")
@given('a Google Cloud storage server is set up and a single object with contents "preloaded data" is present')
def step_impl(context):
    """Acquire the "fake-gcs-server" container using the "fake-gcs-server" engine."""
    context.test.acquire_container(name="fake-gcs-server", engine="fake-gcs-server", context=context)
# elasticsearch
@given('an Elasticsearch server is set up and running')
@given('an Elasticsearch server is set up and a single document is present with "preloaded_id" in "my_index"')
@given('an Elasticsearch server is set up and a single document is present with "preloaded_id" in "my_index" with "value1" in "field1"')
def step_impl(context):
    """Start Elasticsearch and create the document "preloaded_id" in "my_index"."""
    harness = context.test
    harness.start_elasticsearch(context)
    elasticsearch_container = harness.get_container_name_with_postfix("elasticsearch")
    harness.create_doc_elasticsearch(elasticsearch_container, "my_index", "preloaded_id")
# opensearch
@given('an Opensearch server is set up and running')
@given('an Opensearch server is set up and a single document is present with "preloaded_id" in "my_index"')
@given('an Opensearch server is set up and a single document is present with "preloaded_id" in "my_index" with "value1" in "field1"')
def step_impl(context):
    """Start Opensearch, add the elastic user and create the document "preloaded_id" in "my_index"."""
    harness = context.test
    harness.start_opensearch(context)
    harness.add_elastic_user_to_opensearch(harness.get_container_name_with_postfix("opensearch"))
    harness.create_doc_elasticsearch(harness.get_container_name_with_postfix("opensearch"), "my_index", "preloaded_id")
def setUpSslContextServiceForProcessor(context, processor_name: str):
    """Attach an SSLContextService with client cert, key and root CA file to the named processor.

    The service is appended to the processor's controller services and its
    name is stored in the processor's "SSL Context Service" property.
    """
    service = SSLContextService(
        cert='/tmp/resources/minifi_client.crt',
        ca_cert='/tmp/resources/root_ca.crt',
        key='/tmp/resources/minifi_client.key',
    )
    node = context.test.get_node_by_name(processor_name)
    node.controller_services.append(service)
    node.set_property("SSL Context Service", service.name)
def setUpSslContextServiceForRPG(context, rpg_name: str):
    """Register an SSLContextService on the "minifi-cpp-flow" container and reference it from an RPG.

    The service's name is added to the named RemoteProcessGroup as its
    "SSL Context Service" property.
    """
    service = SSLContextService(
        cert='/tmp/resources/minifi_client.crt',
        ca_cert='/tmp/resources/root_ca.crt',
        key='/tmp/resources/minifi_client.key',
    )
    flow = context.test.acquire_container(name="minifi-cpp-flow", context=context)
    flow.add_controller(service)
    remote_process_group = context.test.get_remote_process_group_by_name(rpg_name)
    remote_process_group.add_property("SSL Context Service", service.name)
@given(u'a SSL context service is set up for PostElasticsearch and Elasticsearch')
def step_impl(context):
    """Set up the SSL context service for the processor named "PostElasticsearch"."""
    setUpSslContextServiceForProcessor(context, processor_name="PostElasticsearch")
@given(u'a SSL context service is set up for PostElasticsearch and Opensearch')
def step_impl(context):
    """Attach an SSLContextService that only carries the root CA file to "PostElasticsearch"."""
    service = SSLContextService(ca_cert='/tmp/resources/root_ca.crt')
    node = context.test.get_node_by_name("PostElasticsearch")
    node.controller_services.append(service)
    node.set_property("SSL Context Service", service.name)
@given(u'an ElasticsearchCredentialsService is set up for PostElasticsearch with Basic Authentication')
def step_impl(context):
    """Attach a default-constructed ElasticsearchCredentialsService to "PostElasticsearch"."""
    credentials = ElasticsearchCredentialsService()
    node = context.test.get_node_by_name("PostElasticsearch")
    node.controller_services.append(credentials)
    node.set_property("Elasticsearch Credentials Provider Service", credentials.name)
@given(u'an ElasticsearchCredentialsService is set up for PostElasticsearch with ApiKey')
def step_impl(context):
    """Generate an API key and attach an ElasticsearchCredentialsService built from it to "PostElasticsearch"."""
    generated_key = context.test.elastic_generate_apikey("elasticsearch")
    credentials = ElasticsearchCredentialsService(generated_key)
    node = context.test.get_node_by_name("PostElasticsearch")
    node.controller_services.append(credentials)
    node.set_property("Elasticsearch Credentials Provider Service", credentials.name)
# splunk hec
@given("a Splunk HEC is set up and running")
def step_impl(context):
    """Call ``start_splunk`` on the test object with the behave context."""
    harness = context.test
    harness.start_splunk(context)
# TCP client
@given('a TCP client is set up to send a test TCP message to minifi')
def step_impl(context):
    """Acquire the "tcp-client" container using the "tcp-client" engine."""
    context.test.acquire_container(name="tcp-client", engine="tcp-client", context=context)
@given("SSL is enabled for the Splunk HEC and the SSL context service is set up for PutSplunkHTTP and QuerySplunkIndexingStatus")
def step_impl(context):
    """Share one SSL context service between both Splunk processors and enable SSL on the Splunk HEC.

    A server certificate for the Splunk container is created from the root CA
    held on the behave context. It is handed to the Splunk container in PEM
    form, together with its private key and the root CA certificate.
    """
    service = SSLContextService(
        name='SSLContextService',
        cert='/tmp/resources/minifi_client.crt',
        ca_cert='/tmp/resources/root_ca.crt',
        key='/tmp/resources/minifi_client.key',
    )
    splunk_hostname = context.test.get_container_name_with_postfix("splunk")
    server_cert, server_key = make_server_cert(splunk_hostname, context.root_ca_cert, context.root_ca_key)

    # Both processors reference the same service instance.
    for splunk_processor_name in ("PutSplunkHTTP", "QuerySplunkIndexingStatus"):
        node = context.test.get_node_by_name(splunk_processor_name)
        node.controller_services.append(service)
        node.set_property("SSL Context Service", service.name)

    pem = OpenSSL.crypto.FILETYPE_PEM
    server_cert_pem = OpenSSL.crypto.dump_certificate(pem, server_cert)
    server_key_pem = OpenSSL.crypto.dump_privatekey(pem, server_key)
    root_ca_pem = OpenSSL.crypto.dump_certificate(pem, context.root_ca_cert)
    context.test.enable_splunk_hec_ssl('splunk', server_cert_pem, server_key_pem, root_ca_pem)
@given(u'the {processor_one} processor is set up with a GCPCredentialsControllerService to communicate with the Google Cloud storage server')
def step_impl(context, processor_one):
    """Give a processor anonymous GCP credentials and point it at the fake GCS server.

    Args:
        context: behave context; ``context.feature_id`` is part of the
            endpoint host name.
        processor_one: name of the processor to configure.

    The node is looked up once and reused for both property assignments.
    """
    gcp_controller_service = GCPCredentialsControllerService(credentials_location="Use Anonymous credentials")
    processor = context.test.get_node_by_name(processor_one)
    processor.controller_services.append(gcp_controller_service)
    processor.set_property("GCP Credentials Provider Service", gcp_controller_service.name)
    processor.set_property("Endpoint Override URL", f"fake-gcs-server-{context.feature_id}:4443")
@given(u'the {processor_one} and the {processor_two} processors are set up with a GCPCredentialsControllerService to communicate with the Google Cloud storage server')
def step_impl(context, processor_one, processor_two):
    """Give two processors the same anonymous GCP credentials service and the fake GCS endpoint.

    Args:
        context: behave context; ``context.feature_id`` is part of the
            endpoint host name.
        processor_one: name of the first processor to configure.
        processor_two: name of the second processor to configure.

    Each node is looked up once, and the name parameters are no longer
    rebound to node objects.
    """
    gcp_controller_service = GCPCredentialsControllerService(credentials_location="Use Anonymous credentials")
    endpoint_override_url = f"fake-gcs-server-{context.feature_id}:4443"

    # Resolve both nodes before mutating either, so a missing processor fails
    # before any configuration is applied.
    nodes = [context.test.get_node_by_name(processor_one), context.test.get_node_by_name(processor_two)]
    for node in nodes:
        node.controller_services.append(gcp_controller_service)
        node.set_property("GCP Credentials Provider Service", gcp_controller_service.name)
    for node in nodes:
        node.set_property("Endpoint Override URL", endpoint_override_url)
# SQL
@given("an ODBCService is setup up for {processor_name} with the name \"{service_name}\"")
def step_impl(context, processor_name, service_name):
    """Create an ODBCService whose connection string targets the postgresql-server container and attach it to the processor."""
    server_hostname = context.test.get_container_name_with_postfix("postgresql-server")
    connection_string = "Driver={{PostgreSQL ANSI}};Server={server_hostname};Port=5432;Database=postgres;Uid=postgres;Pwd=password;".format(server_hostname=server_hostname)
    odbc_service = ODBCService(name=service_name, connection_string=connection_string)

    target = context.test.get_node_by_name(processor_name)
    target.controller_services.append(odbc_service)
    target.set_property("DB Controller Service", odbc_service.name)
@given("a PostgreSQL server is set up")
def step_impl(context):
    """Call enable_sql_in_minifi() and then acquire the "postgresql-server" container."""
    test = context.test
    test.enable_sql_in_minifi()
    test.acquire_container(context=context, name="postgresql-server", engine="postgresql-server")
@when("the MiNiFi instance starts up")
@when("both instances start up")
@when("all instances start up")
@when("all other processes start up")
def step_impl(context):
    """Invoke context.test.start() with no container name."""
    test = context.test
    test.start()
@when("\"{container_name}\" flow is stopped")
def step_impl(context, container_name):
    """Stop the named container through context.test.stop()."""
    test = context.test
    test.stop(container_name)
@when("\"{container_name}\" flow is restarted")
def step_impl(context, container_name):
    """Restart the named container through context.test.restart()."""
    test = context.test
    test.restart(container_name)
@then("\"{container_name}\" flow is stopped")
def step_impl(context, container_name):
    """"Then" variant of the stop step: calls context.test.stop() for the named container."""
    test = context.test
    test.stop(container_name)
@then("\"{container_name}\" flow is killed")
def step_impl(context, container_name):
    """Kill the named container through context.test.kill()."""
    test = context.test
    test.kill(container_name)
@then("\"{container_name}\" flow is restarted")
def step_impl(context, container_name):
    """"Then" variant of the restart step: calls context.test.restart() for the named container."""
    test = context.test
    test.restart(container_name)
@when("\"{container_name}\" flow is started")
@then("\"{container_name}\" flow is started")
def step_impl(context, container_name):
    """Start the named container through context.test.start()."""
    test = context.test
    test.start(container_name)
@then("{duration} later")
def step_impl(context, duration):
    """Sleep for the time span described by the human-readable duration text."""
    seconds_to_wait = humanfriendly.parse_timespan(duration)
    time.sleep(seconds_to_wait)
@when("content \"{content}\" is added to file \"{file_name}\" present in directory \"{path}\" {seconds:d} seconds later")
def step_impl(context, content, file_name, path, seconds):
    """Wait the given number of seconds, then add the test data via context.test.add_test_data()."""
    time.sleep(seconds)
    test = context.test
    test.add_test_data(path, content, file_name)
@then("a flowfile with the content \"{content}\" is placed in the monitored directory in less than {duration}")
@then("a flowfile with the content '{content}' is placed in the monitored directory in less than {duration}")
@then("{number_of_flow_files:d} flowfiles with the content \"{content}\" are placed in the monitored directory in less than {duration}")
def step_impl(context, content, duration, number_of_flow_files=1):
    """Check for the given number of files (1 when the step omits it) with the given content within the duration."""
    timeout_seconds = humanfriendly.parse_timespan(duration)
    expected_contents = [content]
    context.test.check_for_multiple_files_generated(number_of_flow_files, timeout_seconds, expected_contents)
@then("a flowfile with the JSON content \"{content}\" is placed in the monitored directory in less than {duration}")
@then("a flowfile with the JSON content '{content}' is placed in the monitored directory in less than {duration}")
def step_impl(context, content, duration):
    """Delegate to check_for_single_json_file_with_content_generated() with the parsed timeout."""
    timeout_seconds = humanfriendly.parse_timespan(duration)
    context.test.check_for_single_json_file_with_content_generated(content, timeout_seconds)
@then("at least one flowfile's content match the following regex: \"{regex}\" in less than {duration}")
@then("at least one flowfile's content match the following regex: '{regex}' in less than {duration}")
def step_impl(context, regex: str, duration: str):
    """Delegate to check_for_at_least_one_file_with_matching_content() with the parsed timeout."""
    timeout_seconds = humanfriendly.parse_timespan(duration)
    context.test.check_for_at_least_one_file_with_matching_content(regex, timeout_seconds)
@then("at least one flowfile with the content \"{content}\" is placed in the monitored directory in less than {duration}")
@then("at least one flowfile with the content '{content}' is placed in the monitored directory in less than {duration}")
def step_impl(context, content, duration):
    """Delegate to check_for_at_least_one_file_with_content_generated() with the parsed timeout."""
    timeout_seconds = humanfriendly.parse_timespan(duration)
    context.test.check_for_at_least_one_file_with_content_generated(content, timeout_seconds)
@then("no files are placed in the monitored directory in {duration} of running time")
def step_impl(context, duration):
    """Delegate to check_for_no_files_generated() with the parsed duration."""
    running_time_seconds = humanfriendly.parse_timespan(duration)
    context.test.check_for_no_files_generated(running_time_seconds)
@then("there is exactly {num_flowfiles} files in the monitored directory")
def step_impl(context, num_flowfiles):
    """Check the file count via check_for_num_files_generated() using a fixed timespan of "1"."""
    expected_count = int(num_flowfiles)  # captured as text by the step pattern
    timeout_seconds = humanfriendly.parse_timespan("1")
    context.test.check_for_num_files_generated(expected_count, timeout_seconds)
@then("{num_flowfiles} flowfiles are placed in the monitored directory in less than {duration}")
def step_impl(context, num_flowfiles, duration):
    """Check that exactly ``num_flowfiles`` files appear within ``duration``.

    A count of zero is treated as "no files are produced during ``duration``",
    using the same check as the dedicated "no files are placed ..." step.

    :param context: behave context; ``context.test`` provides the checks.
    :param num_flowfiles: expected file count, captured as text by the step pattern.
    :param duration: human-readable time span (parsed with humanfriendly).
    """
    # The pattern has no type converter on {num_flowfiles}, so it arrives as a str;
    # comparing the raw value to the integer 0 could never be true.
    expected_count = int(num_flowfiles)
    timeout_seconds = humanfriendly.parse_timespan(duration)
    if expected_count == 0:
        # Call the check directly rather than execute_steps(): the previous sub-step
        # text had no Gherkin keyword, so it could not have been parsed.
        context.test.check_for_no_files_generated(timeout_seconds)
        return
    context.test.check_for_num_files_generated(expected_count, timeout_seconds)
@then("at least one flowfile is placed in the monitored directory in less than {duration}")
def step_impl(context, duration):
    """Check for a file count between 1 and infinity within the parsed timeout."""
    no_upper_bound = float('inf')
    timeout_seconds = humanfriendly.parse_timespan(duration)
    context.test.check_for_num_file_range_generated_with_timeout(1, no_upper_bound, timeout_seconds)
@then("at least one flowfile with minimum size of \"{size}\" is placed in the monitored directory in less than {duration}")
def step_impl(context, duration: str, size: str):
    """Check for a file count between 1 and infinity with a minimum size, within the parsed timeout."""
    no_upper_bound = float('inf')
    min_size_bytes = humanfriendly.parse_size(size)
    timeout_seconds = humanfriendly.parse_timespan(duration)
    context.test.check_for_num_file_range_and_min_size_generated(1, no_upper_bound, min_size_bytes, timeout_seconds)
@then("one flowfile with the contents \"{content}\" is placed in the monitored directory in less than {duration}")
def step_impl(context, content, duration):
    """Check for exactly one file with the given content within the parsed timeout."""
    timeout_seconds = humanfriendly.parse_timespan(duration)
    expected_contents = [content]
    context.test.check_for_multiple_files_generated(1, timeout_seconds, expected_contents)
@then("two flowfiles with the contents \"{content_1}\" and \"{content_2}\" are placed in the monitored directory in less than {duration}")
@then("two flowfiles with the contents '{content_1}' and '{content_2}' are placed in the monitored directory in less than {duration}")
def step_impl(context, content_1, content_2, duration):
    """Check for two files carrying the two given contents within the parsed timeout."""
    timeout_seconds = humanfriendly.parse_timespan(duration)
    expected_contents = [content_1, content_2]
    context.test.check_for_multiple_files_generated(2, timeout_seconds, expected_contents)
@then("exactly these flowfiles are in the monitored directory in less than {duration}: \"\"")
def step_impl(context, duration):
    """Empty expected list: re-dispatch to the "no files are placed ..." step for the same duration."""
    step_text = f"Then no files are placed in the monitored directory in {duration} of running time"
    context.execute_steps(step_text)
@then("exactly these flowfiles are in the monitored directory's \"{subdir}\" subdirectory in less than {duration}: \"\"")
def step_impl(context, duration, subdir):
    """Assert that the subdirectory check passes with an empty expected-contents list."""
    timeout_seconds = humanfriendly.parse_timespan(duration)
    subdirectory_ok = context.test.check_subdirectory(sub_directory=subdir, expected_contents=[], timeout=timeout_seconds)
    # log_app_output() is only evaluated when the check failed.
    assert subdirectory_ok or context.test.cluster.log_app_output()
@then("exactly these flowfiles are in the monitored directory in less than {duration}: \"{contents}\"")
def step_impl(context, duration, contents):
    """Split the comma-separated contents and check for exactly that many files with those contents."""
    expected_contents = contents.split(",")
    expected_count = len(expected_contents)
    timeout_seconds = humanfriendly.parse_timespan(duration)
    context.test.check_for_multiple_files_generated(expected_count, timeout_seconds, expected_contents)
@then("exactly these flowfiles are in the monitored directory's \"{subdir}\" subdirectory in less than {duration}: \"{contents}\"")
def step_impl(context, duration, subdir, contents):
    """Assert that the subdirectory check passes for the comma-separated expected contents."""
    expected_contents = contents.split(",")
    timeout_seconds = humanfriendly.parse_timespan(duration)
    subdirectory_ok = context.test.check_subdirectory(sub_directory=subdir, expected_contents=expected_contents, timeout=timeout_seconds)
    # log_app_output() is only evaluated when the check failed.
    assert subdirectory_ok or context.test.cluster.log_app_output()
@then("flowfiles with these contents are placed in the monitored directory in less than {duration}: \"{contents}\"")
def step_impl(context, duration, contents):
    """Split the comma-separated contents and pass them on with a file count of 0."""
    expected_contents = contents.split(",")
    timeout_seconds = humanfriendly.parse_timespan(duration)
    context.test.check_for_multiple_files_generated(0, timeout_seconds, expected_contents)
@then("after a wait of {duration}, at least {lower_bound:d} and at most {upper_bound:d} flowfiles are produced and placed in the monitored directory")
def step_impl(context, lower_bound, upper_bound, duration):
    """Delegate to check_for_num_file_range_generated_after_wait() with the parsed wait time."""
    wait_seconds = humanfriendly.parse_timespan(duration)
    context.test.check_for_num_file_range_generated_after_wait(lower_bound, upper_bound, wait_seconds)
@then("{number_of_files:d} flowfiles are placed in the monitored directory in {duration}")
@then("{number_of_files:d} flowfile is placed in the monitored directory in {duration}")
def step_impl(context, number_of_files, duration):
    """Check for the given number of files within the parsed timeout, without specifying contents."""
    timeout_seconds = humanfriendly.parse_timespan(duration)
    context.test.check_for_multiple_files_generated(number_of_files, timeout_seconds)
@then("at least one empty flowfile is placed in the monitored directory in less than {duration}")
def step_impl(context, duration):
    """Delegate to check_for_an_empty_file_generated() with the parsed timeout."""
    timeout_seconds = humanfriendly.parse_timespan(duration)
    context.test.check_for_an_empty_file_generated(timeout_seconds)
@then("no errors were generated on the http-proxy regarding \"{url}\"")
def step_impl(context, url):
    """Delegate to check_http_proxy_access() for the 'http-proxy' container and the given URL."""
    test = context.test
    test.check_http_proxy_access('http-proxy', url)
@then("there is a record on the kinesis server with \"{record_data}\"")
def step_impl(context, record_data):
    """Delegate to check_kinesis_server_record_data() for the "kinesis-server" container."""
    test = context.test
    test.check_kinesis_server_record_data("kinesis-server", record_data)
@then("the object on the s3 server is \"{object_data}\"")
def step_impl(context, object_data):
    """Delegate to check_s3_server_object_data() for the "s3-server" container."""
    test = context.test
    test.check_s3_server_object_data("s3-server", object_data)
@then("the object on the s3 server is present and matches the original hash")
def step_impl(context):
    """Delegate to check_s3_server_large_object_data() for the "s3-server" container."""
    test = context.test
    test.check_s3_server_large_object_data("s3-server")
@then("the object content type on the s3 server is \"{content_type}\" and the object metadata matches use metadata")
def step_impl(context, content_type):
    """Delegate to check_s3_server_object_metadata() for the "s3-server" container."""
    test = context.test
    test.check_s3_server_object_metadata("s3-server", content_type)
@then("the object bucket on the s3 server is empty")
def step_impl(context):
    """Delegate to check_empty_s3_bucket() for the "s3-server" container."""
    test = context.test
    test.check_empty_s3_bucket("s3-server")
# Azure
@when("test blob \"{blob_name}\" with the content \"{content}\" is created on Azure blob storage")
def step_impl(context, blob_name, content):
    """Add a test blob with the given content; the third add_test_blob() argument is False."""
    test = context.test
    test.add_test_blob(blob_name, content, False)
@when("test blob \"{blob_name}\" with the content \"{content}\" and a snapshot is created on Azure blob storage")
def step_impl(context, blob_name, content):
    """Add a test blob with the given content; the third add_test_blob() argument is True."""
    test = context.test
    test.add_test_blob(blob_name, content, True)
@when("test blob \"{blob_name}\" is created on Azure blob storage")
def step_impl(context, blob_name):
    """Add a test blob with empty content; the third add_test_blob() argument is False."""
    empty_content = ""
    context.test.add_test_blob(blob_name, empty_content, False)
@when("test blob \"{blob_name}\" is created on Azure blob storage with a snapshot")
def step_impl(context, blob_name):
    """Add a test blob with empty content; the third add_test_blob() argument is True."""
    empty_content = ""
    context.test.add_test_blob(blob_name, empty_content, True)
@then("the object on the Azure storage server is \"{object_data}\"")
def step_impl(context, object_data):
    """Delegate to check_azure_storage_server_data() for the "azure-storage-server" container."""
    test = context.test
    test.check_azure_storage_server_data("azure-storage-server", object_data)
@then("the Azure blob storage becomes empty in {timeout_seconds:d} seconds")
def step_impl(context, timeout_seconds):
    """Delegate to check_azure_blob_storage_is_empty() with the timeout in seconds."""
    test = context.test
    test.check_azure_blob_storage_is_empty(timeout_seconds)
@then("the blob and snapshot count becomes {blob_and_snapshot_count:d} in {timeout_seconds:d} seconds")
def step_impl(context, blob_and_snapshot_count, timeout_seconds):
    """Delegate to check_azure_blob_and_snapshot_count() with the expected count and timeout."""
    test = context.test
    test.check_azure_blob_and_snapshot_count(blob_and_snapshot_count, timeout_seconds)
# SQL
@then("the query \"{query}\" returns {number_of_rows:d} rows in less than {timeout_seconds:d} seconds on the PostgreSQL server")
def step_impl(context, query: str, number_of_rows: int, timeout_seconds: int):
    """Check the query result row count against the postgresql-server container."""
    postgres_container = context.test.get_container_name_with_postfix("postgresql-server")
    context.test.check_query_results(postgres_container, query, number_of_rows, timeout_seconds)
@then("the Minifi logs contain the following message: \"{log_message}\" in less than {duration}")
@then("the Minifi logs contain the following message: '{log_message}' in less than {duration}")
def step_impl(context, log_message, duration):
    """Delegate to check_minifi_log_contents() with the parsed timeout."""
    timeout_seconds = humanfriendly.parse_timespan(duration)
    context.test.check_minifi_log_contents(log_message, timeout_seconds)
@then("the Minifi logs contain the following message: \"{log_message}\" {count:d} times after {seconds:d} seconds")
def step_impl(context, log_message, count, seconds):
    """Sleep for the given seconds, then call check_minifi_log_contents() with 1 and the count."""
    time.sleep(seconds)
    test = context.test
    test.check_minifi_log_contents(log_message, 1, count)
@then("the Minifi logs do not contain the following message: \"{log_message}\" after {seconds:d} seconds")
def step_impl(context, log_message, seconds):
    """Delegate to check_minifi_log_does_not_contain() with the message and the seconds value."""
    test = context.test
    test.check_minifi_log_does_not_contain(log_message, seconds)
@then("the Minifi logs match the following regex: \"{regex}\" in less than {duration}")
def step_impl(context, regex, duration):
    """Delegate to check_minifi_log_matches_regex() with the parsed timeout."""
    timeout_seconds = humanfriendly.parse_timespan(duration)
    context.test.check_minifi_log_matches_regex(regex, timeout_seconds)
# MQTT
@then("the MQTT broker has a log line matching \"{log_pattern}\"")
def step_impl(context, log_pattern):
    """Check the 'mqtt-broker' container log against the pattern, passing 60 and count=1."""
    test = context.test
    test.check_container_log_matches_regex('mqtt-broker', log_pattern, 60, count=1)
@then("the MQTT broker has {log_count} log lines matching \"{log_pattern}\"")
def step_impl(context, log_count, log_pattern):
    """Check the 'mqtt-broker' container log against the pattern, passing 60 and the expected count."""
    expected_count = int(log_count)  # captured as text by the step pattern
    context.test.check_container_log_matches_regex('mqtt-broker', log_pattern, 60, count=expected_count)
@when("a test message \"{message}\" is published to the MQTT broker on topic \"{topic}\"")
def step_impl(context, message, topic):
    """Delegate to publish_test_mqtt_message() with the topic first, then the message."""
    test = context.test
    test.publish_test_mqtt_message(topic, message)
@then("the \"{minifi_container_name}\" flow has a log line matching \"{log_pattern}\" in less than {duration}")
def step_impl(context, minifi_container_name, log_pattern, duration):
    """Check the named container's log against the pattern with the parsed timeout and count=1."""
    timeout_seconds = humanfriendly.parse_timespan(duration)
    context.test.check_container_log_matches_regex(minifi_container_name, log_pattern, timeout_seconds, count=1)
# Google Cloud Storage
@then('an object with the content \"{content}\" is present in the Google Cloud storage')
def step_imp(context, content):
    """Delegate to check_google_cloud_storage() for the "fake-gcs-server" container."""
    test = context.test
    test.check_google_cloud_storage("fake-gcs-server", content)
@then("the test bucket of Google Cloud Storage is empty")
def step_impl(context):
    """Delegate to check_empty_gcs_bucket() for the "fake-gcs-server" container."""
    test = context.test
    test.check_empty_gcs_bucket("fake-gcs-server")
# Splunk
@then('an event is registered in Splunk HEC with the content \"{content}\"')
def step_imp(context, content):
    """Delegate to check_splunk_event() for the "splunk" container."""
    test = context.test
    test.check_splunk_event("splunk", content)
@then('an event is registered in Splunk HEC with the content \"{content}\" with \"{source}\" set as source and \"{source_type}\" set as sourcetype and \"{host}\" set as host')
def step_imp(context, content, source, source_type, host):
    """Check a Splunk event's content together with its source, sourcetype and host attributes."""
    expected_attributes = dict(source=source, sourcetype=source_type, host=host)
    context.test.check_splunk_event_with_attributes("splunk", content, expected_attributes)
# Prometheus
@given("a Prometheus server is set up")
def step_impl(context):
    """Acquire the "prometheus" container using the "prometheus" engine."""
    test = context.test
    test.acquire_container(context=context, name="prometheus", engine="prometheus")
@given("a Prometheus server is set up with SSL")
def step_impl(context):
    """Acquire the "prometheus" container using the "prometheus-ssl" engine."""
    test = context.test
    test.acquire_container(context=context, name="prometheus", engine="prometheus-ssl")
@then("\"{metric_class}\" are published to the Prometheus server in less than {timeout_seconds:d} seconds")
@then("\"{metric_class}\" is published to the Prometheus server in less than {timeout_seconds:d} seconds")
def step_impl(context, metric_class, timeout_seconds):
    """Delegate to check_metric_class_on_prometheus() with the metric class and timeout."""
    test = context.test
    test.check_metric_class_on_prometheus(metric_class, timeout_seconds)
@then("\"{metric_class}\" processor metric is published to the Prometheus server in less than {timeout_seconds:d} seconds for \"{processor_name}\" processor")
def step_impl(context, metric_class, timeout_seconds, processor_name):
    """Delegate to check_processor_metric_on_prometheus() for the named processor."""
    test = context.test
    test.check_processor_metric_on_prometheus(metric_class, timeout_seconds, processor_name)
@then("all Prometheus metric types are only defined once")
def step_impl(context):
    """Delegate to check_all_prometheus_metric_types_are_defined_once()."""
    test = context.test
    test.check_all_prometheus_metric_types_are_defined_once()
@then("Elasticsearch is empty")
def step_impl(context):
    """Run check_empty_elastic() against the postfixed "elasticsearch" container name."""
    elastic_container = context.test.get_container_name_with_postfix("elasticsearch")
    context.test.check_empty_elastic(elastic_container)
@then(u'Elasticsearch has a document with "{doc_id}" in "{index}" that has "{value}" set in "{field}"')
def step_impl(context, doc_id, index, value, field):
    """Run check_elastic_field_value() against the postfixed "elasticsearch" container name."""
    elastic_container = context.test.get_container_name_with_postfix("elasticsearch")
    context.test.check_elastic_field_value(
        elastic_container,
        index_name=index,
        doc_id=doc_id,
        field_name=field,
        field_value=value)
@then("Opensearch is empty")
def step_impl(context):
    """Run check_empty_elastic() against the feature-specific opensearch container name."""
    opensearch_container = f"opensearch-{context.feature_id}"
    context.test.check_empty_elastic(opensearch_container)
@then(u'Opensearch has a document with "{doc_id}" in "{index}" that has "{value}" set in "{field}"')
def step_impl(context, doc_id, index, value, field):
    """Run check_elastic_field_value() against the feature-specific opensearch container name."""
    opensearch_container = f"opensearch-{context.feature_id}"
    context.test.check_elastic_field_value(
        opensearch_container,
        index_name=index,
        doc_id=doc_id,
        field_name=field,
        field_value=value)
# MiNiFi C2 Server
@given("ssl properties are set up for MiNiFi C2 server")
def step_impl(context):
    """Call enable_c2_with_ssl_in_minifi() followed by set_ssl_context_properties_in_minifi()."""
    test = context.test
    test.enable_c2_with_ssl_in_minifi()
    test.set_ssl_context_properties_in_minifi()
@given("SSL properties are set in MiNiFi")
def step_impl(context):
    """Delegate to set_ssl_context_properties_in_minifi()."""
    test = context.test
    test.set_ssl_context_properties_in_minifi()
@given(u'a MiNiFi C2 server is set up')
def step_impl(context):
    """Acquire the "minifi-c2-server" container using the "minifi-c2-server" engine."""
    test = context.test
    test.acquire_container(context=context, name="minifi-c2-server", engine="minifi-c2-server")
@given(u'a MiNiFi C2 server is started')
def step_impl(context):
    """Delegate to start_minifi_c2_server(), passing the behave context."""
    test = context.test
    test.start_minifi_c2_server(context)
@then("the MiNiFi C2 server logs contain the following message: \"{log_message}\" in less than {duration}")
def step_impl(context, log_message, duration):
    """Check the "minifi-c2-server" container log for the message within the parsed timeout."""
    timeout_seconds = humanfriendly.parse_timespan(duration)
    context.test.check_container_log_contents("minifi-c2-server", log_message, timeout_seconds)
@then("the MiNiFi C2 SSL server logs contain the following message: \"{log_message}\" in less than {duration}")
def step_impl(context, log_message, duration):
    """Check the "minifi-c2-server-ssl" container log for the message within the parsed timeout."""
    timeout_seconds = humanfriendly.parse_timespan(duration)
    context.test.check_container_log_contents("minifi-c2-server-ssl", log_message, timeout_seconds)
@given(u'a MiNiFi C2 server is set up with SSL')
def step_impl(context):
    """Acquire the "minifi-c2-server" container using the "minifi-c2-server-ssl" engine."""
    test = context.test
    test.acquire_container(context=context, name="minifi-c2-server", engine="minifi-c2-server-ssl")
@given(u'flow configuration path is set up in flow url property')
def step_impl(context):
    """Acquire the "minifi-cpp-flow" container, then call fetch_flow_config_from_c2_url_in_minifi()."""
    test = context.test
    test.acquire_container(context=context, name="minifi-cpp-flow", engine="minifi-cpp")
    test.fetch_flow_config_from_c2_url_in_minifi()
# MiNiFi memory usage
@then(u'the peak memory usage of the agent is more than {size} in less than {duration}')
def step_impl(context, size: str, duration: str) -> None:
    """Delegate to check_if_peak_memory_usage_exceeded() with the parsed size and timeout."""
    size_threshold = humanfriendly.parse_size(size)
    timeout_seconds = humanfriendly.parse_timespan(duration)
    context.test.check_if_peak_memory_usage_exceeded(size_threshold, timeout_seconds)
@then(u'the memory usage of the agent is less than {size} in less than {duration}')
def step_impl(context, size: str, duration: str) -> None:
    """Delegate to check_if_memory_usage_is_below() with the parsed size and timeout."""
    size_threshold = humanfriendly.parse_size(size)
    timeout_seconds = humanfriendly.parse_timespan(duration)
    context.test.check_if_memory_usage_is_below(size_threshold, timeout_seconds)
@then(u'the memory usage of the agent decreases to {peak_usage_percent}% peak usage in less than {duration}')
def step_impl(context, peak_usage_percent: str, duration: str) -> None:
    """Convert the percentage text to a fraction and delegate to check_memory_usage_compared_to_peak()."""
    peak_fraction = float(peak_usage_percent) * 0.01
    timeout_seconds = humanfriendly.parse_timespan(duration)
    context.test.check_memory_usage_compared_to_peak(peak_fraction, timeout_seconds)
@given(u'a MiNiFi CPP server with yaml config')
def step_impl(context):
    """Delegate to set_yaml_in_minifi()."""
    test = context.test
    test.set_yaml_in_minifi()
@given(u'a MiNiFi CPP server with json config')
def step_impl(context):
    """Delegate to set_json_in_minifi()."""
    test = context.test
    test.set_json_in_minifi()
# MiNiFi controller
@given(u'controller socket properties are set up')
def step_impl(context):
    """Delegate to set_controller_socket_properties_in_minifi()."""
    test = context.test
    test.set_controller_socket_properties_in_minifi()
@when(u'MiNiFi config is updated through MiNiFi controller in the \"{minifi_container_name}\" flow')
def step_impl(context, minifi_container_name: str):
    """Delegate to update_flow_config_through_controller() for the named container."""
    test = context.test
    test.update_flow_config_through_controller(minifi_container_name)
@when(u'MiNiFi config is updated through MiNiFi controller')
def step_impl(context):
    """Re-dispatch to the container-specific step using this feature's minifi-cpp-flow container name."""
    step_text = f"when MiNiFi config is updated through MiNiFi controller in the \"minifi-cpp-flow-{context.feature_id}\" flow"
    context.execute_steps(step_text)
@then(u'the updated config is persisted in the \"{minifi_container_name}\" flow')
def step_impl(context, minifi_container_name: str):
    """Delegate to check_minifi_controller_updated_config_is_persisted() for the named container."""
    test = context.test
    test.check_minifi_controller_updated_config_is_persisted(minifi_container_name)
@then(u'the updated config is persisted')
def step_impl(context):
    """Re-dispatch to the container-specific step using this feature's minifi-cpp-flow container name."""
    step_text = f"then the updated config is persisted in the \"minifi-cpp-flow-{context.feature_id}\" flow"
    context.execute_steps(step_text)
@when(u'the {component} component is stopped through MiNiFi controller in the \"{minifi_container_name}\" flow')
def step_impl(context, minifi_container_name: str, component: str):
    """Delegate to stop_component_through_controller() for the component in the named container."""
    test = context.test
    test.stop_component_through_controller(component, minifi_container_name)
@when(u'the {component} component is stopped through MiNiFi controller')
def step_impl(context, component: str):
    """Re-dispatch to the container-specific step using this feature's minifi-cpp-flow container name."""
    step_text = f"when the {component} component is stopped through MiNiFi controller in the \"minifi-cpp-flow-{context.feature_id}\" flow"
    context.execute_steps(step_text)
@when(u'the {component} component is started through MiNiFi controller in the \"{minifi_container_name}\" flow')
def step_impl(context, minifi_container_name: str, component: str):
    """Delegate to start_component_through_controller() for the component in the named container."""
    test = context.test
    test.start_component_through_controller(component, minifi_container_name)
@when(u'the {component} component is started through MiNiFi controller')
def step_impl(context, component: str):
    """Re-dispatch to the container-specific step using this feature's minifi-cpp-flow container name."""
    step_text = f"when the {component} component is started through MiNiFi controller in the \"minifi-cpp-flow-{context.feature_id}\" flow"
    context.execute_steps(step_text)
@then(u'the {component} component is not running in the \"{minifi_container_name}\" flow')
def step_impl(context, component: str, minifi_container_name: str):
    """Delegate to check_component_not_running_through_controller() for the named container."""
    test = context.test
    test.check_component_not_running_through_controller(component, minifi_container_name)
@then(u'the {component} component is not running')
def step_impl(context, component: str):
    """Re-dispatch to the container-specific step using this feature's minifi-cpp-flow container name."""
    step_text = f"then the {component} component is not running in the \"minifi-cpp-flow-{context.feature_id}\" flow"
    context.execute_steps(step_text)
@then(u'the {component} component is running in the \"{minifi_container_name}\" flow')
def step_impl(context, component: str, minifi_container_name: str):
    """Delegate to check_component_running_through_controller() for the named container."""
    test = context.test
    test.check_component_running_through_controller(component, minifi_container_name)
@then(u'the {component} component is running')
def step_impl(context, component: str):
    """Re-dispatch to the container-specific step using this feature's minifi-cpp-flow container name."""
    step_text = f"then the {component} component is running in the \"minifi-cpp-flow-{context.feature_id}\" flow"
    context.execute_steps(step_text)
@then(u'connection \"{connection}\" can be seen through MiNiFi controller in the \"{minifi_container_name}\" flow')
def step_impl(context, connection: str, minifi_container_name: str):
    """Delegate to connection_found_through_controller() for the named container."""
    test = context.test
    test.connection_found_through_controller(connection, minifi_container_name)
@then(u'connection \"{connection}\" can be seen through MiNiFi controller')
def step_impl(context, connection: str):
    """Re-dispatch to the container-specific step using this feature's minifi-cpp-flow container name."""
    step_text = f"then connection \"{connection}\" can be seen through MiNiFi controller in the \"minifi-cpp-flow-{context.feature_id}\" flow"
    context.execute_steps(step_text)
@then(u'{connection_count:d} connections can be seen full through MiNiFi controller in the \"{minifi_container_name}\" flow')
def step_impl(context, connection_count: int, minifi_container_name: str):
    """Delegate to check_connections_full_through_controller() for the named container."""
    test = context.test
    test.check_connections_full_through_controller(connection_count, minifi_container_name)
@then(u'{connection_count:d} connections can be seen full through MiNiFi controller')
def step_impl(context, connection_count: int):
    """Re-dispatch to the container-specific step using this feature's minifi-cpp-flow container name."""
    step_text = f"then {connection_count:d} connections can be seen full through MiNiFi controller in the \"minifi-cpp-flow-{context.feature_id}\" flow"
    context.execute_steps(step_text)
@then(u'connection \"{connection}\" has {size:d} size and {max_size:d} max size through MiNiFi controller in the \"{minifi_container_name}\" flow')
def step_impl(context, connection: str, size: int, max_size: int, minifi_container_name: str):
    """Delegate to check_connection_size_through_controller() for the named container."""
    test = context.test
    test.check_connection_size_through_controller(connection, size, max_size, minifi_container_name)
@then(u'connection \"{connection}\" has {size:d} size and {max_size:d} max size through MiNiFi controller')
def step_impl(context, connection: str, size: int, max_size: int):
    """Re-dispatch to the container-specific step using this feature's minifi-cpp-flow container name."""
    step_text = f"then connection \"{connection}\" has {size:d} size and {max_size:d} max size through MiNiFi controller in the \"minifi-cpp-flow-{context.feature_id}\" flow"
    context.execute_steps(step_text)
@then(u'manifest can be retrieved through MiNiFi controller in the \"{minifi_container_name}\" flow')
def step_impl(context, minifi_container_name: str):
    """Delegate to manifest_can_be_retrieved_through_minifi_controller() for the named container."""
    test = context.test
    test.manifest_can_be_retrieved_through_minifi_controller(minifi_container_name)
@then(u'manifest can be retrieved through MiNiFi controller')
def step_impl(context):
    """Re-dispatch to the container-specific step using this feature's minifi-cpp-flow container name."""
    step_text = f"then manifest can be retrieved through MiNiFi controller in the \"minifi-cpp-flow-{context.feature_id}\" flow"
    context.execute_steps(step_text)
@then(u'debug bundle can be retrieved through MiNiFi controller in the \"{minifi_container_name}\" flow')
def step_impl(context, minifi_container_name: str):
    """Delegate to debug_bundle_can_be_retrieved_through_minifi_controller() for the named container."""
    test = context.test
    test.debug_bundle_can_be_retrieved_through_minifi_controller(minifi_container_name)
@then(u'debug bundle can be retrieved through MiNiFi controller')
def step_impl(context):
    """Re-dispatch to the container-specific step using this feature's minifi-cpp-flow container name."""
    step_text = f"then debug bundle can be retrieved through MiNiFi controller in the \"minifi-cpp-flow-{context.feature_id}\" flow"
    context.execute_steps(step_text)
# Grafana Loki
@given("a Grafana Loki server is set up")
def step_impl(context):
    """Acquire the "grafana-loki-server" container using the "grafana-loki-server" engine."""
    test = context.test
    test.acquire_container(context=context, name="grafana-loki-server", engine="grafana-loki-server")
@given("a Grafana Loki server with SSL is set up")
def step_impl(context):
    """Call enable_ssl_in_grafana_loki(), then acquire the "grafana-loki-server" container."""
    test = context.test
    test.enable_ssl_in_grafana_loki()
    test.acquire_container(context=context, name="grafana-loki-server", engine="grafana-loki-server")
@given("a Grafana Loki server is set up with multi-tenancy enabled")
def step_impl(context):
    """Call enable_multi_tenancy_in_grafana_loki(), then acquire the "grafana-loki-server" container."""
    test = context.test
    test.enable_multi_tenancy_in_grafana_loki()
    test.acquire_container(context=context, name="grafana-loki-server", engine="grafana-loki-server")
@then("\"{lines}\" lines are published to the Grafana Loki server in less than {timeout_seconds:d} seconds")
@then("\"{lines}\" line is published to the Grafana Loki server in less than {timeout_seconds:d} seconds")
def step_impl(context, lines: str, timeout_seconds: int):
    """Split the semicolon-separated lines and call check_lines_on_grafana_loki() with False as third argument."""
    expected_lines = lines.split(";")
    context.test.check_lines_on_grafana_loki(expected_lines, timeout_seconds, False)
@then("\"{lines}\" lines are published to the \"{tenant_id}\" tenant on the Grafana Loki server in less than {timeout_seconds:d} seconds")
@then("\"{lines}\" line is published to the \"{tenant_id}\" tenant on the Grafana Loki server in less than {timeout_seconds:d} seconds")
def step_impl(context, lines: str, tenant_id: str, timeout_seconds: int):
    """Split the semicolon-separated lines and call check_lines_on_grafana_loki() with False and the tenant id."""
    expected_lines = lines.split(";")
    context.test.check_lines_on_grafana_loki(expected_lines, timeout_seconds, False, tenant_id)
@then("\"{lines}\" lines are published using SSL to the Grafana Loki server in less than {timeout_seconds:d} seconds")
@then("\"{lines}\" line is published using SSL to the Grafana Loki server in less than {timeout_seconds:d} seconds")
def step_impl(context, lines: str, timeout_seconds: int):
    """Split the semicolon-separated lines and call check_lines_on_grafana_loki() with True as third argument."""
    expected_lines = lines.split(";")
    context.test.check_lines_on_grafana_loki(expected_lines, timeout_seconds, True)
@given(u'a SSL context service is set up for Grafana Loki processor \"{processor_name}\"')
@given(u'a SSL context service is set up for the following processor: \"{processor_name}\"')
def step_impl(context, processor_name: str):
    """Delegate to setUpSslContextServiceForProcessor() for the named processor."""
    setUpSslContextServiceForProcessor(
        context,
        processor_name,
    )
@given(u'a SSL context service is set up for the following remote process group: \"{remote_process_group}\"')
def step_impl(context, remote_process_group: str):
    """Delegate to setUpSslContextServiceForRPG() for the named remote process group."""
    setUpSslContextServiceForRPG(
        context,
        remote_process_group,
    )
# Nginx reverse proxy
@given(u'a reverse proxy is set up to forward requests to the Grafana Loki server')
def step_impl(context):
    """Acquire the "reverse-proxy" container using the "reverse-proxy" engine."""
    test = context.test
    test.acquire_container(context=context, name="reverse-proxy", engine="reverse-proxy")
# Python
@given("python with langchain is installed on the MiNiFi agent {install_mode}")
def step_impl(context, install_mode):
    """Select the python setup call on context.test that matches the install mode text.

    Raises Exception when the install mode is not one of the known phrases.
    """
    # Lambdas keep the attribute lookup lazy: only the selected mode touches context.test.
    setup_by_mode = {
        "with required python packages":
            lambda: context.test.use_nifi_python_processors_with_system_python_packages_installed_in_minifi(),
        "with a pre-created virtualenv":
            lambda: context.test.use_nifi_python_processors_with_virtualenv_in_minifi(),
        "with a pre-created virtualenv containing the required python packages":
            lambda: context.test.use_nifi_python_processors_with_virtualenv_packages_installed_in_minifi(),
        "using inline defined Python dependencies to install packages":
            lambda: context.test.remove_python_requirements_txt_in_minifi(),
    }
    setup = setup_by_mode.get(install_mode)
    if setup is None:
        raise Exception("Unknown python install mode.")
    setup()
@given("python processors without dependencies are present on the MiNiFi agent")
def step_impl(context):
    """Delegate to use_nifi_python_processors_without_dependencies_in_minifi()."""
    test = context.test
    test.use_nifi_python_processors_without_dependencies_in_minifi()
@given("the example MiNiFi python processors are present")
def step_impl(context):
    """Delegate to enable_example_minifi_python_processors()."""
    test = context.test
    test.enable_example_minifi_python_processors()
@given("a non-sensitive parameter in the flow config called '{parameter_name}' with the value '{parameter_value}' in the parameter context '{parameter_context_name}'")
def step_impl(context, parameter_context_name, parameter_name, parameter_value):
    """Acquire the 'minifi-cpp-flow' container and add the parameter to its flow config."""
    minifi_container = context.test.acquire_container(
        context=context,
        name='minifi-cpp-flow',
        engine='minifi-cpp')
    minifi_container.add_parameter_to_flow_config(parameter_context_name, parameter_name, parameter_value)
@given("parameter context name is set to '{parameter_context_name}'")
def step_impl(context, parameter_context_name):
    """Acquire the 'minifi-cpp-flow' container and set its parameter context name."""
    minifi_container = context.test.acquire_container(
        context=context,
        name='minifi-cpp-flow',
        engine='minifi-cpp')
    minifi_container.set_parameter_context_name(parameter_context_name)
# Modbus
@given(u'there is an accessible PLC with modbus enabled')
def step_impl(context):
    """Acquire the "diag-slave-tcp" container and start it right away."""
    test = context.test
    test.acquire_container(context=context, name="diag-slave-tcp", engine="diag-slave-tcp")
    test.start('diag-slave-tcp')
@given(u'PLC register has been set with {modbus_cmd} command')
def step_impl(context, modbus_cmd):
    """Pass the modbus command to set_value_on_plc_with_modbus() for the postfixed diag-slave-tcp container."""
    plc_container = context.test.get_container_name_with_postfix('diag-slave-tcp')
    context.test.set_value_on_plc_with_modbus(plc_container, modbus_cmd)
# Couchbase
@when(u'a Couchbase server is started')
def step_impl(context):
    """Delegate to start_couchbase_server(), passing the behave context."""
    test = context.test
    test.start_couchbase_server(context)
@given("a CouchbaseClusterService is setup up with the name \"{service_name}\"")
def step_impl(context, service_name):
    """Create a CouchbaseClusterService for the couchbase-server container and add it to the minifi-cpp-flow container."""
    server_hostname = context.test.get_container_name_with_postfix("couchbase-server")
    connection_string = "couchbase://{server_hostname}".format(server_hostname=server_hostname)
    cluster_service = CouchbaseClusterService(name=service_name, connection_string=connection_string)

    minifi_container = context.test.acquire_container(context=context, name="minifi-cpp-flow")
    minifi_container.add_controller(cluster_service)
@given("a CouchbaseClusterService is set up up with SSL connection with the name \"{service_name}\"")
def step_impl(context, service_name):
    """Add a CA-only SSLContextService and a couchbases:// CouchbaseClusterService that uses it."""
    ssl_context_service = SSLContextService(name="SSLContextService", ca_cert='/tmp/resources/root_ca.crt')
    minifi_container = context.test.acquire_container(context=context, name="minifi-cpp-flow")
    minifi_container.add_controller(ssl_context_service)

    server_hostname = context.test.get_container_name_with_postfix("couchbase-server")
    connection_string = "couchbases://{server_hostname}".format(server_hostname=server_hostname)
    cluster_service = CouchbaseClusterService(
        name=service_name,
        connection_string=connection_string,
        ssl_context_service=ssl_context_service)
    minifi_container.add_controller(cluster_service)
@then("a document with id \"{doc_id}\" in bucket \"{bucket_name}\" is present with data '{data}' of type \"{data_type}\" in Couchbase")
def step_impl(context, doc_id: str, bucket_name: str, data: str, data_type: str):
    """Delegate to check_is_data_present_on_couchbase() with the document id, bucket, data and data type."""
    test = context.test
    test.check_is_data_present_on_couchbase(doc_id, bucket_name, data, data_type)
@given("a CouchbaseClusterService is setup up using mTLS authentication with the name \"{service_name}\"")
def step_impl(context, service_name):
    """Add an SSLContextService with client cert/key and CA, plus a couchbases:// CouchbaseClusterService that uses it."""
    ssl_settings = {
        "cert": '/tmp/resources/clientuser.crt',
        "key": '/tmp/resources/clientuser.key',
        "ca_cert": '/tmp/resources/root_ca.crt',
    }
    ssl_context_service = SSLContextService(name="SSLContextService", **ssl_settings)
    minifi_container = context.test.acquire_container(context=context, name="minifi-cpp-flow")
    minifi_container.add_controller(ssl_context_service)

    server_hostname = context.test.get_container_name_with_postfix("couchbase-server")
    connection_string = "couchbases://{server_hostname}".format(server_hostname=server_hostname)
    cluster_service = CouchbaseClusterService(
        name=service_name,
        connection_string=connection_string,
        ssl_context_service=ssl_context_service)
    minifi_container.add_controller(cluster_service)
@given("a LlamaCpp model is present on the MiNiFi host")
def step_impl(context):
    """Delegate to llama_model_is_downloaded_in_minifi()."""
    test = context.test
    test.llama_model_is_downloaded_in_minifi()
@when(u'NiFi is started')
def step_impl(context):
    """Delegate to start_nifi(), passing the behave context."""
    test = context.test
    test.start_nifi(context)