| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| |
| from minifi.core.FileSystemObserver import FileSystemObserver |
| from minifi.core.RemoteProcessGroup import RemoteProcessGroup |
| from minifi.core.SSL_cert_utils import gen_cert, rsa_gen_key_callback, make_ca, make_cert, dump_certificate, dump_privatekey |
| from minifi.core.Funnel import Funnel |
| |
| from minifi.controllers.SSLContextService import SSLContextService |
| from minifi.controllers.GCPCredentialsControllerService import GCPCredentialsControllerService |
| from minifi.controllers.ODBCService import ODBCService |
| from minifi.controllers.KubernetesControllerService import KubernetesControllerService |
| |
| from behave import given, then, when |
| from behave.model_describe import ModelDescriptor |
| from pydoc import locate |
| from pytimeparse.timeparse import timeparse |
| |
| import logging |
| import time |
| import uuid |
| import binascii |
| |
| from kafka import KafkaProducer |
| from confluent_kafka.admin import AdminClient, NewTopic |
| from confluent_kafka import Producer |
| import socket |
| import os |
| |
| |
| # Background |
| @given("the content of \"{directory}\" is monitored") |
| def step_impl(context, directory): |
| context.test.add_file_system_observer(FileSystemObserver(context.directory_bindings.docker_path_to_local_path(context.test_id, directory))) |
| |
| |
def __create_processor(context, processor_type, processor_name, property_name, property_value, container_name, engine='minifi-cpp'):
    """Instantiate a processor by type name, configure it and register it with the test flow.

    The processor class is resolved dynamically from the ``minifi.processors``
    package. When the target container has no start node yet, this processor
    becomes the primary (start) node.
    """
    flow_container = context.test.acquire_container(container_name, engine)
    processor_class = locate("minifi.processors." + processor_type + "." + processor_type)
    node = processor_class()
    node.set_name(processor_name)
    if property_name is not None:
        node.set_property(property_name, property_value)
    context.test.add_node(node)
    # Assume that the first node declared is primary unless specified otherwise
    if not flow_container.get_start_nodes():
        flow_container.add_start_node(node)
| |
| |
| # MiNiFi cluster setups |
# MiNiFi cluster setups
@given("a {processor_type} processor with the name \"{processor_name}\" and the \"{property_name}\" property set to \"{property_value}\" in a \"{minifi_container_name}\" flow")
@given("a {processor_type} processor with the name \"{processor_name}\" and the \"{property_name}\" property set to \"{property_value}\" in the \"{minifi_container_name}\" flow")
def step_impl(context, processor_type, processor_name, property_name, property_value, minifi_container_name):
    """Create a named processor with one property set, in the given flow (default engine)."""
    __create_processor(context, processor_type, processor_name,
                       property_name, property_value, minifi_container_name)
| |
| |
@given("a {processor_type} processor with the name \"{processor_name}\" and the \"{property_name}\" property set to \"{property_value}\" in a \"{minifi_container_name}\" flow with engine \"{engine_name}\"")
@given("a {processor_type} processor with the name \"{processor_name}\" and the \"{property_name}\" property set to \"{property_value}\" in the \"{minifi_container_name}\" flow with engine \"{engine_name}\"")
def step_impl(context, processor_type, processor_name, property_name, property_value, minifi_container_name, engine_name):
    """Create a named processor with one property set, in the given flow and engine."""
    __create_processor(context, processor_type, processor_name,
                       property_name, property_value, minifi_container_name, engine_name)
| |
| |
@given("a {processor_type} processor with the \"{property_name}\" property set to \"{property_value}\" in a \"{minifi_container_name}\" flow")
@given("a {processor_type} processor with the \"{property_name}\" property set to \"{property_value}\" in the \"{minifi_container_name}\" flow")
def step_impl(context, processor_type, property_name, property_value, minifi_container_name):
    """Create a processor (named after its type) with one property set, in the given flow."""
    __create_processor(context, processor_type, processor_type,
                       property_name, property_value, minifi_container_name)
| |
| |
@given("a {processor_type} processor the \"{property_name}\" property set to \"{property_value}\" in the \"{minifi_container_name}\" flow with engine \"{engine_name}\"")
def step_impl(context, processor_type, property_name, property_value, minifi_container_name, engine_name):
    """Create a processor (named after its type) with one property set, in the given flow and engine."""
    __create_processor(context, processor_type, processor_type,
                       property_name, property_value, minifi_container_name, engine_name)
| |
| |
@given("a {processor_type} processor with the \"{property_name}\" property set to \"{property_value}\"")
def step_impl(context, processor_type, property_name, property_value):
    """Create a processor (named after its type) with one property set, in the default minifi-cpp flow."""
    __create_processor(context, processor_type, processor_type,
                       property_name, property_value, "minifi-cpp-flow")
| |
| |
@given("a {processor_type} processor with the name \"{processor_name}\" and the \"{property_name}\" property set to \"{property_value}\"")
def step_impl(context, processor_type, property_name, property_value, processor_name):
    """Create a named processor with one property set, in the default minifi-cpp flow."""
    __create_processor(context, processor_type, processor_name,
                       property_name, property_value, "minifi-cpp-flow")
| |
| |
@given("a {processor_type} processor with the name \"{processor_name}\" in the \"{minifi_container_name}\" flow")
def step_impl(context, processor_type, processor_name, minifi_container_name):
    """Create a named processor with no properties set, in the given flow."""
    __create_processor(context, processor_type, processor_name, None, None, minifi_container_name)
| |
| |
@given("a {processor_type} processor with the name \"{processor_name}\" in the \"{minifi_container_name}\" flow with engine \"{engine_name}\"")
def step_impl(context, processor_type, processor_name, minifi_container_name, engine_name):
    """Create a named processor with no properties set, in the given flow and engine."""
    __create_processor(context, processor_type, processor_name, None, None,
                       minifi_container_name, engine_name)
