| # Licensed to the Apache Software Foundation (ASF) under one or more |
| # contributor license agreements. See the NOTICE file distributed with |
| # this work for additional information regarding copyright ownership. |
| # The ASF licenses this file to You under the Apache License, Version 2.0 |
| # (the \"License\"); you may not use this file except in compliance with |
| # the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
# distributed under the License is distributed on an "AS IS" BASIS,
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| import gzip |
| import logging |
| import tarfile |
| import uuid |
import xml.etree.ElementTree as elementTree
from xml.etree.ElementTree import Element
from io import BytesIO
| from textwrap import dedent |
| |
| import docker |
| import os |
| import yaml |
from copy import copy
from collections import OrderedDict
| |
| class Cluster(object): |
| """ |
| Base Cluster class. This is intended to be a generic interface |
| to different types of clusters. Clusters could be Kubernetes clusters, |
| Docker swarms, or cloud compute/container services. |
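
    Implementations are used as context managers: __enter__ allocates
    ephemeral resources and __exit__ releases them. A minimal sketch
    (SomeCluster stands in for any concrete subclass):

        with SomeCluster() as cluster:
            cluster.deploy_flow(flow)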
| """ |
| |
| def deploy_flow(self, flow, name=None, vols=None): |
| """ |
| Deploys a flow to the cluster. |
| """ |
| |
| def __enter__(self): |
| """ |
| Allocate ephemeral cluster resources. |
| """ |
| return self |
| |
| def __exit__(self, exc_type, exc_val, exc_tb): |
| """ |
| Clean up ephemeral cluster resources. |
| """ |
| |
| |
| class SingleNodeDockerCluster(Cluster): |
| """ |
| A "cluster" which consists of a single docker node. Useful for |
| testing or use-cases which do not span multiple compute nodes. |
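
    Example sketch (assumes the MINIFI_VERSION environment variable is set
    and a local Docker daemon is reachable):

        with SingleNodeDockerCluster() as cluster:
            cluster.deploy_flow(GetFile('/tmp/input') >> LogAttribute(),
                                name='demo-flow',
                                engine='minifi-cpp')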
| """ |
| |
| def __init__(self): |
| self.minifi_version = os.environ['MINIFI_VERSION'] |
| self.nifi_version = '1.7.0' |
| self.minifi_root = '/opt/minifi/nifi-minifi-cpp-' + self.minifi_version |
| self.nifi_root = '/opt/nifi/nifi-' + self.nifi_version |
| self.network = None |
| self.containers = OrderedDict() |
| self.images = [] |
| self.tmp_files = [] |
| |
| # Get docker client |
| self.client = docker.from_env() |
| |
| def deploy_flow(self, |
| flow, |
| name=None, |
| vols=None, |
| engine='minifi-cpp'): |
| """ |
        Compiles the flow to a valid config file and overlays it onto a new
        image, or starts a supporting service container (kafka-broker,
        http-proxy or s3-server), depending on the requested engine.
| """ |
| |
| if vols is None: |
| vols = {} |
| |
        logging.info('Deploying %s flow... %s', engine, name)
| |
| if name is None: |
| name = engine + '-' + str(uuid.uuid4()) |
| logging.info('Flow name was not provided; using generated name \'%s\'', name) |
| |
| # Create network if necessary |
| if self.network is None: |
| net_name = 'nifi-' + str(uuid.uuid4()) |
| logging.info('Creating network: %s', net_name) |
| self.network = self.client.networks.create(net_name) |
| |
| if engine == 'nifi': |
| self.deploy_nifi_flow(flow, name, vols) |
| elif engine == 'minifi-cpp': |
| self.deploy_minifi_cpp_flow(flow, name, vols) |
| elif engine == 'kafka-broker': |
| self.deploy_kafka_broker(name) |
| elif engine == 'http-proxy': |
| self.deploy_http_proxy() |
| elif engine == 's3-server': |
| self.deploy_s3_server() |
| else: |
            raise ValueError('Invalid flow engine: \'%s\'' % engine)
| |
| def deploy_minifi_cpp_flow(self, flow, name, vols): |
| |
| # Build configured image |
| dockerfile = dedent("""FROM {base_image} |
| USER root |
| ADD config.yml {minifi_root}/conf/config.yml |
| RUN chown minificpp:minificpp {minifi_root}/conf/config.yml |
| USER minificpp |
| """.format(name=name,hostname=name, |
| base_image='apacheminificpp:' + self.minifi_version, |
| minifi_root=self.minifi_root)) |
| |
| test_flow_yaml = minifi_flow_yaml(flow) |
| logging.info('Using generated flow config yml:\n%s', test_flow_yaml) |
| |
| conf_file_buffer = BytesIO() |
| |
| try: |
| conf_file_buffer.write(test_flow_yaml.encode('utf-8')) |
| conf_file_len = conf_file_buffer.tell() |
| conf_file_buffer.seek(0) |
| |
| context_files = [ |
| { |
| 'name': 'config.yml', |
| 'size': conf_file_len, |
| 'file_obj': conf_file_buffer |
| } |
| ] |
| |
| configured_image = self.build_image(dockerfile, context_files) |
| |
| finally: |
| conf_file_buffer.close() |
| |
| logging.info('Creating and running docker container for flow...') |
| |
| container = self.client.containers.run( |
| configured_image[0], |
| detach=True, |
| name=name, |
| network=self.network.name, |
| volumes=vols) |
| |
| logging.info('Started container \'%s\'', container.name) |
| |
| self.containers[container.name] = container |
| |
| def deploy_nifi_flow(self, flow, name, vols): |
| dockerfile = dedent(r"""FROM {base_image} |
| USER root |
| ADD flow.xml.gz {nifi_root}/conf/flow.xml.gz |
| RUN chown nifi:nifi {nifi_root}/conf/flow.xml.gz |
| RUN sed -i -e 's/^\(nifi.remote.input.host\)=.*/\1={name}/' {nifi_root}/conf/nifi.properties |
| RUN sed -i -e 's/^\(nifi.remote.input.socket.port\)=.*/\1=5000/' {nifi_root}/conf/nifi.properties |
| USER nifi |
| """.format(name=name, |
| base_image='apache/nifi:' + self.nifi_version, |
| nifi_root=self.nifi_root)) |
| |
| test_flow_xml = nifi_flow_xml(flow, self.nifi_version) |
| logging.info('Using generated flow config xml:\n%s', test_flow_xml) |
| |
| conf_file_buffer = BytesIO() |
| |
| try: |
| with gzip.GzipFile(mode='wb', fileobj=conf_file_buffer) as conf_gz_file_buffer: |
| conf_gz_file_buffer.write(test_flow_xml.encode()) |
| conf_file_len = conf_file_buffer.tell() |
| conf_file_buffer.seek(0) |
| |
| context_files = [ |
| { |
| 'name': 'flow.xml.gz', |
| 'size': conf_file_len, |
| 'file_obj': conf_file_buffer |
| } |
| ] |
| |
| configured_image = self.build_image(dockerfile, context_files) |
| |
| finally: |
| conf_file_buffer.close() |
| |
| logging.info('Creating and running docker container for flow...') |
| |
| container = self.client.containers.run( |
| configured_image[0], |
| detach=True, |
| name=name, |
| hostname=name, |
| network=self.network.name, |
| volumes=vols) |
| |
| logging.info('Started container \'%s\'', container.name) |
| |
| self.containers[container.name] = container |
| |
| def deploy_kafka_broker(self, name): |
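        # Starts three containers: a ZooKeeper node, the broker itself (built
        # from the kafka_broker test resources) and a console consumer whose
        # output file (heaven_signal.txt) can be inspected to verify delivery.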
| logging.info('Creating and running docker containers for kafka broker...') |
| zookeeper = self.client.containers.run( |
| self.client.images.pull("wurstmeister/zookeeper:latest"), |
| detach=True, |
| name='zookeeper', |
| network=self.network.name, |
| ports={'2181/tcp': 2181}, |
| ) |
| self.containers[zookeeper.name] = zookeeper |
| |
        test_dir = os.environ['PYTHONPATH'].split(':')[-1]  # Based on DockerVerify.sh
| broker_image = self.build_image_by_path(test_dir + "/resources/kafka_broker", 'minifi-kafka') |
| broker = self.client.containers.run( |
| broker_image[0], |
| detach=True, |
| name='kafka-broker', |
| network=self.network.name, |
| ports={'9092/tcp': 9092}, |
| environment=["KAFKA_LISTENERS=PLAINTEXT://kafka-broker:9092,SSL://kafka-broker:9093", "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181"], |
| ) |
| self.containers[broker.name] = broker |
| |
| dockerfile = dedent("""FROM {base_image} |
| USER root |
| CMD $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafka-broker:9092 --topic test > heaven_signal.txt |
| """.format(base_image='wurstmeister/kafka:2.12-2.5.0')) |
| configured_image = self.build_image(dockerfile, []) |
| consumer = self.client.containers.run( |
| configured_image[0], |
| detach=True, |
| name='kafka-consumer', |
| network=self.network.name, |
| ) |
| self.containers[consumer.name] = consumer |
| |
| def deploy_http_proxy(self): |
| logging.info('Creating and running http-proxy docker container...') |
| dockerfile = dedent("""FROM {base_image} |
| RUN apt update && apt install -y apache2-utils |
| RUN htpasswd -b -c /etc/squid/.squid_users {proxy_username} {proxy_password} |
| RUN echo 'auth_param basic program /usr/lib/squid3/basic_ncsa_auth /etc/squid/.squid_users' > /etc/squid/squid.conf && \ |
| echo 'auth_param basic realm proxy' >> /etc/squid/squid.conf && \ |
| echo 'acl authenticated proxy_auth REQUIRED' >> /etc/squid/squid.conf && \ |
| echo 'http_access allow authenticated' >> /etc/squid/squid.conf && \ |
| echo 'http_port {proxy_port}' >> /etc/squid/squid.conf |
| ENTRYPOINT ["/sbin/entrypoint.sh"] |
| """.format(base_image='sameersbn/squid:3.5.27-2', proxy_username='admin', proxy_password='test101', proxy_port='3128')) |
| configured_image = self.build_image(dockerfile, []) |
        proxy = self.client.containers.run(
            configured_image[0],
            detach=True,
            name='http-proxy',
            network=self.network.name,
            ports={'3128/tcp': 3128},
        )
        self.containers[proxy.name] = proxy
| |
| def deploy_s3_server(self): |
        server = self.client.containers.run(
            "adobe/s3mock:2.1.28",
            detach=True,
            name='s3-server',
            network=self.network.name,
            ports={'9090/tcp': 9090, '9191/tcp': 9191},
            environment=["initialBuckets=test_bucket"],
        )
        self.containers[server.name] = server
| |
| def build_image(self, dockerfile, context_files): |
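        """
        Builds a Docker image from an in-memory Dockerfile. Each entry of
        context_files is a dict with 'name', 'size' and 'file_obj' keys; the
        Dockerfile and context files are streamed to the Docker daemon as an
        in-memory tar archive (custom_context=True).
        """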
| conf_dockerfile_buffer = BytesIO() |
| docker_context_buffer = BytesIO() |
| |
| try: |
| # Overlay conf onto base nifi image |
| conf_dockerfile_buffer.write(dockerfile.encode()) |
| conf_dockerfile_buffer.seek(0) |
| |
| with tarfile.open(mode='w', fileobj=docker_context_buffer) as docker_context: |
| dockerfile_info = tarfile.TarInfo('Dockerfile') |
| dockerfile_info.size = conf_dockerfile_buffer.getbuffer().nbytes |
| docker_context.addfile(dockerfile_info, |
| fileobj=conf_dockerfile_buffer) |
| |
| for context_file in context_files: |
| file_info = tarfile.TarInfo(context_file['name']) |
| file_info.size = context_file['size'] |
| docker_context.addfile(file_info, |
| fileobj=context_file['file_obj']) |
| docker_context_buffer.seek(0) |
| |
| logging.info('Creating configured image...') |
| configured_image = self.client.images.build(fileobj=docker_context_buffer, |
| custom_context=True, |
| rm=True, |
| forcerm=True) |
| logging.info('Created image with id: %s', configured_image[0].id) |
| self.images.append(configured_image) |
| |
| finally: |
| conf_dockerfile_buffer.close() |
| docker_context_buffer.close() |
| |
| return configured_image |
| |
    def build_image_by_path(self, dir_path, name=None):
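        """
        Builds (and optionally tags) a Docker image from an on-disk build
        context directory.
        """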
| try: |
| logging.info('Creating configured image...') |
            configured_image = self.client.images.build(path=dir_path,
| tag=name, |
| rm=True, |
| forcerm=True) |
| logging.info('Created image with id: %s', configured_image[0].id) |
| self.images.append(configured_image) |
| return configured_image |
        except Exception as e:
            logging.exception('Image build failed: %s', e)
| raise |
| |
| def __enter__(self): |
| """ |
| Allocate ephemeral cluster resources. |
| """ |
| return self |
| |
| def __exit__(self, exc_type, exc_val, exc_tb): |
| """ |
        Clean up ephemeral cluster resources.
| """ |
| |
| # Clean up containers |
| for container in self.containers.values(): |
| logging.info('Cleaning up container: %s', container.name) |
| container.remove(v=True, force=True) |
| |
| # Clean up images |
| for image in reversed(self.images): |
| logging.info('Cleaning up image: %s', image[0].id) |
| self.client.images.remove(image[0].id, force=True) |
| |
| # Clean up network |
| if self.network is not None: |
            logging.info('Cleaning up network: %s', self.network.name)
| self.network.remove() |
| |
| # Clean up tmp files |
| for tmp_file in self.tmp_files: |
| os.remove(tmp_file) |
| |
| |
| class Connectable(object): |
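    """
    Base class for nodes of the flow graph (processors and input ports).
    Tracks named relationships to downstream nodes and backs the >> flow
    DSL implemented in __rshift__.
    """
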
| def __init__(self, |
| name=None, |
| auto_terminate=None): |
| |
| self.uuid = uuid.uuid4() |
| |
| if name is None: |
| self.name = str(self.uuid) |
| else: |
| self.name = name |
| |
| if auto_terminate is None: |
| self.auto_terminate = [] |
| else: |
| self.auto_terminate = auto_terminate |
| |
| self.connections = {} |
| self.out_proc = self |
| |
| self.drop_empty_flowfiles = False |
| |
| def connect(self, connections): |
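        """
        Adds downstream connections. connections maps relationship names to
        destination Connectables; connecting a relationship also removes it
        from this node's auto-terminate list.
        """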
| for rel in connections: |
| |
            # Ensure that rel is not auto-terminated
            if rel in self.auto_terminate:
                self.auto_terminate.remove(rel)
| |
| # Add to set of output connections for this rel |
| if rel not in self.connections: |
| self.connections[rel] = [] |
| self.connections[rel].append(connections[rel]) |
| |
| return self |
| |
| def __rshift__(self, other): |
| """ |
| Right shift operator to support flow DSL, for example: |
| |
| GetFile('/input') >> LogAttribute() >> PutFile('/output') |
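
        A tuple routes a named relationship, and a tuple of tuples fans out
        to several destinations (an illustration of the tuple handling
        below; the '/err' path is only for the example):

            ListenHTTP(8080) >> (('success', LogAttribute()),
                                 ('failure', PutFile('/err')))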
| |
| """ |
| |
| connected = copy(self) |
| connected.connections = copy(self.connections) |
| |
| if self.out_proc is self: |
| connected.out_proc = connected |
| else: |
| connected.out_proc = copy(connected.out_proc) |
| |
| if isinstance(other, tuple): |
| if isinstance(other[0], tuple): |
| for rel_tuple in other: |
| rel = {rel_tuple[0]: rel_tuple[1]} |
| connected.out_proc.connect(rel) |
| else: |
| rel = {other[0]: other[1]} |
| connected.out_proc.connect(rel) |
| else: |
| connected.out_proc.connect({'success': other}) |
| connected.out_proc = other |
| |
| return connected |
| |
| def __invert__(self): |
| """ |
| Invert operation to set empty file filtering on incoming connections |
| GetFile('/input') >> ~LogAttribute() |
| """ |
| self.drop_empty_flowfiles = True |
| |
| return self |
| |
| |
| class Processor(Connectable): |
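    """
    A processor node of the flow graph. Entries in schedule override the
    TIMER_DRIVEN defaults set in __init__; properties and
    controller_services are emitted into the generated flow config.
    """
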
| def __init__(self, |
| clazz, |
| properties=None, |
| schedule=None, |
| name=None, |
| controller_services=None, |
| auto_terminate=None): |
| |
| super(Processor, self).__init__(name=name, |
| auto_terminate=auto_terminate) |
| |
| if controller_services is None: |
| controller_services = [] |
| |
| if schedule is None: |
| schedule = {} |
| |
| if properties is None: |
| properties = {} |
| |
| self.clazz = clazz |
| self.properties = properties |
| self.controller_services = controller_services |
| |
| self.schedule = { |
| 'scheduling strategy': 'TIMER_DRIVEN', |
| 'scheduling period': '1 sec', |
| 'penalization period': '30 sec', |
| 'yield period': '1 sec', |
| 'run duration nanos': 0 |
| } |
| self.schedule.update(schedule) |
| |
| def nifi_property_key(self, key): |
| """ |
| Returns the Apache NiFi-equivalent property key for the given key. This is often, but not always, the same as |
| the internal key. |
| """ |
| return key |
| |
| |
| class InvokeHTTP(Processor): |
| def __init__(self, url, |
| method='GET', |
| proxy_host='', |
| proxy_port='', |
| proxy_username='', |
| proxy_password='', |
| ssl_context_service=None, |
                 schedule=None):
        if schedule is None:
            schedule = {'scheduling strategy': 'EVENT_DRIVEN'}

| properties = {'Remote URL': url, |
| 'HTTP Method': method, |
| 'Proxy Host': proxy_host, |
| 'Proxy Port': proxy_port, |
| 'invokehttp-proxy-username': proxy_username, |
| 'invokehttp-proxy-password': proxy_password} |
| |
| controller_services = [] |
| |
| if ssl_context_service is not None: |
| properties['SSL Context Service'] = ssl_context_service.name |
| controller_services.append(ssl_context_service) |
| |
| super(InvokeHTTP, self).__init__('InvokeHTTP', |
| properties=properties, |
| controller_services=controller_services, |
| auto_terminate=['success', |
| 'response', |
| 'retry', |
| 'failure', |
| 'no retry'], |
| schedule=schedule) |
| |
| |
| class ListenHTTP(Processor): |
| def __init__(self, port, cert=None, schedule=None): |
| properties = {'Listening Port': port} |
| |
| if cert is not None: |
| properties['SSL Certificate'] = cert |
| properties['SSL Verify Peer'] = 'no' |
| |
| super(ListenHTTP, self).__init__('ListenHTTP', |
| properties=properties, |
| auto_terminate=['success'], |
| schedule=schedule) |
| |
| |
| class LogAttribute(Processor): |
    def __init__(self, schedule=None):
        if schedule is None:
            schedule = {'scheduling strategy': 'EVENT_DRIVEN'}
        super(LogAttribute, self).__init__('LogAttribute',
| auto_terminate=['success'], |
| schedule=schedule) |
| |
| |
| class GetFile(Processor): |
    def __init__(self, input_dir, schedule=None):
        if schedule is None:
            schedule = {'scheduling period': '2 sec'}
        super(GetFile, self).__init__('GetFile',
| properties={'Input Directory': input_dir, 'Keep Source File': 'true'}, |
| schedule=schedule, |
| auto_terminate=['success']) |
| |
| |
| class GenerateFlowFile(Processor): |
    def __init__(self, file_size, schedule=None):
        if schedule is None:
            schedule = {'scheduling period': '0 sec'}
        super(GenerateFlowFile, self).__init__('GenerateFlowFile',
| properties={'File Size': file_size}, |
| schedule=schedule, |
| auto_terminate=['success']) |
| |
| |
| class PutFile(Processor): |
    def __init__(self, output_dir, schedule=None):
        if schedule is None:
            schedule = {'scheduling strategy': 'EVENT_DRIVEN'}
        super(PutFile, self).__init__('PutFile',
| properties={'Directory': output_dir, 'Directory Permissions': '777', 'Permissions': '777'}, |
| auto_terminate=['success', 'failure'], |
| schedule=schedule) |
| |
| def nifi_property_key(self, key): |
| if key == 'Directory Permissions': |
| return None |
| else: |
| return key |
| |
| |
| class PublishKafka(Processor): |
    def __init__(self, schedule=None):
        if schedule is None:
            schedule = {'scheduling strategy': 'EVENT_DRIVEN'}
        super(PublishKafka, self).__init__('PublishKafka',
| properties={'Client Name': 'nghiaxlee', 'Known Brokers': 'kafka-broker:9092', 'Topic Name': 'test', |
| 'Batch Size': '10', 'Compress Codec': 'none', 'Delivery Guarantee': '1', |
| 'Request Timeout': '10 sec', 'Message Timeout': '12 sec'}, |
| auto_terminate=['success'], |
| schedule=schedule) |
| |
| |
| class PublishKafkaSSL(Processor): |
    def __init__(self, schedule=None):
        if schedule is None:
            schedule = {'scheduling strategy': 'EVENT_DRIVEN'}
        super(PublishKafkaSSL, self).__init__('PublishKafka',
| properties={'Client Name': 'LMN', 'Known Brokers': 'kafka-broker:9093', |
| 'Topic Name': 'test', 'Batch Size': '10', |
| 'Compress Codec': 'none', 'Delivery Guarantee': '1', |
| 'Request Timeout': '10 sec', 'Message Timeout': '12 sec', |
| 'Security CA': '/tmp/resources/certs/ca-cert', |
| 'Security Cert': '/tmp/resources/certs/client_LMN_client.pem', |
| 'Security Pass Phrase': 'abcdefgh', |
| 'Security Private Key': '/tmp/resources/certs/client_LMN_client.key', |
| 'Security Protocol': 'ssl'}, |
| auto_terminate=['success'], |
| schedule=schedule) |
| |
| class PutS3Object(Processor): |
| def __init__(self, |
| proxy_host='', |
| proxy_port='', |
| proxy_username='', |
| proxy_password=''): |
| super(PutS3Object, self).__init__('PutS3Object', |
| properties={ |
| 'Object Key': 'test_object_key', |
| 'Bucket': 'test_bucket', |
| 'Access Key': 'test_access_key', |
| 'Secret Key': 'test_secret', |
| 'Endpoint Override URL': "http://s3-server:9090", |
| 'Proxy Host': proxy_host, |
| 'Proxy Port': proxy_port, |
| 'Proxy Username': proxy_username, |
| 'Proxy Password': proxy_password, |
| }, |
| auto_terminate=['success']) |
| |
| class InputPort(Connectable): |
| def __init__(self, name=None, remote_process_group=None): |
| super(InputPort, self).__init__(name=name) |
| |
| self.remote_process_group = remote_process_group |
| |
| |
| class RemoteProcessGroup(object): |
| def __init__(self, url, |
| name=None): |
| |
| self.uuid = uuid.uuid4() |
| |
| if name is None: |
| self.name = str(self.uuid) |
| else: |
| self.name = name |
| |
| self.url = url |
| |
| |
| class ControllerService(object): |
| def __init__(self, name=None, properties=None): |
| |
| self.id = str(uuid.uuid4()) |
| |
| if name is None: |
| self.name = str(uuid.uuid4()) |
| logging.info('Controller service name was not provided; using generated name \'%s\'', self.name) |
| else: |
| self.name = name |
| |
| if properties is None: |
| properties = {} |
| |
| self.properties = properties |
| |
| |
| class SSLContextService(ControllerService): |
| def __init__(self, name=None, cert=None, key=None, ca_cert=None): |
| super(SSLContextService, self).__init__(name=name) |
| |
| self.service_class = 'SSLContextService' |
| |
| if cert is not None: |
| self.properties['Client Certificate'] = cert |
| |
| if key is not None: |
| self.properties['Private Key'] = key |
| |
| if ca_cert is not None: |
| self.properties['CA Certificate'] = ca_cert |
| |
| |
| def minifi_flow_yaml(connectable, root=None, visited=None): |
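    """
    Serializes the flow graph reachable from connectable into a MiNiFi C++
    YAML flow config. The function recurses over connections; root and
    visited are internal accumulator arguments, so external callers pass
    only connectable and receive the YAML document as a string.
    """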
| if visited is None: |
| visited = [] |
| |
| if root is None: |
| res = { |
| 'Flow Controller': { |
| 'name': 'MiNiFi Flow' |
| }, |
| 'Processors': [], |
| 'Connections': [], |
| 'Remote Processing Groups': [], |
| 'Controller Services': [] |
| } |
| else: |
| res = root |
| |
| visited.append(connectable) |
| |
| if hasattr(connectable, 'name'): |
| connectable_name = connectable.name |
| else: |
| connectable_name = str(connectable.uuid) |
| |
| if isinstance(connectable, InputPort): |
| group = connectable.remote_process_group |
| res_group = None |
| |
| for res_group_candidate in res['Remote Processing Groups']: |
| assert isinstance(res_group_candidate, dict) |
| if res_group_candidate['id'] == str(group.uuid): |
| res_group = res_group_candidate |
| |
| if res_group is None: |
| res_group = { |
| 'name': group.name, |
| 'id': str(group.uuid), |
| 'url': group.url, |
| 'timeout': '30 sec', |
| 'yield period': '3 sec', |
| 'Input Ports': [] |
| } |
| |
| res['Remote Processing Groups'].append(res_group) |
| |
| res_group['Input Ports'].append({ |
| 'id': str(connectable.uuid), |
| 'name': connectable.name, |
| 'max concurrent tasks': 1, |
| 'Properties': {} |
| }) |
| |
| if isinstance(connectable, Processor): |
| res['Processors'].append({ |
| 'name': connectable_name, |
| 'id': str(connectable.uuid), |
| 'class': 'org.apache.nifi.processors.standard.' + connectable.clazz, |
| 'scheduling strategy': connectable.schedule['scheduling strategy'], |
| 'scheduling period': connectable.schedule['scheduling period'], |
| 'penalization period': connectable.schedule['penalization period'], |
| 'yield period': connectable.schedule['yield period'], |
| 'run duration nanos': connectable.schedule['run duration nanos'], |
| 'Properties': connectable.properties, |
| 'auto-terminated relationships list': connectable.auto_terminate |
| }) |
| |
| for svc in connectable.controller_services: |
| if svc in visited: |
| continue |
| |
| visited.append(svc) |
| res['Controller Services'].append({ |
| 'name': svc.name, |
| 'id': svc.id, |
| 'class': svc.service_class, |
| 'Properties': svc.properties |
| }) |
| |
| for conn_name in connectable.connections: |
| conn_procs = connectable.connections[conn_name] |
| |
| if isinstance(conn_procs, list): |
| for proc in conn_procs: |
| res['Connections'].append({ |
| 'name': str(uuid.uuid4()), |
| 'source id': str(connectable.uuid), |
| 'source relationship name': conn_name, |
| 'destination id': str(proc.uuid), |
| 'drop empty': ("true" if proc.drop_empty_flowfiles else "false") |
| }) |
| if proc not in visited: |
| minifi_flow_yaml(proc, res, visited) |
| else: |
| res['Connections'].append({ |
| 'name': str(uuid.uuid4()), |
| 'source id': str(connectable.uuid), |
| 'source relationship name': conn_name, |
| 'destination id': str(conn_procs.uuid) |
| }) |
| if conn_procs not in visited: |
| minifi_flow_yaml(conn_procs, res, visited) |
| |
| if root is None: |
| return yaml.dump(res, default_flow_style=False) |
| |
| |
| def nifi_flow_xml(connectable, nifi_version=None, root=None, visited=None): |
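    """
    Serializes the flow graph reachable from connectable into an Apache NiFi
    flow.xml document. Like minifi_flow_yaml, it recurses over connections
    with root and visited as internal accumulators; external callers pass
    the connectable (and NiFi version) and receive the XML as a string.
    """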
| if visited is None: |
| visited = [] |
| |
| position = Element('position') |
| position.set('x', '0.0') |
| position.set('y', '0.0') |
| |
| comment = Element('comment') |
| styles = Element('styles') |
| bend_points = Element('bendPoints') |
| label_index = Element('labelIndex') |
| label_index.text = '1' |
| z_index = Element('zIndex') |
| z_index.text = '0' |
| |
| if root is None: |
| res = Element('flowController') |
| max_timer_driven_thread_count = Element('maxTimerDrivenThreadCount') |
| max_timer_driven_thread_count.text = '10' |
| res.append(max_timer_driven_thread_count) |
| max_event_driven_thread_count = Element('maxEventDrivenThreadCount') |
| max_event_driven_thread_count.text = '5' |
| res.append(max_event_driven_thread_count) |
| root_group = Element('rootGroup') |
| root_group_id = Element('id') |
| root_group_id_text = str(uuid.uuid4()) |
| root_group_id.text = root_group_id_text |
| root_group.append(root_group_id) |
| root_group_name = Element('name') |
| root_group_name.text = root_group_id_text |
| root_group.append(root_group_name) |
| res.append(root_group) |
| root_group.append(position) |
| root_group.append(comment) |
| res.append(Element('controllerServices')) |
| res.append(Element('reportingTasks')) |
| res.set('encoding-version', '1.2') |
| else: |
| res = root |
| |
| visited.append(connectable) |
| |
| if hasattr(connectable, 'name'): |
| connectable_name_text = connectable.name |
| else: |
| connectable_name_text = str(connectable.uuid) |
| |
| if isinstance(connectable, InputPort): |
| input_port = Element('inputPort') |
| |
| input_port_id = Element('id') |
| input_port_id.text = str(connectable.uuid) |
| input_port.append(input_port_id) |
| |
| input_port_name = Element('name') |
| input_port_name.text = connectable_name_text |
| input_port.append(input_port_name) |
| |
| input_port.append(position) |
| input_port.append(comment) |
| |
| input_port_scheduled_state = Element('scheduledState') |
| input_port_scheduled_state.text = 'RUNNING' |
| input_port.append(input_port_scheduled_state) |
| |
| input_port_max_concurrent_tasks = Element('maxConcurrentTasks') |
| input_port_max_concurrent_tasks.text = '1' |
| input_port.append(input_port_max_concurrent_tasks) |
        next(res.iterfind('rootGroup')).append(input_port)
| |
| if isinstance(connectable, Processor): |
| conn_destination = Element('processor') |
| |
| proc_id = Element('id') |
| proc_id.text = str(connectable.uuid) |
| conn_destination.append(proc_id) |
| |
| proc_name = Element('name') |
| proc_name.text = connectable_name_text |
| conn_destination.append(proc_name) |
| |
| conn_destination.append(position) |
| conn_destination.append(styles) |
| conn_destination.append(comment) |
| |
| proc_class = Element('class') |
| proc_class.text = 'org.apache.nifi.processors.standard.' + connectable.clazz |
| conn_destination.append(proc_class) |
| |
| proc_bundle = Element('bundle') |
| proc_bundle_group = Element('group') |
| proc_bundle_group.text = 'org.apache.nifi' |
| proc_bundle.append(proc_bundle_group) |
| proc_bundle_artifact = Element('artifact') |
| proc_bundle_artifact.text = 'nifi-standard-nar' |
| proc_bundle.append(proc_bundle_artifact) |
| proc_bundle_version = Element('version') |
| proc_bundle_version.text = nifi_version |
| proc_bundle.append(proc_bundle_version) |
| conn_destination.append(proc_bundle) |
| |
| proc_max_concurrent_tasks = Element('maxConcurrentTasks') |
| proc_max_concurrent_tasks.text = '1' |
| conn_destination.append(proc_max_concurrent_tasks) |
| |
| proc_scheduling_period = Element('schedulingPeriod') |
| proc_scheduling_period.text = connectable.schedule['scheduling period'] |
| conn_destination.append(proc_scheduling_period) |
| |
| proc_penalization_period = Element('penalizationPeriod') |
| proc_penalization_period.text = connectable.schedule['penalization period'] |
| conn_destination.append(proc_penalization_period) |
| |
| proc_yield_period = Element('yieldPeriod') |
| proc_yield_period.text = connectable.schedule['yield period'] |
| conn_destination.append(proc_yield_period) |
| |
| proc_bulletin_level = Element('bulletinLevel') |
| proc_bulletin_level.text = 'WARN' |
| conn_destination.append(proc_bulletin_level) |
| |
| proc_loss_tolerant = Element('lossTolerant') |
| proc_loss_tolerant.text = 'false' |
| conn_destination.append(proc_loss_tolerant) |
| |
| proc_scheduled_state = Element('scheduledState') |
| proc_scheduled_state.text = 'RUNNING' |
| conn_destination.append(proc_scheduled_state) |
| |
| proc_scheduling_strategy = Element('schedulingStrategy') |
| proc_scheduling_strategy.text = connectable.schedule['scheduling strategy'] |
| conn_destination.append(proc_scheduling_strategy) |
| |
| proc_execution_node = Element('executionNode') |
| proc_execution_node.text = 'ALL' |
| conn_destination.append(proc_execution_node) |
| |
| proc_run_duration_nanos = Element('runDurationNanos') |
| proc_run_duration_nanos.text = str(connectable.schedule['run duration nanos']) |
| conn_destination.append(proc_run_duration_nanos) |
| |
| for property_key, property_value in connectable.properties.items(): |
| proc_property = Element('property') |
| proc_property_name = Element('name') |
| proc_property_name.text = connectable.nifi_property_key(property_key) |
| if not proc_property_name.text: |
| continue |
| proc_property.append(proc_property_name) |
| proc_property_value = Element('value') |
| proc_property_value.text = property_value |
| proc_property.append(proc_property_value) |
| conn_destination.append(proc_property) |
| |
| for auto_terminate_rel in connectable.auto_terminate: |
| proc_auto_terminated_relationship = Element('autoTerminatedRelationship') |
| proc_auto_terminated_relationship.text = auto_terminate_rel |
| conn_destination.append(proc_auto_terminated_relationship) |
        next(res.iterfind('rootGroup')).append(conn_destination)
| """ res.iterfind('rootGroup').next().append(conn_destination) """ |
| |
| for svc in connectable.controller_services: |
| if svc in visited: |
| continue |
| |
| visited.append(svc) |
| controller_service = Element('controllerService') |
| |
| controller_service_id = Element('id') |
| controller_service_id.text = str(svc.id) |
| controller_service.append(controller_service_id) |
| |
| controller_service_name = Element('name') |
| controller_service_name.text = svc.name |
| controller_service.append(controller_service_name) |
| |
| controller_service.append(comment) |
| |
| controller_service_class = Element('class') |
        controller_service_class.text = svc.service_class
| controller_service.append(controller_service_class) |
| |
| controller_service_bundle = Element('bundle') |
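        # Note: this relies on the controller service object exposing
        # `group` and `artifact` bundle coordinates, which the
        # ControllerService classes in this module do not currently set.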
| controller_service_bundle_group = Element('group') |
| controller_service_bundle_group.text = svc.group |
| controller_service_bundle.append(controller_service_bundle_group) |
| controller_service_bundle_artifact = Element('artifact') |
| controller_service_bundle_artifact.text = svc.artifact |
| controller_service_bundle.append(controller_service_bundle_artifact) |
| controller_service_bundle_version = Element('version') |
| controller_service_bundle_version.text = nifi_version |
| controller_service_bundle.append(controller_service_bundle_version) |
| controller_service.append(controller_service_bundle) |
| |
| controller_enabled = Element('enabled') |
        controller_enabled.text = 'true'
| controller_service.append(controller_enabled) |
| |
        for property_key, property_value in svc.properties.items():
| controller_service_property = Element('property') |
| controller_service_property_name = Element('name') |
| controller_service_property_name.text = property_key |
| controller_service_property.append(controller_service_property_name) |
| controller_service_property_value = Element('value') |
| controller_service_property_value.text = property_value |
| controller_service_property.append(controller_service_property_value) |
| controller_service.append(controller_service_property) |
        next(res.iterfind('rootGroup')).append(controller_service)
| """ res.iterfind('rootGroup').next().append(controller_service)""" |
| |
| for conn_name in connectable.connections: |
| conn_destinations = connectable.connections[conn_name] |
| |
| if isinstance(conn_destinations, list): |
| for conn_destination in conn_destinations: |
| connection = nifi_flow_xml_connection(res, |
| bend_points, |
| conn_name, |
| connectable, |
| label_index, |
| conn_destination, |
| z_index) |
                next(res.iterfind('rootGroup')).append(connection)
| """ res.iterfind('rootGroup').next().append(connection) """ |
| |
| if conn_destination not in visited: |
| nifi_flow_xml(conn_destination, nifi_version, res, visited) |
| else: |
| connection = nifi_flow_xml_connection(res, |
| bend_points, |
| conn_name, |
| connectable, |
| label_index, |
| conn_destinations, |
| z_index) |
            next(res.iterfind('rootGroup')).append(connection)
| """ res.iterfind('rootGroup').next().append(connection) """ |
| |
| if conn_destinations not in visited: |
| nifi_flow_xml(conn_destinations, nifi_version, res, visited) |
| |
| if root is None: |
| return ('<?xml version="1.0" encoding="UTF-8" standalone="no"?>' |
| + "\n" |
| + elementTree.tostring(res, encoding='utf-8').decode('utf-8')) |
| |
| |
| def nifi_flow_xml_connection(res, bend_points, conn_name, connectable, label_index, destination, z_index): |
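    """
    Builds a <connection> XML element from connectable to destination for
    the flow.xml document generated by nifi_flow_xml.
    """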
| connection = Element('connection') |
| |
| connection_id = Element('id') |
| connection_id.text = str(uuid.uuid4()) |
| connection.append(connection_id) |
| |
| connection_name = Element('name') |
| connection.append(connection_name) |
| |
| connection.append(bend_points) |
| connection.append(label_index) |
| connection.append(z_index) |
| |
| connection_source_id = Element('sourceId') |
| connection_source_id.text = str(connectable.uuid) |
| connection.append(connection_source_id) |
| |
| connection_source_group_id = Element('sourceGroupId') |
    connection_source_group_id.text = next(res.iterfind('rootGroup/id')).text
| """connection_source_group_id.text = res.iterfind('rootGroup/id').next().text""" |
| connection.append(connection_source_group_id) |
| |
| connection_source_type = Element('sourceType') |
| if isinstance(connectable, Processor): |
| connection_source_type.text = 'PROCESSOR' |
| elif isinstance(connectable, InputPort): |
| connection_source_type.text = 'INPUT_PORT' |
| else: |
        raise ValueError('Unexpected source type: %s' % type(connectable))
| connection.append(connection_source_type) |
| |
| connection_destination_id = Element('destinationId') |
| connection_destination_id.text = str(destination.uuid) |
| connection.append(connection_destination_id) |
| |
| connection_destination_group_id = Element('destinationGroupId') |
| connection_destination_group_id.text = next(res.iterfind('rootGroup/id')).text |
| """ connection_destination_group_id.text = res.iterfind('rootGroup/id').next().text """ |
| connection.append(connection_destination_group_id) |
| |
| connection_destination_type = Element('destinationType') |
| if isinstance(destination, Processor): |
| connection_destination_type.text = 'PROCESSOR' |
| elif isinstance(destination, InputPort): |
| connection_destination_type.text = 'INPUT_PORT' |
| else: |
        raise ValueError('Unexpected destination type: %s' % type(destination))
| connection.append(connection_destination_type) |
| |
| connection_relationship = Element('relationship') |
| if not isinstance(connectable, InputPort): |
| connection_relationship.text = conn_name |
| connection.append(connection_relationship) |
| |
| connection_max_work_queue_size = Element('maxWorkQueueSize') |
| connection_max_work_queue_size.text = '10000' |
| connection.append(connection_max_work_queue_size) |
| |
| connection_max_work_queue_data_size = Element('maxWorkQueueDataSize') |
| connection_max_work_queue_data_size.text = '1 GB' |
| connection.append(connection_max_work_queue_data_size) |
| |
| connection_flow_file_expiration = Element('flowFileExpiration') |
| connection_flow_file_expiration.text = '0 sec' |
| connection.append(connection_flow_file_expiration) |
| |
| return connection |