| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the "License"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| |
| import logging |
| import time |
| import uuid |
| |
| from pydoc import locate |
| |
| from minifi.core.InputPort import InputPort |
| |
| from cluster.DockerTestCluster import DockerTestCluster |
| |
| from minifi.validators.EmptyFilesOutPutValidator import EmptyFilesOutPutValidator |
| from minifi.validators.NoFileOutPutValidator import NoFileOutPutValidator |
| from minifi.validators.SingleFileOutputValidator import SingleFileOutputValidator |
| from minifi.validators.MultiFileOutputValidator import MultiFileOutputValidator |
| from minifi.validators.SingleOrMultiFileOutputValidator import SingleOrMultiFileOutputValidator |
| from minifi.validators.SingleOrMultiFileOutputRegexValidator import SingleOrMultiFileOutputRegexValidator |
| from minifi.validators.NoContentCheckFileNumberValidator import NoContentCheckFileNumberValidator |
| from minifi.validators.NumFileRangeValidator import NumFileRangeValidator |
| from minifi.validators.SingleJSONFileOutputValidator import SingleJSONFileOutputValidator |
| from utils import decode_escaped_str, get_minifi_pid, get_peak_memory_usage |
| |
| |
class MiNiFi_integration_test:
    """Driver for a single MiNiFi docker integration test scenario.

    Wraps a DockerTestCluster and exposes the operations the behave step
    definitions need: deploying service containers, registering processors
    and remote process groups, injecting test data, and validating the
    output the flow produces.

    Note on the ``assert check(...) or self.cluster.log_app_output()``
    idiom used by the check_* methods: when the check returns a falsy
    value, the ``or`` clause dumps the application logs before the
    AssertionError is raised, so failures are diagnosable from the test
    output alone.
    """

    def __init__(self, context):
        self.test_id = context.test_id
        self.cluster = DockerTestCluster(context)

        # Connectable flow nodes (processors, input ports) registered so far.
        self.connectable_nodes = []
        # Remote process groups are tracked separately: they are not connectable nodes.
        self.remote_process_groups = []
        # Set later via add_file_system_observer(); required by the file-output checks.
        self.file_system_observer = None

        self.docker_directory_bindings = context.directory_bindings
        self.cluster.set_directory_bindings(self.docker_directory_bindings.get_directory_bindings(self.test_id), self.docker_directory_bindings.get_data_directories(self.test_id))

    def cleanup(self):
        """Tear down all containers and resources owned by the cluster."""
        self.cluster.cleanup()

    def acquire_container(self, name, engine='minifi-cpp', command=None):
        """Reserve a container of the given engine type (does not start it)."""
        return self.cluster.acquire_container(name, engine, command)

    def start_kafka_broker(self):
        """Deploy zookeeper plus a kafka broker and wait for the broker to start."""
        self.cluster.acquire_container('kafka-broker', 'kafka-broker')
        self.cluster.deploy_container('zookeeper')
        self.cluster.deploy_container('kafka-broker')
        assert self.cluster.wait_for_container_startup_to_finish('kafka-broker')

    def start_splunk(self):
        """Deploy a splunk container and enable its HEC indexer."""
        self.cluster.acquire_container('splunk', 'splunk')
        self.cluster.deploy_container('splunk')
        assert self.cluster.wait_for_container_startup_to_finish('splunk')
        assert self.cluster.enable_splunk_hec_indexer('splunk', 'splunk_hec_token')

    def start_elasticsearch(self):
        """Deploy an elasticsearch container and wait for it to start."""
        self.cluster.acquire_container('elasticsearch', 'elasticsearch')
        self.cluster.deploy_container('elasticsearch')
        assert self.cluster.wait_for_container_startup_to_finish('elasticsearch')

    def start_opensearch(self):
        """Deploy an opensearch container and wait for it to start."""
        self.cluster.acquire_container('opensearch', 'opensearch')
        self.cluster.deploy_container('opensearch')
        assert self.cluster.wait_for_container_startup_to_finish('opensearch')

    def start_minifi_c2_server(self):
        """Deploy a MiNiFi C2 server container and wait for it to start."""
        self.cluster.acquire_container("minifi-c2-server", "minifi-c2-server")
        self.cluster.deploy_container('minifi-c2-server')
        assert self.cluster.wait_for_container_startup_to_finish('minifi-c2-server')

    def start(self, container_name=None):
        """Start one named container, or every acquired container when
        container_name is None."""
        if container_name is not None:
            logging.info("Starting container %s", container_name)
            self.cluster.deploy_container(container_name)
            assert self.cluster.wait_for_container_startup_to_finish(container_name)
            return
        logging.info("MiNiFi_integration_test start")
        self.cluster.deploy_all()
        assert self.cluster.wait_for_all_containers_to_finish_startup()

    def stop(self, container_name):
        """Gracefully stop the named container."""
        logging.info("Stopping container %s", container_name)
        self.cluster.stop_container(container_name)

    def kill(self, container_name):
        """Forcibly kill the named container."""
        logging.info("Killing container %s", container_name)
        self.cluster.kill_container(container_name)

    def restart(self, container_name):
        """Restart the named container."""
        logging.info("Restarting container %s", container_name)
        self.cluster.restart_container(container_name)

    def add_node(self, processor):
        """Register a connectable node; node names must be unique."""
        if processor.get_name() in (elem.get_name() for elem in self.connectable_nodes):
            raise Exception("Trying to register processor with an already registered name: \"%s\"" % processor.get_name())
        self.connectable_nodes.append(processor)

    def get_or_create_node_by_name(self, node_name):
        """Return the registered node called node_name, creating and
        registering a new processor instance of that type if none exists.

        Raises for the name "RemoteProcessGroup" (those need an input port
        or address and must be registered explicitly) and for processor
        types that cannot be located.
        """
        try:
            # get_node_by_name raises for unknown names instead of returning
            # None, so the "create" branch is reached via the exception path.
            return self.get_node_by_name(node_name)
        except Exception:
            if node_name == "RemoteProcessGroup":
                raise Exception("Trying to register RemoteProcessGroup without an input port or address.")
            # Resolve minifi.processors.<Name>.<Name> dynamically.
            processor_class = locate("minifi.processors." + node_name + "." + node_name)
            if processor_class is None:
                # locate() returns None when the module or class is missing;
                # fail clearly instead of with a TypeError on None().
                raise Exception("Cannot locate processor type: \"%s\"" % node_name)
            node = processor_class()
            node.set_name(node_name)
            self.add_node(node)
            return node

    def get_node_by_name(self, name):
        """Return the registered connectable node with the given name.

        Raises Exception (never returns None) when the name is unknown.
        """
        for node in self.connectable_nodes:
            if name == node.get_name():
                return node
        raise Exception("Trying to fetch unknown node: \"%s\"" % name)

    def add_remote_process_group(self, remote_process_group):
        """Register a remote process group; names must be unique."""
        if remote_process_group.get_name() in (elem.get_name() for elem in self.remote_process_groups):
            raise Exception("Trying to register remote_process_group with an already registered name: \"%s\"" % remote_process_group.get_name())
        self.remote_process_groups.append(remote_process_group)

    def get_remote_process_group_by_name(self, name):
        """Return the registered remote process group with the given name;
        raises when the name is unknown."""
        for node in self.remote_process_groups:
            if name == node.get_name():
                return node
        raise Exception("Trying to fetch unknown node: \"%s\"" % name)

    @staticmethod
    def generate_input_port_for_remote_process_group(remote_process_group, name):
        """Create an InputPort bound to the given remote process group with a
        deterministic uuid derived from the group's uuid."""
        input_port_node = InputPort(name, remote_process_group)
        # uuid3 derives an MD5-based UUID from the remote process group id,
        # so the port id is stable for a given group.
        input_port_node.set_uuid(uuid.uuid3(remote_process_group.get_uuid(), "input_port"))
        return input_port_node

    def add_test_data(self, path, test_data, file_name=None):
        """Write test_data (escape sequences decoded, UTF-8 encoded) into the
        docker-mounted path; a random file name is generated when omitted."""
        if file_name is None:
            file_name = str(uuid.uuid4())
        test_data = decode_escaped_str(test_data)
        self.docker_directory_bindings.put_file_to_docker_path(self.test_id, path, file_name, test_data.encode('utf-8'))

    def put_test_resource(self, file_name, contents):
        """Place a named resource file into the test's resource directory."""
        self.docker_directory_bindings.put_test_resource(self.test_id, file_name, contents)

    def rm_out_child(self):
        """Remove the test's output child directory."""
        self.docker_directory_bindings.rm_out_child(self.test_id)

    def add_file_system_observer(self, file_system_observer):
        """Attach the observer that watches the flow's output directory."""
        self.file_system_observer = file_system_observer

    def check_for_no_files_generated(self, wait_time_in_seconds):
        """Assert that no output files appear within the waiting period."""
        output_validator = NoFileOutPutValidator()
        output_validator.set_output_dir(self.file_system_observer.get_output_dir())
        self.__check_output_after_time_period(wait_time_in_seconds, output_validator)

    def check_for_single_file_with_content_generated(self, content, timeout_seconds):
        """Assert exactly one output file with the given (escape-decoded) content."""
        output_validator = SingleFileOutputValidator(decode_escaped_str(content))
        output_validator.set_output_dir(self.file_system_observer.get_output_dir())
        self.__check_output(timeout_seconds, output_validator, 1)

    def check_for_single_json_file_with_content_generated(self, content, timeout_seconds):
        """Assert exactly one output file whose JSON content matches."""
        output_validator = SingleJSONFileOutputValidator(content)
        output_validator.set_output_dir(self.file_system_observer.get_output_dir())
        self.__check_output(timeout_seconds, output_validator, 1)

    def check_for_multiple_files_generated(self, file_count, timeout_seconds, expected_content=None):
        """Assert file_count output files, optionally matching expected_content.

        expected_content defaults to None (no content check) rather than a
        mutable default list.
        """
        expected = [decode_escaped_str(content) for content in (expected_content or [])]
        output_validator = MultiFileOutputValidator(file_count, expected)
        output_validator.set_output_dir(self.file_system_observer.get_output_dir())
        self.__check_output(timeout_seconds, output_validator, file_count)

    def check_for_at_least_one_file_with_matching_content(self, regex, timeout_seconds):
        """Assert at least one output file whose content matches the regex."""
        output_validator = SingleOrMultiFileOutputRegexValidator(regex)
        output_validator.set_output_dir(self.file_system_observer.get_output_dir())
        self.__check_output(timeout_seconds, output_validator)

    def check_for_at_least_one_file_with_content_generated(self, content, timeout_seconds):
        """Assert at least one output file with the given (escape-decoded) content."""
        output_validator = SingleOrMultiFileOutputValidator(decode_escaped_str(content))
        output_validator.set_output_dir(self.file_system_observer.get_output_dir())
        self.__check_output(timeout_seconds, output_validator)

    def check_for_num_files_generated(self, num_flowfiles, timeout_seconds):
        """Assert exactly num_flowfiles output files, ignoring their contents."""
        output_validator = NoContentCheckFileNumberValidator(num_flowfiles)
        output_validator.set_output_dir(self.file_system_observer.get_output_dir())
        self.__check_output(timeout_seconds, output_validator, max(1, num_flowfiles))

    def check_for_num_file_range_generated(self, min_files, max_files, wait_time_in_seconds):
        """Assert the output file count falls within [min_files, max_files]
        after the waiting period."""
        output_validator = NumFileRangeValidator(min_files, max_files)
        output_validator.set_output_dir(self.file_system_observer.get_output_dir())
        self.__check_output_after_time_period(wait_time_in_seconds, output_validator)

    def check_for_an_empty_file_generated(self, timeout_seconds):
        """Assert that a single empty output file is produced."""
        output_validator = EmptyFilesOutPutValidator()
        output_validator.set_output_dir(self.file_system_observer.get_output_dir())
        self.__check_output(timeout_seconds, output_validator, 1)

    def __check_output_after_time_period(self, wait_time_in_seconds, output_validator):
        # Fixed wait, then a one-shot validation (used by "no more than" style checks).
        time.sleep(wait_time_in_seconds)
        self.__validate(output_validator)

    def __check_output(self, timeout_seconds, output_validator, max_files=0):
        # Poll via the file system observer up to the timeout, then assert.
        result = self.file_system_observer.validate_output(timeout_seconds, output_validator, max_files)
        assert not self.cluster.segfault_happened() or self.cluster.log_app_output()
        assert result or self.cluster.log_app_output()

    def __validate(self, validator):
        assert not self.cluster.segfault_happened() or self.cluster.log_app_output()
        assert validator.validate() or self.cluster.log_app_output()

    def check_s3_server_object_data(self, s3_container_name, object_data):
        """Assert the S3 test server holds an object with the given data."""
        assert self.cluster.check_s3_server_object_data(s3_container_name, object_data) or self.cluster.log_app_output()

    def check_s3_server_object_metadata(self, s3_container_name, content_type):
        """Assert the S3 test server object has the given content type."""
        assert self.cluster.check_s3_server_object_metadata(s3_container_name, content_type) or self.cluster.log_app_output()

    def check_empty_s3_bucket(self, s3_container_name):
        """Assert the S3 test bucket is empty."""
        assert self.cluster.is_s3_bucket_empty(s3_container_name) or self.cluster.log_app_output()

    def check_http_proxy_access(self, http_proxy_container_name, url):
        """Assert the given url was accessed through the HTTP proxy."""
        assert self.cluster.check_http_proxy_access(http_proxy_container_name, url) or self.cluster.log_app_output()

    def check_azure_storage_server_data(self, azure_container_name, object_data):
        """Assert the Azure storage test server holds the given data."""
        assert self.cluster.check_azure_storage_server_data(azure_container_name, object_data) or self.cluster.log_app_output()

    def wait_for_kafka_consumer_to_be_registered(self, kafka_container_name):
        """Block until the kafka broker reports a registered consumer."""
        assert self.cluster.wait_for_kafka_consumer_to_be_registered(kafka_container_name) or self.cluster.log_app_output()

    def check_splunk_event(self, splunk_container_name, query):
        """Assert a splunk event matching the query exists."""
        assert self.cluster.check_splunk_event(splunk_container_name, query) or self.cluster.log_app_output()

    def check_splunk_event_with_attributes(self, splunk_container_name, query, attributes):
        """Assert a splunk event matching the query and attributes exists."""
        assert self.cluster.check_splunk_event_with_attributes(splunk_container_name, query, attributes) or self.cluster.log_app_output()

    def check_google_cloud_storage(self, gcs_container_name, content):
        """Assert the GCS test server holds the given content."""
        assert self.cluster.check_google_cloud_storage(gcs_container_name, content) or self.cluster.log_app_output()

    def check_empty_gcs_bucket(self, gcs_container_name):
        """Assert the GCS test bucket is empty."""
        assert self.cluster.is_gcs_bucket_empty(gcs_container_name) or self.cluster.log_app_output()

    def check_empty_elastic(self, elastic_container_name):
        """Assert the elasticsearch instance holds no documents."""
        assert self.cluster.is_elasticsearch_empty(elastic_container_name) or self.cluster.log_app_output()

    def elastic_generate_apikey(self, elastic_container_name):
        """Generate and return an elasticsearch API key (logs on failure)."""
        return self.cluster.elastic_generate_apikey(elastic_container_name) or self.cluster.log_app_output()

    def create_doc_elasticsearch(self, elastic_container_name, index_name, doc_id):
        """Create a document with the given id in the given index."""
        assert self.cluster.create_doc_elasticsearch(elastic_container_name, index_name, doc_id) or self.cluster.log_app_output()

    def check_elastic_field_value(self, elastic_container_name, index_name, doc_id, field_name, field_value):
        """Assert a document field in elasticsearch has the expected value."""
        assert self.cluster.check_elastic_field_value(elastic_container_name, index_name, doc_id, field_name, field_value) or self.cluster.log_app_output()

    def add_elastic_user_to_opensearch(self, container_name):
        """Create the 'elastic' user on the opensearch instance."""
        assert self.cluster.add_elastic_user_to_opensearch(container_name) or self.cluster.log_app_output()

    def check_minifi_log_contents(self, line, timeout_seconds=60, count=1):
        """Assert the minifi-cpp log contains the line count times."""
        self.check_container_log_contents("minifi-cpp", line, timeout_seconds, count)

    def check_minifi_log_matches_regex(self, regex, timeout_seconds=60, count=1):
        """Assert the minifi log matches the regex count times."""
        assert self.cluster.check_minifi_log_matches_regex(regex, timeout_seconds, count) or self.cluster.log_app_output()

    def check_container_log_contents(self, container_engine, line, timeout_seconds=60, count=1):
        """Assert the log of containers of the given engine type contains the
        line count times within the timeout."""
        assert self.cluster.check_container_log_contents(container_engine, line, timeout_seconds, count) or self.cluster.log_app_output()

    def check_minifi_log_does_not_contain(self, line, wait_time_seconds):
        """Assert the minifi log does not contain the line after waiting."""
        assert self.cluster.check_minifi_log_does_not_contain(line, wait_time_seconds) or self.cluster.log_app_output()

    def check_query_results(self, postgresql_container_name, query, number_of_rows, timeout_seconds):
        """Assert the SQL query returns the expected row count within the timeout."""
        assert self.cluster.check_query_results(postgresql_container_name, query, number_of_rows, timeout_seconds) or self.cluster.log_app_output()

    def check_container_log_matches_regex(self, container_name, log_pattern, timeout_seconds, count=1):
        """Assert the named container's log matches the pattern count times."""
        assert self.cluster.wait_for_app_logs_regex(container_name, log_pattern, timeout_seconds, count) or self.cluster.log_app_output()

    def add_test_blob(self, blob_name, content, with_snapshot):
        """Upload a test blob (optionally with a snapshot) to Azure blob storage."""
        self.cluster.add_test_blob(blob_name, content, with_snapshot)

    def check_azure_blob_storage_is_empty(self, timeout_seconds):
        """Assert Azure blob storage is empty within the timeout."""
        assert self.cluster.check_azure_blob_storage_is_empty(timeout_seconds) or self.cluster.log_app_output()

    def check_azure_blob_and_snapshot_count(self, blob_and_snapshot_count, timeout_seconds):
        """Assert the combined blob + snapshot count matches within the timeout."""
        assert self.cluster.check_azure_blob_and_snapshot_count(blob_and_snapshot_count, timeout_seconds) or self.cluster.log_app_output()

    def check_metric_class_on_prometheus(self, metric_class, timeout_seconds):
        """Assert the metric class is scraped by prometheus within the timeout."""
        assert self.cluster.wait_for_metric_class_on_prometheus(metric_class, timeout_seconds) or self.cluster.log_app_output()

    def check_processor_metric_on_prometheus(self, metric_class, timeout_seconds, processor_name):
        """Assert a per-processor metric appears on prometheus within the timeout."""
        assert self.cluster.wait_for_processor_metric_on_prometheus(metric_class, timeout_seconds, processor_name) or self.cluster.log_app_output()

    def check_if_peak_memory_usage_exceeded(self, minimum_peak_memory_usage: int, timeout_seconds: int) -> None:
        """Assert the agent's peak memory usage rises above the given minimum."""
        assert self.cluster.wait_for_peak_memory_usage_to_exceed(minimum_peak_memory_usage, timeout_seconds) or self.cluster.log_app_output()

    def check_if_memory_usage_is_below(self, maximum_memory_usage: int, timeout_seconds: int) -> None:
        """Assert the agent's memory usage drops below the given maximum."""
        assert self.cluster.wait_for_memory_usage_to_drop_below(maximum_memory_usage, timeout_seconds) or self.cluster.log_app_output()

    def check_memory_usage_compared_to_peak(self, peak_multiplier: float, timeout_seconds: int) -> None:
        """Assert memory usage drops below peak_memory * peak_multiplier,
        where peak_multiplier must be strictly between 0 and 1."""
        peak_memory = get_peak_memory_usage(get_minifi_pid())
        assert (peak_memory is not None) or self.cluster.log_app_output()
        assert (1.0 > peak_multiplier > 0.0) or self.cluster.log_app_output()
        assert self.cluster.wait_for_memory_usage_to_drop_below(peak_memory * peak_multiplier, timeout_seconds) or self.cluster.log_app_output()

    def enable_provenance_repository_in_minifi(self):
        """Enable the provenance repository in the MiNiFi configuration."""
        self.cluster.enable_provenance_repository_in_minifi()

    def enable_c2_in_minifi(self):
        """Enable C2 in the MiNiFi configuration."""
        self.cluster.enable_c2_in_minifi()

    def enable_c2_with_ssl_in_minifi(self):
        """Enable C2 over SSL in the MiNiFi configuration."""
        self.cluster.enable_c2_with_ssl_in_minifi()

    def fetch_flow_config_from_c2_url_in_minifi(self):
        """Configure MiNiFi to fetch its flow config from the C2 URL."""
        self.cluster.fetch_flow_config_from_c2_url_in_minifi()

    def enable_prometheus_in_minifi(self):
        """Enable the prometheus metrics publisher in the MiNiFi configuration."""
        self.cluster.enable_prometheus_in_minifi()

    def enable_splunk_hec_ssl(self, container_name, splunk_cert_pem, splunk_key_pem, root_ca_cert_pem):
        """Enable SSL for the splunk HEC endpoint with the given certificates."""
        self.cluster.enable_splunk_hec_ssl(container_name, splunk_cert_pem, splunk_key_pem, root_ca_cert_pem)

    def enable_sql_in_minifi(self):
        """Enable the SQL extension in the MiNiFi configuration."""
        self.cluster.enable_sql_in_minifi()

    def set_yaml_in_minifi(self):
        """Switch the MiNiFi flow configuration format to YAML."""
        self.cluster.set_yaml_in_minifi()