| |
| |
@given("a {processor_type} processor with the name \"{processor_name}\"")
def step_impl(context, processor_type, processor_name):
    """Create a named processor with no properties set, in the default minifi-cpp flow."""
    __create_processor(context, processor_type, processor_name, None, None, "minifi-cpp-flow")
| |
| |
@given("a {processor_type} processor in the \"{minifi_container_name}\" flow")
@given("a {processor_type} processor in a \"{minifi_container_name}\" flow")
@given("a {processor_type} processor set up in a \"{minifi_container_name}\" flow")
def step_impl(context, processor_type, minifi_container_name):
    """Create a processor (named after its type) with no properties set, in the given flow."""
    __create_processor(context, processor_type, processor_type, None, None, minifi_container_name)
| |
| |
@given("a {processor_type} processor")
@given("a {processor_type} processor set up to communicate with an s3 server")
@given("a {processor_type} processor set up to communicate with the same s3 server")
@given("a {processor_type} processor set up to communicate with an Azure blob storage")
@given("a {processor_type} processor set up to communicate with a kafka broker instance")
@given("a {processor_type} processor set up to communicate with an MQTT broker instance")
@given("a {processor_type} processor set up to communicate with the Splunk HEC instance")
def step_impl(context, processor_type):
    """Create a processor (named after its type) in the default minifi-cpp flow.

    The service-specific phrasings are aliases; the processor itself decides
    which backing service it talks to via its properties.
    """
    __create_processor(context, processor_type, processor_type, None, None, "minifi-cpp-flow")
| |
| |
@given("a {processor_type} processor in a Kubernetes cluster")
@given("a {processor_type} processor in the Kubernetes cluster")
def step_impl(context, processor_type):
    """Create a processor (named after its type) inside the kubernetes test container/engine."""
    __create_processor(context, processor_type, processor_type, None, None, "kubernetes", "kubernetes")
| |
| |
@given("a set of processors in the \"{minifi_container_name}\" flow")
def step_impl(context, minifi_container_name):
    """Create every processor described in the step table (columns: type, name, uuid) in the given flow."""
    flow_container = context.test.acquire_container(minifi_container_name)
    logging.info(context.table)
    for row in context.table:
        proc_type = row["type"]
        node = locate("minifi.processors." + proc_type + "." + proc_type)()
        node.set_name(row["name"])
        node.set_uuid(row["uuid"])
        context.test.add_node(node)
        # Assume that the first node declared is primary unless specified otherwise
        if not flow_container.get_start_nodes():
            flow_container.add_start_node(node)
| |
| |
@given("a set of processors")
def step_impl(context):
    # Render the step's data table back into behave table syntax so it can be
    # forwarded verbatim to the flow-specific variant of this step.
    rendered_table = ModelDescriptor.describe_table(context.table, "    ")
    # Delegate to the "... in the <flow> flow" step, defaulting to minifi-cpp-flow.
    context.execute_steps("""given a set of processors in the \"{minifi_container_name}\" flow
        {table}
        """.format(minifi_container_name="minifi-cpp-flow", table=rendered_table))
| |
| |
@given("a RemoteProcessGroup node opened on \"{address}\"")
def step_impl(context, address):
    """Register a RemoteProcessGroup listening on the given address with the test."""
    context.test.add_remote_process_group(RemoteProcessGroup(address, "RemoteProcessGroup"))
| |
| |
@given("a kafka producer workflow publishing files placed in \"{directory}\" to a broker exactly once")
def step_impl(context, directory):
    # Composite step: builds a GetFile -> PublishKafka flow by executing the
    # individual setup steps. "Keep Source File" is false so each file is
    # consumed (and therefore published) exactly once.
    context.execute_steps("""
        given a GetFile processor with the \"Input Directory\" property set to \"{directory}\"
        and the \"Keep Source File\" property of the GetFile processor is set to \"false\"
        and a PublishKafka processor set up to communicate with a kafka broker instance
        and the "success" relationship of the GetFile processor is connected to the PublishKafka""".format(directory=directory))
| |
| |
@given("the \"{property_name}\" property of the {processor_name} processor is set to \"{property_value}\"")
def step_impl(context, property_name, processor_name, property_value):
    """Set (or, for the sentinel "(not set)", clear) a property on the named processor."""
    node = context.test.get_node_by_name(processor_name)
    if property_value == "(not set)":
        node.unset_property(property_name)
        return
    node.set_property(property_name, property_value)
| |
| |
@given("the \"{property_name}\" properties of the {processor_name_one} and {processor_name_two} processors are set to the same random guid")
def step_impl(context, property_name, processor_name_one, processor_name_two):
    """Assign one freshly generated GUID to the same property on both processors."""
    shared_guid = str(uuid.uuid4())
    for name in (processor_name_one, processor_name_two):
        context.test.get_node_by_name(name).set_property(property_name, shared_guid)
| |
| |
@given("the max concurrent tasks attribute of the {processor_name} processor is set to {max_concurrent_tasks:d}")
def step_impl(context, processor_name, max_concurrent_tasks):
    """Set the max concurrent tasks count on the named processor."""
    context.test.get_node_by_name(processor_name).set_max_concurrent_tasks(max_concurrent_tasks)
| |
| |
@given("the \"{property_name}\" property of the {processor_name} processor is set to match {key_attribute_encoding} encoded kafka message key \"{message_key}\"")
def step_impl(context, property_name, processor_name, key_attribute_encoding, message_key):
    """Set an expression-language filter on the processor matching the kafka key in the given encoding."""
    encoding = key_attribute_encoding.lower()
    if encoding == "hex":
        # Hex is presented upper-case to be in sync with NiFi
        encoded_key = binascii.hexlify(message_key.encode("utf-8")).upper()
    elif encoding == "(not set)":
        encoded_key = message_key.encode("utf-8")
    else:
        encoded_key = message_key.encode(key_attribute_encoding)
    logging.info("%s processor is set up to match encoded key \"%s\"", processor_name, encoded_key)
    filtering = "${kafka.key:equals('" + encoded_key.decode("utf-8") + "')}"
    logging.info("Filter: \"%s\"", filtering)
    context.test.get_node_by_name(processor_name).set_property(property_name, filtering)
