from subprocess import Popen, PIPE, STDOUT
import docker
import logging
import os
import shutil
import threading
import time
import uuid
from pydoc import locate
from minifi.core.InputPort import InputPort
from minifi.core.DockerTestCluster import DockerTestCluster
from minifi.core.SingleNodeDockerCluster import SingleNodeDockerCluster
from minifi.core.DockerTestDirectoryBindings import DockerTestDirectoryBindings
from minifi.validators.EmptyFilesOutPutValidator import EmptyFilesOutPutValidator
from minifi.validators.NoFileOutPutValidator import NoFileOutPutValidator
from minifi.validators.SingleFileOutputValidator import SingleFileOutputValidator
from minifi.validators.MultiFileOutputValidator import MultiFileOutputValidator
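# Test harness for the docker-based MiNiFi integration tests: it owns the docker network
# and the per-test data directories, tracks the clusters and flow nodes under test, and
# exposes the output and state checks that the test steps assert on.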
class MiNiFi_integration_test():
    def __init__(self, context):
        self.test_id = str(uuid.uuid4())
        self.clusters = {}
        self.connectable_nodes = []
        # Remote process groups are not connectable nodes, so they are tracked separately
        self.remote_process_groups = []
        self.file_system_observer = None
        self.docker_network = None
        self.docker_directory_bindings = DockerTestDirectoryBindings()
        self.docker_directory_bindings.create_new_data_directories(self.test_id)
    def __del__(self):
        logging.info("MiNiFi_integration_test cleanup")
        # Clean up the network first; for some reason only this order of events works for cleanup
        if self.docker_network is not None:
            logging.info('Cleaning up network: %s', self.docker_network.name)
            while len(self.docker_network.containers) != 0:
                for container in self.docker_network.containers:
                    self.docker_network.disconnect(container, force=True)
                self.docker_network.reload()
            self.docker_network.remove()
        container_ids = []
        for cluster in self.clusters.values():
            for container in cluster.containers.values():
                container_ids.append(container.id)
            del cluster
        # The cluster deleter is not reliable for cleaning up, so remove the containers directly as well
        for container_id in container_ids:
            self.delete_docker_container_by_id(container_id)
        del self.docker_directory_bindings
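    # Removes a container directly through the docker client and polls the daemon
    # until it reports the container gone (or gives up after roughly 35 seconds).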
    def delete_docker_container_by_id(self, container_id):
        docker_client = docker.from_env()
        try:
            container = docker_client.containers.get(container_id)
            container.remove(v=True, force=True)
        except docker.errors.NotFound:
            logging.warning("Container '%s' has already been cleaned up.", container_id)
            return
        wait_start_time = time.perf_counter()
        while (time.perf_counter() - wait_start_time) < 35:
            try:
                docker_client.containers.get(container_id)
                logging.error("Docker container '%s' still exists after removal attempt. Waiting for docker daemon to update...", container_id)
                time.sleep(5)
            except docker.errors.NotFound:
                logging.info("Docker container cleanup successful for '%s'.", container_id)
                return
        logging.error("Failed to clean up docker container '%s'.", container_id)
    def docker_path_to_local_path(self, docker_path):
        return self.docker_directory_bindings.docker_path_to_local_path(self.test_id, docker_path)
    def get_test_id(self):
        return self.test_id
    def acquire_cluster(self, name):
        return self.clusters.setdefault(name, DockerTestCluster())
    def set_up_cluster_network(self):
        self.docker_network = SingleNodeDockerCluster.create_docker_network()
        for cluster in self.clusters.values():
            cluster.set_network(self.docker_network)
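    # Deploys each cluster's flow on the shared docker network, then waits for an
    # engine-specific log line to confirm that the corresponding container started up.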
    def start(self):
        logging.info("MiNiFi_integration_test start")
        self.set_up_cluster_network()
        for cluster in self.clusters.values():
            logging.info("Starting cluster '%s' with engine '%s'", cluster.get_name(), cluster.get_engine())
            cluster.set_directory_bindings(self.docker_directory_bindings.get_directory_bindings(self.test_id))
            cluster.deploy_flow()
        for cluster in self.clusters.values():
            startup_success = True
            logging.info("Engine: %s", cluster.get_engine())
            if cluster.get_engine() == "minifi-cpp":
                startup_success = cluster.wait_for_app_logs("Starting Flow Controller", 120)
            elif cluster.get_engine() == "nifi":
                startup_success = cluster.wait_for_app_logs("Starting Flow Controller...", 120)
            elif cluster.get_engine() == "kafka-broker":
                startup_success = cluster.wait_for_app_logs("Startup complete.", 120)
            elif cluster.get_engine() == "http-proxy":
                startup_success = cluster.wait_for_app_logs("Accepting HTTP Socket connections at", 120)
            elif cluster.get_engine() == "s3-server":
                startup_success = cluster.wait_for_app_logs("Started S3MockApplication", 120)
            elif cluster.get_engine() == "azure-storage-server":
                startup_success = cluster.wait_for_app_logs("Azurite Queue service is successfully listening at", 120)
            if not startup_success:
                cluster.log_nifi_output()
            assert startup_success
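    # Processors and input ports are registered under unique names so that later test
    # steps can look them up and connect them by name.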
    def add_node(self, processor):
        if processor.get_name() in (elem.get_name() for elem in self.connectable_nodes):
            raise Exception("Trying to register processor with an already registered name: \"%s\"" % processor.get_name())
        self.connectable_nodes.append(processor)
    def get_or_create_node_by_name(self, node_name):
        # get_node_by_name() raises for unregistered names, so look the node up directly here
        node = next((elem for elem in self.connectable_nodes if elem.get_name() == node_name), None)
        if node is None:
            if node_name == "RemoteProcessGroup":
                raise Exception("Trying to register RemoteProcessGroup without an input port or address.")
            node = locate("minifi.processors." + node_name + "." + node_name)()
            node.set_name(node_name)
            self.add_node(node)
        return node
    def get_node_by_name(self, name):
        for node in self.connectable_nodes:
            if name == node.get_name():
                return node
        raise Exception("Trying to fetch unknown node: \"%s\"" % name)
    def add_remote_process_group(self, remote_process_group):
        if remote_process_group.get_name() in (elem.get_name() for elem in self.remote_process_groups):
            raise Exception("Trying to register remote_process_group with an already registered name: \"%s\"" % remote_process_group.get_name())
        self.remote_process_groups.append(remote_process_group)
    def get_remote_process_group_by_name(self, name):
        for node in self.remote_process_groups:
            if name == node.get_name():
                return node
        raise Exception("Trying to fetch unknown remote process group: \"%s\"" % name)
    @staticmethod
    def generate_input_port_for_remote_process_group(remote_process_group, name):
        input_port_node = InputPort(name, remote_process_group)
        # Generate a deterministic (MD5-based) UUID derived from the remote process group's UUID
        input_port_node.set_uuid(uuid.uuid3(remote_process_group.get_uuid(), "input_port"))
        return input_port_node
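    # Test data and resources are staged through the per-test directory bindings that
    # the clusters mount into their containers.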
    def add_test_data(self, path, test_data, file_name=None):
        if file_name is None:
            # A uuid4() default argument would only be evaluated once, so generate a fresh name per call
            file_name = str(uuid.uuid4())
        self.docker_directory_bindings.put_file_to_docker_path(self.test_id, path, file_name, test_data.encode('utf-8'))
    def put_test_resource(self, file_name, contents):
        self.docker_directory_bindings.put_test_resource(self.test_id, file_name, contents)
    def get_out_subdir(self, subdir):
        return self.docker_directory_bindings.get_out_subdir(self.test_id, subdir)
    def rm_out_child(self, subdir):
        self.docker_directory_bindings.rm_out_child(self.test_id, subdir)
    def add_file_system_observer(self, file_system_observer):
        self.file_system_observer = file_system_observer
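    # Each check_for_* helper pairs the registered file system observer with a matching
    # output validator and delegates the waiting and assertions to check_output().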
    def check_for_no_files_generated(self, timeout_seconds, subdir=''):
        output_validator = NoFileOutPutValidator()
        output_validator.set_output_dir(self.file_system_observer.get_output_dir())
        self.check_output(timeout_seconds, output_validator, 1, subdir)
    def check_for_file_with_content_generated(self, content, timeout_seconds, subdir=''):
        output_validator = SingleFileOutputValidator(content)
        output_validator.set_output_dir(self.file_system_observer.get_output_dir())
        self.check_output(timeout_seconds, output_validator, 1, subdir)
    def check_for_multiple_files_generated(self, file_count, timeout_seconds, subdir=''):
        output_validator = MultiFileOutputValidator(file_count, subdir)
        output_validator.set_output_dir(self.file_system_observer.get_output_dir())
        self.check_output(timeout_seconds, output_validator, file_count, subdir)
    def check_for_multiple_empty_files_generated(self, timeout_seconds, subdir=''):
        output_validator = EmptyFilesOutPutValidator()
        output_validator.set_output_dir(self.file_system_observer.get_output_dir())
        self.check_output(timeout_seconds, output_validator, 2, subdir)
    def check_output(self, timeout_seconds, output_validator, max_files, subdir):
        if subdir:
            output_validator.subdir = subdir
        self.file_system_observer.wait_for_output(timeout_seconds, output_validator, max_files)
        for cluster in self.clusters.values():
            # Logs for both nifi and minifi, but not other engines
            cluster.log_nifi_output()
            assert not cluster.segfault_happened()
        assert output_validator.validate()
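    # Checks against the auxiliary service containers (S3 mock server, HTTP proxy,
    # Azure storage server) started for the tests.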
    def check_s3_server_object_data(self, cluster_name, object_data):
        cluster = self.acquire_cluster(cluster_name)
        assert cluster.check_s3_server_object_data(object_data)
    def check_s3_server_object_metadata(self, cluster_name, content_type):
        cluster = self.acquire_cluster(cluster_name)
        assert cluster.check_s3_server_object_metadata(content_type)
    def check_empty_s3_bucket(self, cluster_name):
        cluster = self.acquire_cluster(cluster_name)
        assert cluster.is_s3_bucket_empty()
    def check_http_proxy_access(self, cluster_name, url):
        assert self.clusters[cluster_name].check_http_proxy_access(url)
    def check_azure_storage_server_data(self, cluster_name, object_data):
        cluster = self.acquire_cluster(cluster_name)
        assert cluster.check_azure_storage_server_data(object_data)