| |
| |
@given("the \"{property_name}\" property of the {processor_name} processor is set to match the attribute \"{attribute_key}\" to \"{attribute_value}\"")
def step_impl(context, property_name, processor_name, attribute_key, attribute_value):
    """Set an expression-language filter matching a flowfile attribute, or "true" when unset."""
    node = context.test.get_node_by_name(processor_name)
    if attribute_value == "(not set)":
        # Ignore filtering
        node.set_property(property_name, "true")
        return
    filtering = "${" + attribute_key + ":equals('" + attribute_value + "')}"
    logging.info("Filter: \"%s\"", filtering)
    logging.info("Key: \"%s\", value: \"%s\"", attribute_key, attribute_value)
    node.set_property(property_name, filtering)
| |
| |
@given("the scheduling period of the {processor_name} processor is set to \"{scheduling_period}\"")
def step_impl(context, processor_name, scheduling_period):
    """Switch the named processor to timer-driven scheduling with the given period.

    Fix: the placeholder/parameter was misspelled "sceduling_period"; renaming
    both keeps the matched step text identical (placeholder names are not part
    of the feature-file text).
    """
    processor = context.test.get_node_by_name(processor_name)
    processor.set_scheduling_strategy("TIMER_DRIVEN")
    processor.set_scheduling_period(scheduling_period)
| |
| |
@given("these processor properties are set")
@given("these processor properties are set to match the http proxy")
def step_impl(context):
    """Apply every (processor name, property name, property value) row of the step table."""
    for row in context.table:
        node = context.test.get_node_by_name(row["processor name"])
        node.set_property(row["property name"], row["property value"])
| |
| |
@given("the \"{relationship}\" relationship of the {source_name} processor is connected to the input port on the {remote_process_group_name}")
def step_impl(context, relationship, source_name, remote_process_group_name):
    """Wire a processor relationship into a generated input port ("to_nifi") of the remote process group."""
    remote_process_group = context.test.get_remote_process_group_by_name(remote_process_group_name)
    input_port_node = context.test.generate_input_port_for_remote_process_group(remote_process_group, "to_nifi")
    context.test.add_node(input_port_node)
    source = context.test.get_node_by_name(source_name)
    source.out_proc.connect({relationship: input_port_node})
| |
| |
@given("the \"{relationship}\" relationship of the {source_name} is connected to the {destination_name}")
@given("the \"{relationship}\" relationship of the {source_name} processor is connected to the {destination_name}")
def step_impl(context, relationship, source_name, destination_name):
    """Connect one processor's relationship to another processor."""
    destination = context.test.get_node_by_name(destination_name)
    context.test.get_node_by_name(source_name).out_proc.connect({relationship: destination})
| |
| |
@given("the processors are connected up as described here")
def step_impl(context):
    """Connect processors as described by the step table (relationship name, source name, destination name)."""
    for row in context.table:
        step = ("given the \"" + row["relationship name"] + "\" relationship of the "
                + row["source name"] + " processor is connected to the " + row["destination name"])
        context.execute_steps(step)
| |
| |
@given("the connection going to the RemoteProcessGroup has \"drop empty\" set")
def step_impl(context):
    """Mark the generated "to_nifi" input port so it drops empty flowfiles."""
    context.test.get_node_by_name("to_nifi").drop_empty_flowfiles = True
| |
| |
@given("a file with the content \"{content}\" is present in \"{path}\"")
def step_impl(context, content, path):
    """Place a test file with the given content at the given path."""
    context.test.add_test_data(path, content)
| |
| |
@given("an empty file is present in \"{path}\"")
def step_impl(context, path):
    """Place an empty test file at the given path."""
    empty_content = ""
    context.test.add_test_data(path, empty_content)
| |
| |
@given("a file with filename \"{file_name}\" and content \"{content}\" is present in \"{path}\"")
def step_impl(context, file_name, content, path):
    """Place a test file with an explicit filename and content at the given path."""
    context.test.add_test_data(path, content, file_name)
| |
| |
@given("a Funnel with the name \"{funnel_name}\" is set up")
def step_impl(context, funnel_name):
    """Add a Funnel node with the given name to the flow."""
    context.test.add_node(Funnel(funnel_name))
| |
| |
@given("the Funnel with the name \"{source_name}\" is connected to the {destination_name}")
def step_impl(context, source_name, destination_name):
    """Connect a funnel's implicit 'success' relationship to the destination node, creating either node if needed."""
    funnel = context.test.get_or_create_node_by_name(source_name)
    target = context.test.get_or_create_node_by_name(destination_name)
    funnel.out_proc.connect({'success': target})
| |
| |
@given("\"{processor_name}\" processor is a start node")
def step_impl(context, processor_name):
    """Mark the named processor as a start node of the default minifi-cpp flow."""
    node = context.test.get_or_create_node_by_name(processor_name)
    context.test.acquire_container("minifi-cpp-flow").add_start_node(node)
| |
| |
| # NiFi setups |
| @given("a NiFi flow receiving data from a RemoteProcessGroup \"{source_name}\" on port 8080") |
| def step_impl(context, source_name): |
| remote_process_group = context.test.get_remote_process_group_by_name("RemoteProcessGroup") |
| source = context.test.generate_input_port_for_remote_process_group(remote_process_group, source_name) |
| context.test.add_node(source) |
| container = context.test.acquire_container('nifi', 'nifi') |
| # Assume that the first node declared is primary unless specified otherwise |
| if not container.get_start_nodes(): |
| container.add_start_node(source) |
| |
| |
@given("a NiFi flow with the name \"{flow_name}\" is set up")
def step_impl(context, flow_name):
    """Start a NiFi-engine container with the given flow name."""
    context.test.acquire_container(flow_name, 'nifi')
| |
| |
| # HTTP proxy setup |
| @given("the http proxy server is set up") |
| @given("a http proxy server is set up accordingly") |
| def step_impl(context): |
| context.test.acquire_container("http-proxy", "http-proxy") |
| |
| |
| # TLS |
| @given("an ssl context service set up for {producer_name} and {consumer_name}") |
| def step_impl(context, producer_name, consumer_name): |
| cert, key = gen_cert() |
| crt_file = '/tmp/resources/test-crt.pem' |
| ssl_context_service = SSLContextService(cert=crt_file, ca_cert=crt_file) |
| context.test.put_test_resource('test-crt.pem', cert.as_pem() + key.as_pem(None, rsa_gen_key_callback)) |
| producer = context.test.get_node_by_name(producer_name) |
| producer.controller_services.append(ssl_context_service) |
| producer.set_property("SSL Context Service", ssl_context_service.name) |
| consumer = context.test.get_node_by_name(consumer_name) |
| consumer.set_property("SSL Certificate", crt_file) |
| consumer.set_property("SSL Verify Peer", "no") |
| |
| |
@given("an ssl context service set up for {producer_name}")
def step_impl(context, producer_name):
    """Attach an SSLContextService using the pre-baked client certificate bundle to the named processor."""
    ssl_context_service = SSLContextService(
        cert="/tmp/resources/certs/client_LMN_client.pem",
        ca_cert="/tmp/resources/certs/ca-cert",
        key="/tmp/resources/certs/client_LMN_client.key",
        passphrase="abcdefgh")
    producer_node = context.test.get_node_by_name(producer_name)
    producer_node.controller_services.append(ssl_context_service)
    producer_node.set_property("SSL Context Service", ssl_context_service.name)
| |
| |
| # Kubernetes |
| def __set_up_the_kubernetes_controller_service(context, processor_name, service_property_name, properties): |
| kubernetes_controller_service = KubernetesControllerService("Kubernetes Controller Service", properties) |
| processor = context.test.get_node_by_name(processor_name) |
| processor.controller_services.append(kubernetes_controller_service) |
| processor.set_property(service_property_name, kubernetes_controller_service.name) |
| |
| |
@given("the {processor_name} processor has a {service_property_name} which is a Kubernetes Controller Service")
@given("the {processor_name} processor has an {service_property_name} which is a Kubernetes Controller Service")
def step_impl(context, processor_name, service_property_name):
    """Attach a Kubernetes Controller Service with no extra properties to the processor."""
    no_properties = {}
    __set_up_the_kubernetes_controller_service(context, processor_name, service_property_name, no_properties)
| |
| |
@given("the {processor_name} processor has a {service_property_name} which is a Kubernetes Controller Service with the \"{property_name}\" property set to \"{property_value}\"")
@given("the {processor_name} processor has an {service_property_name} which is a Kubernetes Controller Service with the \"{property_name}\" property set to \"{property_value}\"")
def step_impl(context, processor_name, service_property_name, property_name, property_value):
    """Attach a Kubernetes Controller Service configured with a single property to the processor."""
    __set_up_the_kubernetes_controller_service(
        context, processor_name, service_property_name, {property_name: property_value})
| |
| |
| # Kafka setup |
| @given("a kafka broker is set up in correspondence with the PublishKafka") |
| @given("a kafka broker is set up in correspondence with the third-party kafka publisher") |
| @given("a kafka broker is set up in correspondence with the publisher flow") |
| def step_impl(context): |
| context.test.acquire_container("kafka-broker", "kafka-broker") |
| |
| |
| # MQTT setup |
| @given("an MQTT broker is set up in correspondence with the PublishMQTT") |
| @given("an MQTT broker is set up in correspondence with the PublishMQTT and ConsumeMQTT") |
| def step_impl(context): |
| context.test.acquire_container("mqtt-broker", "mqtt-broker") |
| |
| |
| # s3 setup |
| @given("a s3 server is set up in correspondence with the PutS3Object") |
| @given("a s3 server is set up in correspondence with the DeleteS3Object") |
| def step_impl(context): |
| context.test.acquire_container("s3-server", "s3-server") |
| |
| |
| # azure storage setup |
| @given("an Azure storage server is set up") |
| def step_impl(context): |
| context.test.acquire_container("azure-storage-server", "azure-storage-server") |
| |
| |
| # syslog client |
| @given(u'a Syslog client with {protocol} protocol is setup to send logs to minifi') |
| def step_impl(context, protocol): |
| client_name = "syslog-" + protocol.lower() + "-client" |
| context.test.acquire_container(client_name, client_name) |
| |
| |
| # google cloud storage setup |
| @given("a Google Cloud storage server is set up") |
| @given("a Google Cloud storage server is set up with some test data") |
| @given('a Google Cloud storage server is set up and a single object with contents "preloaded data" is present') |
| def step_impl(context): |
| context.test.acquire_container("fake-gcs-server", "fake-gcs-server") |
| |
| |
| # splunk hec |
| @given("a Splunk HEC is set up and running") |
| def step_impl(context): |
| context.test.start_splunk() |
| |
| |
@given("SSL is enabled for the Splunk HEC and the SSL context service is set up for PutSplunkHTTP and QuerySplunkIndexingStatus")
def step_impl(context):
    """Generate a CA plus minifi and splunk certificates, publish them as test
    resources, wire one shared SSL context service into both Splunk processors
    and enable HEC SSL on the splunk container."""
    ca_cert, ca_key = make_ca("root CA")
    minifi_cert, minifi_key = make_cert("minifi-cpp-flow", ca_cert, ca_key)
    splunk_cert, splunk_key = make_cert("splunk", ca_cert, ca_key)
    minifi_crt_file = '/tmp/resources/minifi-cpp-flow.pem'
    minifi_key_file = '/tmp/resources/minifi-cpp-flow.key'
    root_ca_crt_file = '/tmp/resources/root_ca.pem'
    ssl_context_service = SSLContextService(cert=minifi_crt_file, ca_cert=root_ca_crt_file, key=minifi_key_file)
    context.test.put_test_resource('minifi-cpp-flow.pem', dump_certificate(minifi_cert))
    context.test.put_test_resource('minifi-cpp-flow.key', dump_privatekey(minifi_key))
    context.test.put_test_resource('root_ca.pem', dump_certificate(ca_cert))

    # Both Splunk-facing processors share the same SSL context service.
    for node_name in ("PutSplunkHTTP", "QuerySplunkIndexingStatus"):
        node = context.test.get_node_by_name(node_name)
        node.controller_services.append(ssl_context_service)
        node.set_property("SSL Context Service", ssl_context_service.name)
    context.test.cluster.enable_splunk_hec_ssl('splunk', dump_certificate(splunk_cert), dump_privatekey(splunk_key), dump_certificate(ca_cert))
| |
| |
@given(u'the {processor_one} processor is set up with a GCPCredentialsControllerService to communicate with the Google Cloud storage server')
def step_impl(context, processor_one):
    """Attach an anonymous-credentials GCP controller service to the processor."""
    gcp_controller_service = GCPCredentialsControllerService(credentials_location="Use Anonymous credentials")
    node = context.test.get_node_by_name(processor_one)
    node.controller_services.append(gcp_controller_service)
    node.set_property("GCP Credentials Provider Service", gcp_controller_service.name)
| |
| |
@given(u'the {processor_one} and the {processor_two} processors are set up with a GCPCredentialsControllerService to communicate with the Google Cloud storage server')
def step_impl(context, processor_one, processor_two):
    """Attach one shared anonymous-credentials GCP controller service to both processors."""
    gcp_controller_service = GCPCredentialsControllerService(credentials_location="Use Anonymous credentials")
    for name in (processor_one, processor_two):
        node = context.test.get_node_by_name(name)
        node.controller_services.append(gcp_controller_service)
        node.set_property("GCP Credentials Provider Service", gcp_controller_service.name)
| |
| |
@given("the kafka broker is started")
def step_impl(context):
    """Start the kafka broker service for the scenario."""
    context.test.start_kafka_broker()
| |
| |
@given("the topic \"{topic_name}\" is initialized on the kafka broker")
def step_impl(context, topic_name):
    """Create the given topic on the local kafka broker and block until creation resolves.

    Fix: the original reported results via print(); this file logs everything
    else through the logging module, so use logging here too (failures now use
    logging.error so they stand out in the test log instead of going to stdout).
    """
    admin = AdminClient({'bootstrap.servers': "localhost:29092"})
    new_topics = [NewTopic(topic_name, num_partitions=3, replication_factor=1)]
    futures = admin.create_topics(new_topics)
    # Block until the topic is created
    for topic, future in futures.items():
        try:
            future.result()
            logging.info("Topic %s created", topic)
        except Exception as e:
            logging.error("Failed to create topic %s: %s", topic, e)
| |
| |
| # SQL |
| @given("an ODBCService is setup up for {processor_name} with the name \"{service_name}\"") |
| def step_impl(context, processor_name, service_name): |
| odbc_service = ODBCService(name=service_name, connection_string="Driver={PostgreSQL ANSI};Server=postgresql-server;Port=5432;Database=postgres;Uid=postgres;Pwd=password;") |
| processor = context.test.get_node_by_name(processor_name) |
| processor.controller_services.append(odbc_service) |
| processor.set_property("DB Controller Service", odbc_service.name) |
| |
| |
@given("a PostgreSQL server is set up")
def step_impl(context):
    """Start the PostgreSQL server container."""
    name = "postgresql-server"
    context.test.acquire_container(name, name)
| |
| |
| # OPC UA |
| @given("an OPC UA server is set up") |
| def step_impl(context): |
| context.test.acquire_container("opcua-server", "opcua-server") |
| |
| |
@given("an OPC UA server is set up with access control")
def step_impl(context):
    """Start the OPC UA server container running the access-control example server binary."""
    command = ["/opt/open62541/examples/access_control_server"]
    context.test.acquire_container("opcua-server", "opcua-server", command)
| |
| |
@when("the MiNiFi instance starts up")
@when("both instances start up")
@when("all instances start up")
@when("all other processes start up")
def step_impl(context):
    """Start every container/flow acquired during the Given steps."""
    context.test.start()
| |
| |
@when("content \"{content}\" is added to file \"{file_name}\" present in directory \"{path}\" {seconds:d} seconds later")
def step_impl(context, content, file_name, path, seconds):
    """After a delay, write the given content into a named file in the directory."""
    time.sleep(seconds)
    context.test.add_test_data(path, content, file_name)
| |
| |
# Kafka
def delivery_report(err, msg):
    """Kafka producer delivery callback: log failure if err is set, otherwise log topic/partition of delivery."""
    if err is None:
        logging.info('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
    else:
        logging.info('Message delivery failed: {}'.format(err))
| |
| |
@when("a message with content \"{content}\" is published to the \"{topic_name}\" topic")
def step_impl(context, content, topic_name):
    """Publish a single message to the topic on the plaintext broker port."""
    config = {"bootstrap.servers": "localhost:29092", "client.id": socket.gethostname()}
    producer = Producer(config)
    producer.produce(topic_name, content.encode("utf-8"), callback=delivery_report)
    producer.flush(10)
| |
| |
@when("a message with content \"{content}\" is published to the \"{topic_name}\" topic using an ssl connection")
def step_impl(context, content, topic_name):
    """Publish a single message to the topic over the broker's SSL port using the test client certificates."""
    test_dir = os.environ['TEST_DIRECTORY']  # Based on DockerVerify.sh
    certs_dir = os.path.join(test_dir, "resources/kafka_broker/conf/certs")
    producer = Producer({
        "bootstrap.servers": "localhost:29093",
        "security.protocol": "ssl",
        "ssl.ca.location": os.path.join(certs_dir, "ca-cert"),
        "ssl.certificate.location": os.path.join(certs_dir, "client_LMN_client.pem"),
        "ssl.key.location": os.path.join(certs_dir, "client_LMN_client.key"),
        "ssl.key.password": "abcdefgh",
        "client.id": socket.gethostname()})
    producer.produce(topic_name, content.encode("utf-8"), callback=delivery_report)
    producer.flush(10)
| |
| |
# Used for testing transactional message consumption
@when("the publisher performs a {transaction_type} transaction publishing to the \"{topic_name}\" topic these messages: {messages}")
def step_impl(context, transaction_type, topic_name, messages):
    """Publish the comma-separated messages inside a kafka transaction of the requested kind.

    Supported transaction types:
      SINGLE_COMMITTED_TRANSACTION - all messages in one committed transaction
      TWO_SEPARATE_TRANSACTIONS    - one committed transaction per message
      NON_COMMITTED_TRANSACTION    - messages produced but never committed
      CANCELLED_TRANSACTION        - messages produced, then the transaction is aborted

    Fix: the per-message produce loop was duplicated in every branch; it is
    factored into a local helper. An unknown type now raises ValueError
    (a subclass of the previously raised Exception, so callers still catch it).
    """
    producer = Producer({"bootstrap.servers": "localhost:29092", "transactional.id": "1001"})
    producer.init_transactions()
    message_list = messages.split(", ")
    logging.info("Transaction type: %s", transaction_type)
    logging.info("Messages: %s", message_list)

    def produce_all():
        # Shared produce loop used by every transaction variant.
        for content in message_list:
            producer.produce(topic_name, content.encode("utf-8"), callback=delivery_report)

    if transaction_type == "SINGLE_COMMITTED_TRANSACTION":
        producer.begin_transaction()
        produce_all()
        producer.commit_transaction()
        producer.flush(10)
    elif transaction_type == "TWO_SEPARATE_TRANSACTIONS":
        for content in message_list:
            producer.begin_transaction()
            producer.produce(topic_name, content.encode("utf-8"), callback=delivery_report)
            producer.commit_transaction()
        producer.flush(10)
    elif transaction_type == "NON_COMMITTED_TRANSACTION":
        producer.begin_transaction()
        produce_all()
        producer.flush(10)
    elif transaction_type == "CANCELLED_TRANSACTION":
        producer.begin_transaction()
        produce_all()
        producer.flush(10)
        producer.abort_transaction()
    else:
        # Fail loudly on a typo in the feature file instead of silently publishing nothing.
        raise ValueError("Unknown transaction type: " + transaction_type)
| |
| |
@when("a message with content \"{content}\" is published to the \"{topic_name}\" topic with key \"{message_key}\"")
def step_impl(context, content, topic_name, message_key):
    """Publish a single keyed message to the topic.

    The produce call is asynchronous; delivery_report fires from flush() once
    the message is delivered or has failed permanently, and flush() waits for
    all outstanding deliveries.
    """
    producer = Producer({"bootstrap.servers": "localhost:29092", "client.id": socket.gethostname()})
    producer.produce(topic_name, content.encode("utf-8"),
                     callback=delivery_report, key=message_key.encode("utf-8"))
    producer.flush(10)
| |
| |
@when("{number_of_messages} kafka messages are sent to the topic \"{topic_name}\"")
def step_impl(context, number_of_messages, topic_name):
    """Publish the requested number of messages, each with a random UUID payload."""
    producer = Producer({"bootstrap.servers": "localhost:29092", "client.id": socket.gethostname()})
    for _ in range(int(number_of_messages)):
        payload = str(uuid.uuid4()).encode("utf-8")
        producer.produce(topic_name, payload)
    producer.flush(10)
| |
| |
@when("a message with content \"{content}\" is published to the \"{topic_name}\" topic with headers \"{semicolon_separated_headers}\"")
def step_impl(context, content, topic_name, semicolon_separated_headers):
    """Publish a message carrying kafka headers given as "key: value; key: value".

    Fix: headers were parsed with split(":"), which silently truncated any
    header value containing a colon (e.g. a URL) and crashed on a header with
    no colon at all; partition(":") splits on the first colon only.
    """
    # Confluent kafka does not support multiple headers with same key, another API must be used here.
    headers = []
    for header in semicolon_separated_headers.split(";"):
        key, _, value = header.partition(":")
        headers.append((key.strip(), value.strip().encode("utf-8")))
    producer = KafkaProducer(bootstrap_servers='localhost:29092')
    future = producer.send(topic_name, content.encode("utf-8"), headers=headers)
    assert future.get(timeout=60)
| |
| |
@when("the Kafka consumer is registered in kafka broker")
def step_impl(context):
    """Wait for the consumer to register with the broker, then pad for group synchronization."""
    context.test.wait_for_kafka_consumer_to_be_registered("kafka-broker")
    # After the consumer is registered there is still some time needed for consumer-broker synchronization
    # Unfortunately there are no additional log messages that could be checked for this
    time.sleep(2)
| |
| |
@then("a flowfile with the content \"{content}\" is placed in the monitored directory in less than {duration}")
@then("a flowfile with the content '{content}' is placed in the monitored directory in less than {duration}")
def step_impl(context, content, duration):
    """Assert a single flowfile with the given content appears within the timeout."""
    timeout_seconds = timeparse(duration)
    context.test.check_for_single_file_with_content_generated(content, timeout_seconds)
| |
| |
@then("a flowfile with the JSON content \"{content}\" is placed in the monitored directory in less than {duration}")
@then("a flowfile with the JSON content '{content}' is placed in the monitored directory in less than {duration}")
def step_impl(context, content, duration):
    """Assert a single flowfile whose JSON matches the given content appears in time."""
    timeout_seconds = timeparse(duration)
    context.test.check_for_single_json_file_with_content_generated(content, timeout_seconds)
| |
| |
@then("at least one flowfile with the content \"{content}\" is placed in the monitored directory in less than {duration}")
@then("at least one flowfile with the content '{content}' is placed in the monitored directory in less than {duration}")
def step_impl(context, content, duration):
    """Assert at least one flowfile with the given content appears within the timeout."""
    timeout_seconds = timeparse(duration)
    context.test.check_for_at_least_one_file_with_content_generated(content, timeout_seconds)
| |
| |
@then("{num_flowfiles} flowfiles are placed in the monitored directory in less than {duration}")
def step_impl(context, num_flowfiles, duration):
    """Assert exactly num_flowfiles files are generated within the timeout.

    A count of 0 delegates to the dedicated "no files" step.
    """
    # num_flowfiles is captured as a STRING (the pattern has no :d converter),
    # so it must be converted before the numeric comparison; the previous
    # `num_flowfiles == 0` was always False and the zero branch unreachable.
    expected_count = int(num_flowfiles)
    if expected_count == 0:
        # behave requires a step keyword ("then") in text given to execute_steps.
        context.execute_steps("""then no files are placed in the monitored directory in {duration} of running time""".format(duration=duration))
        return
    context.test.check_for_num_files_generated(expected_count, timeparse(duration))
| |
| |
@then("at least one flowfile is placed in the monitored directory in less than {duration}")
def step_impl(context, duration):
    """Assert that one or more flowfiles (any content) appear within the timeout."""
    no_upper_bound = float('inf')
    context.test.check_for_num_file_range_generated(1, no_upper_bound, timeparse(duration))
| |
| |
@then("one flowfile with the contents \"{content}\" is placed in the monitored directory in less than {duration}")
def step_impl(context, content, duration):
    """Assert exactly one flowfile with the given contents appears within the timeout."""
    expected_contents = [content]
    context.test.check_for_multiple_files_generated(1, timeparse(duration), expected_contents)
| |
| |
@then("two flowfiles with the contents \"{content_1}\" and \"{content_2}\" are placed in the monitored directory in less than {duration}")
def step_impl(context, content_1, content_2, duration):
    """Assert exactly two flowfiles with the given contents appear within the timeout."""
    expected_contents = [content_1, content_2]
    context.test.check_for_multiple_files_generated(2, timeparse(duration), expected_contents)
| |
| |
@then("flowfiles with these contents are placed in the monitored directory in less than {duration}: \"{contents}\"")
def step_impl(context, duration, contents):
    """Assert flowfiles matching every comma-separated content value appear in time."""
    expected_contents = contents.split(",")
    # NOTE(review): a count of 0 here presumably means "match on contents only,
    # ignore the file count" — confirm against check_for_multiple_files_generated.
    context.test.check_for_multiple_files_generated(0, timeparse(duration), expected_contents)
| |
| |
@then("after a wait of {duration}, at least {lower_bound:d} and at most {upper_bound:d} flowfiles are produced and placed in the monitored directory")
def step_impl(context, lower_bound, upper_bound, duration):
    """Assert the number of generated flowfiles falls within [lower_bound, upper_bound]."""
    wait_seconds = timeparse(duration)
    context.test.check_for_num_file_range_generated(lower_bound, upper_bound, wait_seconds)
| |
| |
@then("{number_of_files:d} flowfiles are placed in the monitored directory in {duration}")
@then("{number_of_files:d} flowfile is placed in the monitored directory in {duration}")
def step_impl(context, number_of_files, duration):
    """Assert exactly number_of_files flowfiles (any content) appear within the timeout."""
    timeout_seconds = timeparse(duration)
    context.test.check_for_multiple_files_generated(number_of_files, timeout_seconds)
| |
| |
@then("at least one empty flowfile is placed in the monitored directory in less than {duration}")
def step_impl(context, duration):
    """Assert that at least one zero-byte flowfile appears within the timeout."""
    timeout_seconds = timeparse(duration)
    context.test.check_for_an_empty_file_generated(timeout_seconds)
| |
| |
@then("no files are placed in the monitored directory in {duration} of running time")
def step_impl(context, duration):
    """Assert the monitored directory stays empty for the whole duration."""
    observation_seconds = timeparse(duration)
    context.test.check_for_no_files_generated(observation_seconds)
| |
| |
@then("no errors were generated on the http-proxy regarding \"{url}\"")
def step_impl(context, url):
    """Assert the http-proxy container logged no errors for the given URL."""
    proxy_container = 'http-proxy'
    context.test.check_http_proxy_access(proxy_container, url)
| |
| |
@then("the object on the s3 server is \"{object_data}\"")
def step_impl(context, object_data):
    """Assert the stored s3 object's data matches the expected value."""
    s3_container = "s3-server"
    context.test.check_s3_server_object_data(s3_container, object_data)
| |
| |
@then("the object content type on the s3 server is \"{content_type}\" and the object metadata matches use metadata")
def step_impl(context, content_type):
    """Assert the s3 object's content type and metadata match expectations."""
    s3_container = "s3-server"
    context.test.check_s3_server_object_metadata(s3_container, content_type)
| |
| |
@then("the object bucket on the s3 server is empty")
def step_impl(context):
    """Assert the s3 test bucket contains no objects."""
    s3_container = "s3-server"
    context.test.check_empty_s3_bucket(s3_container)
| |
| |
| # Azure |
# Azure
@when("test blob \"{blob_name}\" with the content \"{content}\" is created on Azure blob storage")
def step_impl(context, blob_name, content):
    """Create a blob with the given content (no snapshot) on Azure blob storage."""
    with_snapshot = False
    context.test.add_test_blob(blob_name, content, with_snapshot)
| |
| |
@when("test blob \"{blob_name}\" with the content \"{content}\" and a snapshot is created on Azure blob storage")
def step_impl(context, blob_name, content):
    """Create a blob with the given content plus a snapshot on Azure blob storage."""
    with_snapshot = True
    context.test.add_test_blob(blob_name, content, with_snapshot)
| |
| |
@when("test blob \"{blob_name}\" is created on Azure blob storage")
def step_impl(context, blob_name):
    """Create an empty blob (no snapshot) on Azure blob storage."""
    empty_content = ""
    context.test.add_test_blob(blob_name, empty_content, False)
| |
| |
@when("test blob \"{blob_name}\" is created on Azure blob storage with a snapshot")
def step_impl(context, blob_name):
    """Create an empty blob together with a snapshot on Azure blob storage."""
    empty_content = ""
    context.test.add_test_blob(blob_name, empty_content, True)
| |
| |
@then("the object on the Azure storage server is \"{object_data}\"")
def step_impl(context, object_data):
    """Assert the Azure storage server holds an object with the expected data."""
    azure_container = "azure-storage-server"
    context.test.check_azure_storage_server_data(azure_container, object_data)
| |
| |
@then("the Azure blob storage becomes empty in {timeout_seconds:d} seconds")
def step_impl(context, timeout_seconds):
    """Assert the Azure blob container empties out within the given timeout."""
    context.test.check_azure_blob_storage_is_empty(timeout_seconds)
| |
| |
@then("the blob and snapshot count becomes {blob_and_snapshot_count:d} in {timeout_seconds:d} seconds")
def step_impl(context, blob_and_snapshot_count, timeout_seconds):
    """Assert the combined blob+snapshot count reaches the expected value in time."""
    expected_count = blob_and_snapshot_count
    context.test.check_azure_blob_and_snapshot_count(expected_count, timeout_seconds)
| |
| |
| # SQL |
# SQL
@then("the query \"{query}\" returns {number_of_rows:d} rows in less than {timeout_seconds:d} seconds on the PostgreSQL server")
def step_impl(context, query, number_of_rows, timeout_seconds):
    """Assert the query yields the expected row count on the PostgreSQL container."""
    db_container = "postgresql-server"
    context.test.check_query_results(db_container, query, number_of_rows, timeout_seconds)
| |
| |
@then("the Minifi logs contain the following message: \"{log_message}\" in less than {duration}")
def step_impl(context, log_message, duration):
    """Assert the MiNiFi log contains the message within the timeout."""
    timeout_seconds = timeparse(duration)
    context.test.check_minifi_log_contents(log_message, timeout_seconds)
| |
| |
@then("the Minifi logs contain the following message: \"{log_message}\" {count:d} times after {seconds:d} seconds")
def step_impl(context, log_message, count, seconds):
    """After a fixed wait, assert the message occurs exactly `count` times in the log."""
    time.sleep(seconds)
    # NOTE(review): the second argument (1) appears to be a timeout for the
    # check itself — confirm against check_minifi_log_contents's signature.
    context.test.check_minifi_log_contents(log_message, 1, count)
| |
| |
@then("the Minifi logs do not contain the following message: \"{log_message}\" after {seconds:d} seconds")
def step_impl(context, log_message, seconds):
    """Assert the message never appears in the MiNiFi log for the given period."""
    context.test.check_minifi_log_does_not_contain(log_message, seconds)
| |
| |
@then("the Minifi logs match the following regex: \"{regex}\" in less than {duration}")
def step_impl(context, regex, duration):
    """Assert the MiNiFi log matches the regex within the timeout."""
    timeout_seconds = timeparse(duration)
    context.test.check_minifi_log_matches_regex(regex, timeout_seconds)
| |
| |
@then("the OPC UA server logs contain the following message: \"{log_message}\" in less than {duration}")
def step_impl(context, log_message, duration):
    """Assert the OPC UA server container log contains the message in time."""
    opcua_container = "opcua-server"
    context.test.check_container_log_contents(opcua_container, log_message, timeparse(duration))
| |
| |
| # MQTT |
# MQTT
@then("the MQTT broker has a log line matching \"{log_pattern}\"")
def step_impl(context, log_pattern):
    """Assert the MQTT broker log contains exactly one line matching the pattern."""
    timeout_seconds = 30
    context.test.check_container_log_matches_regex('mqtt-broker', log_pattern, timeout_seconds, count=1)
| |
| |
@then("the \"{minifi_container_name}\" flow has a log line matching \"{log_pattern}\" in less than {duration}")
def step_impl(context, minifi_container_name, log_pattern, duration):
    """Assert the named flow's container log has one line matching the pattern in time."""
    timeout_seconds = timeparse(duration)
    context.test.check_container_log_matches_regex(minifi_container_name, log_pattern, timeout_seconds, count=1)
| |
| |
@then("an MQTT broker is deployed in correspondence with the PublishMQTT")
def step_impl(context):
    """Bring up the MQTT broker container, then start the test cluster."""
    broker = "mqtt-broker"
    context.test.acquire_container(broker, broker)
    context.test.start()
| |
| |
| # Google Cloud Storage |
# Google Cloud Storage
@then('an object with the content \"{content}\" is present in the Google Cloud storage')
def step_imp(context, content):
    """Assert the fake GCS server holds an object with the expected content."""
    gcs_container = "fake-gcs-server"
    context.test.check_google_cloud_storage(gcs_container, content)
| |
| |
@then("the test bucket of Google Cloud Storage is empty")
def step_impl(context):
    """Assert the fake GCS server's test bucket contains no objects."""
    gcs_container = "fake-gcs-server"
    context.test.check_empty_gcs_bucket(gcs_container)
| |
| |
| # Splunk |
# Splunk
@then('an event is registered in Splunk HEC with the content \"{content}\"')
def step_imp(context, content):
    """Assert a Splunk HEC event with the expected content was registered."""
    splunk_container = "splunk"
    context.test.check_splunk_event(splunk_container, content)
| |
| |
@then('an event is registered in Splunk HEC with the content \"{content}\" with \"{source}\" set as source and \"{source_type}\" set as sourcetype and \"{host}\" set as host')
def step_imp(context, content, source, source_type, host):
    """Assert a Splunk HEC event with the expected content and attributes exists."""
    expected_attributes = {
        "source": source,
        "sourcetype": source_type,
        "host": host,
    }
    context.test.check_splunk_event_with_attributes("splunk", content, expected_attributes)