# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the \"License\"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import gzip
import logging
import tarfile
import uuid
import xml.etree.ElementTree as elementTree
from xml.etree.ElementTree import Element
from io import StringIO
from io import BytesIO
from textwrap import dedent
import docker
import os
import yaml
from copy import copy
import time
from collections import OrderedDict
class Cluster(object):
"""
Base Cluster class. This is intended to be a generic interface
to different types of clusters. Clusters could be Kubernetes clusters,
Docker swarms, or cloud compute/container services.
"""
def deploy_flow(self, flow, name=None, vols=None):
"""
Deploys a flow to the cluster.
"""
def __enter__(self):
"""
Allocate ephemeral cluster resources.
"""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""
Clean up ephemeral cluster resources.
"""
class SingleNodeDockerCluster(Cluster):
"""
A "cluster" which consists of a single docker node. Useful for
testing or use-cases which do not span multiple compute nodes.
"""
def __init__(self):
self.minifi_version = os.environ['MINIFI_VERSION']
self.nifi_version = '1.7.0'
self.minifi_root = '/opt/minifi/nifi-minifi-cpp-' + self.minifi_version
self.nifi_root = '/opt/nifi/nifi-' + self.nifi_version
self.network = None
self.containers = OrderedDict()
self.images = []
self.tmp_files = []
# Get docker client
self.client = docker.from_env()
def deploy_flow(self,
flow,
name=None,
vols=None,
engine='minifi-cpp'):
"""
Compiles the flow to a valid config file and overlays it into a new image.
"""
if vols is None:
vols = {}
        if name is None:
            name = engine + '-' + str(uuid.uuid4())
            logging.info('Flow name was not provided; using generated name \'%s\'', name)
        logging.info('Deploying %s flow \'%s\'...', engine, name)
# Create network if necessary
if self.network is None:
net_name = 'nifi-' + str(uuid.uuid4())
logging.info('Creating network: %s', net_name)
self.network = self.client.networks.create(net_name)
if engine == 'nifi':
self.deploy_nifi_flow(flow, name, vols)
elif engine == 'minifi-cpp':
self.deploy_minifi_cpp_flow(flow, name, vols)
elif engine == 'kafka-broker':
self.deploy_kafka_broker(name)
elif engine == 'http-proxy':
self.deploy_http_proxy()
else:
raise Exception('invalid flow engine: \'%s\'' % engine)
def deploy_minifi_cpp_flow(self, flow, name, vols):
# Build configured image
dockerfile = dedent("""FROM {base_image}
USER root
ADD config.yml {minifi_root}/conf/config.yml
RUN chown minificpp:minificpp {minifi_root}/conf/config.yml
USER minificpp
""".format(name=name,hostname=name,
base_image='apacheminificpp:minimal-' + self.minifi_version,
minifi_root=self.minifi_root))
test_flow_yaml = minifi_flow_yaml(flow)
logging.info('Using generated flow config yml:\n%s', test_flow_yaml)
conf_file_buffer = BytesIO()
try:
conf_file_buffer.write(test_flow_yaml.encode('utf-8'))
conf_file_len = conf_file_buffer.tell()
conf_file_buffer.seek(0)
context_files = [
{
'name': 'config.yml',
'size': conf_file_len,
'file_obj': conf_file_buffer
}
]
configured_image = self.build_image(dockerfile, context_files)
finally:
conf_file_buffer.close()
logging.info('Creating and running docker container for flow...')
container = self.client.containers.run(
configured_image[0],
detach=True,
name=name,
network=self.network.name,
volumes=vols)
logging.info('Started container \'%s\'', container.name)
self.containers[container.name] = container
def deploy_nifi_flow(self, flow, name, vols):
dockerfile = dedent("""FROM {base_image}
USER root
ADD flow.xml.gz {nifi_root}/conf/flow.xml.gz
RUN chown nifi:nifi {nifi_root}/conf/flow.xml.gz
RUN sed -i -e 's/^\(nifi.remote.input.host\)=.*/\\1={name}/' {nifi_root}/conf/nifi.properties
RUN sed -i -e 's/^\(nifi.remote.input.socket.port\)=.*/\\1=5000/' {nifi_root}/conf/nifi.properties
USER nifi
""".format(name=name,
base_image='apache/nifi:' + self.nifi_version,
nifi_root=self.nifi_root))
test_flow_xml = nifi_flow_xml(flow, self.nifi_version)
logging.info('Using generated flow config xml:\n%s', test_flow_xml)
conf_file_buffer = BytesIO()
try:
with gzip.GzipFile(mode='wb', fileobj=conf_file_buffer) as conf_gz_file_buffer:
conf_gz_file_buffer.write(test_flow_xml.encode())
conf_file_len = conf_file_buffer.tell()
conf_file_buffer.seek(0)
context_files = [
{
'name': 'flow.xml.gz',
'size': conf_file_len,
'file_obj': conf_file_buffer
}
]
configured_image = self.build_image(dockerfile, context_files)
finally:
conf_file_buffer.close()
logging.info('Creating and running docker container for flow...')
container = self.client.containers.run(
configured_image[0],
detach=True,
name=name,
hostname=name,
network=self.network.name,
volumes=vols)
logging.info('Started container \'%s\'', container.name)
self.containers[container.name] = container
def deploy_kafka_broker(self, name):
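        """
        Deploys a single-node Kafka setup: a ZooKeeper container, a Kafka broker
        built from the test resources, and a console-consumer container that
        writes messages consumed from the 'test' topic to heaven_signal.txt.
        """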
logging.info('Creating and running docker containers for kafka broker...')
zookeeper = self.client.containers.run(
self.client.images.pull("wurstmeister/zookeeper:latest"),
detach=True,
name='zookeeper',
network=self.network.name,
ports={'2181/tcp': 2181},
)
self.containers[zookeeper.name] = zookeeper
test_dir = os.environ['PYTHONPATH'].split(':')[-1] # Based on DockerVerify.sh
broker_image = self.build_image_by_path(test_dir + "/resources/kafka_broker", 'minifi-kafka')
broker = self.client.containers.run(
broker_image[0],
detach=True,
name='kafka-broker',
network=self.network.name,
ports={'9092/tcp': 9092},
environment=["KAFKA_LISTENERS=PLAINTEXT://kafka-broker:9092,SSL://kafka-broker:9093", "KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181"],
)
self.containers[broker.name] = broker
dockerfile = dedent("""FROM {base_image}
USER root
CMD $KAFKA_HOME/bin/kafka-console-consumer.sh --bootstrap-server kafka-broker:9092 --topic test > heaven_signal.txt
""".format(base_image='wurstmeister/kafka:2.12-2.5.0'))
configured_image = self.build_image(dockerfile, [])
consumer = self.client.containers.run(
configured_image[0],
detach=True,
name='kafka-consumer',
network=self.network.name,
)
self.containers[consumer.name] = consumer
def deploy_http_proxy(self):
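        """
        Deploys a Squid-based forward proxy container with basic authentication
        (admin/test101) listening on port 3128.
        """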
logging.info('Creating and running http-proxy docker container...')
dockerfile = dedent("""FROM {base_image}
RUN apt update && apt install -y apache2-utils
RUN htpasswd -b -c /etc/squid/.squid_users {proxy_username} {proxy_password}
RUN echo 'auth_param basic program /usr/lib/squid3/basic_ncsa_auth /etc/squid/.squid_users' > /etc/squid/squid.conf && \
echo 'auth_param basic realm proxy' >> /etc/squid/squid.conf && \
echo 'acl authenticated proxy_auth REQUIRED' >> /etc/squid/squid.conf && \
echo 'http_access allow authenticated' >> /etc/squid/squid.conf && \
echo 'http_port {proxy_port}' >> /etc/squid/squid.conf
ENTRYPOINT ["/sbin/entrypoint.sh"]
""".format(base_image='sameersbn/squid:3.5.27-2', proxy_username='admin', proxy_password='test101', proxy_port='3128'))
configured_image = self.build_image(dockerfile, [])
consumer = self.client.containers.run(
configured_image[0],
detach=True,
name='http-proxy',
network=self.network.name,
ports={'3128/tcp': 3128},
)
self.containers[consumer.name] = consumer
def build_image(self, dockerfile, context_files):
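        """
        Builds a docker image from an in-memory build context: the Dockerfile and
        any additional context files are packed into a tar stream and passed to
        the docker daemon. The resulting (image, build_log) tuple is recorded for
        later cleanup and returned.
        """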
conf_dockerfile_buffer = BytesIO()
docker_context_buffer = BytesIO()
try:
# Overlay conf onto base nifi image
conf_dockerfile_buffer.write(dockerfile.encode())
conf_dockerfile_buffer.seek(0)
with tarfile.open(mode='w', fileobj=docker_context_buffer) as docker_context:
dockerfile_info = tarfile.TarInfo('Dockerfile')
dockerfile_info.size = conf_dockerfile_buffer.getbuffer().nbytes
docker_context.addfile(dockerfile_info,
fileobj=conf_dockerfile_buffer)
for context_file in context_files:
file_info = tarfile.TarInfo(context_file['name'])
file_info.size = context_file['size']
docker_context.addfile(file_info,
fileobj=context_file['file_obj'])
docker_context_buffer.seek(0)
logging.info('Creating configured image...')
configured_image = self.client.images.build(fileobj=docker_context_buffer,
custom_context=True,
rm=True,
forcerm=True)
logging.info('Created image with id: %s', configured_image[0].id)
self.images.append(configured_image)
finally:
conf_dockerfile_buffer.close()
docker_context_buffer.close()
return configured_image
def build_image_by_path(self, dir, name=None):
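        """
        Builds a docker image from a directory-based build context (a directory
        containing a Dockerfile), optionally tagging it with the given name.
        """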
try:
logging.info('Creating configured image...')
configured_image = self.client.images.build(path=dir,
tag=name,
rm=True,
forcerm=True)
logging.info('Created image with id: %s', configured_image[0].id)
self.images.append(configured_image)
return configured_image
except Exception as e:
logging.info(e)
raise
def __enter__(self):
"""
Allocate ephemeral cluster resources.
"""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""
Clean up ephemeral cluster resources
"""
# Clean up containers
for container in self.containers.values():
logging.info('Cleaning up container: %s', container.name)
container.remove(v=True, force=True)
# Clean up images
for image in reversed(self.images):
logging.info('Cleaning up image: %s', image[0].id)
self.client.images.remove(image[0].id, force=True)
# Clean up network
if self.network is not None:
            logging.info('Cleaning up network: %s', self.network.name)
self.network.remove()
# Clean up tmp files
for tmp_file in self.tmp_files:
os.remove(tmp_file)
class Connectable(object):
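    """
    Base class for objects that can take part in a flow: processors and input
    ports. Tracks named output connections and supports the '>>' flow DSL
    defined below.
    """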
def __init__(self,
name=None,
auto_terminate=None):
self.uuid = uuid.uuid4()
if name is None:
self.name = str(self.uuid)
else:
self.name = name
if auto_terminate is None:
self.auto_terminate = []
else:
self.auto_terminate = auto_terminate
self.connections = {}
self.out_proc = self
self.drop_empty_flowfiles = False
def connect(self, connections):
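        """
        Adds output connections. 'connections' maps a relationship name to a
        destination Connectable; connecting a relationship removes it from the
        auto-terminated list.
        """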
for rel in connections:
# Ensure that rel is not auto-terminated
if rel in self.auto_terminate:
del self.auto_terminate[self.auto_terminate.index(rel)]
# Add to set of output connections for this rel
if rel not in self.connections:
self.connections[rel] = []
self.connections[rel].append(connections[rel])
return self
def __rshift__(self, other):
"""
Right shift operator to support flow DSL, for example:
GetFile('/input') >> LogAttribute() >> PutFile('/output')
"""
connected = copy(self)
connected.connections = copy(self.connections)
if self.out_proc is self:
connected.out_proc = connected
else:
connected.out_proc = copy(connected.out_proc)
if isinstance(other, tuple):
if isinstance(other[0], tuple):
for rel_tuple in other:
rel = {rel_tuple[0]: rel_tuple[1]}
connected.out_proc.connect(rel)
else:
rel = {other[0]: other[1]}
connected.out_proc.connect(rel)
else:
connected.out_proc.connect({'success': other})
connected.out_proc = other
return connected
def __invert__(self):
"""
Invert operation to set empty file filtering on incoming connections
GetFile('/input') >> ~LogAttribute()
"""
self.drop_empty_flowfiles = True
return self
class Processor(Connectable):
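    """
    Represents a processor in the flow. 'clazz' is the processor class name
    (mapped to org.apache.nifi.processors.standard.<clazz> when serialized);
    'schedule' entries override the TIMER_DRIVEN defaults set below.
    """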
def __init__(self,
clazz,
properties=None,
schedule=None,
name=None,
controller_services=None,
auto_terminate=None):
super(Processor, self).__init__(name=name,
auto_terminate=auto_terminate)
if controller_services is None:
controller_services = []
if schedule is None:
schedule = {}
if properties is None:
properties = {}
self.clazz = clazz
self.properties = properties
self.controller_services = controller_services
self.schedule = {
'scheduling strategy': 'TIMER_DRIVEN',
'scheduling period': '1 sec',
'penalization period': '30 sec',
'yield period': '1 sec',
'run duration nanos': 0
}
self.schedule.update(schedule)
def nifi_property_key(self, key):
"""
Returns the Apache NiFi-equivalent property key for the given key. This is often, but not always, the same as
the internal key.
"""
return key
class InvokeHTTP(Processor):
def __init__(self, url,
method='GET',
proxy_host='',
proxy_port='',
proxy_username='',
proxy_password='',
ssl_context_service=None):
properties = {'Remote URL': url,
'HTTP Method': method,
'Proxy Host': proxy_host,
'Proxy Port': proxy_port,
'invokehttp-proxy-username': proxy_username,
'invokehttp-proxy-password': proxy_password}
controller_services = []
if ssl_context_service is not None:
properties['SSL Context Service'] = ssl_context_service.name
controller_services.append(ssl_context_service)
super(InvokeHTTP, self).__init__('InvokeHTTP',
properties=properties,
controller_services=controller_services,
auto_terminate=['success',
'response',
'retry',
'failure',
'no retry'])
class ListenHTTP(Processor):
def __init__(self, port, cert=None):
properties = {'Listening Port': port}
if cert is not None:
properties['SSL Certificate'] = cert
properties['SSL Verify Peer'] = 'no'
super(ListenHTTP, self).__init__('ListenHTTP',
properties=properties,
auto_terminate=['success'])
class LogAttribute(Processor):
    def __init__(self):
super(LogAttribute, self).__init__('LogAttribute',
auto_terminate=['success'])
class DebugFlow(Processor):
    def __init__(self):
super(DebugFlow, self).__init__('DebugFlow')
class AttributesToJSON(Processor):
def __init__(self, destination, attributes):
super(AttributesToJSON, self).__init__('AttributesToJSON',
properties={'Destination': destination, 'Attributes List': attributes},
schedule={'scheduling period': '0 sec'},
auto_terminate=['failure'])
class GetFile(Processor):
def __init__(self, input_dir):
super(GetFile, self).__init__('GetFile',
properties={'Input Directory': input_dir, 'Keep Source File': 'true'},
schedule={'scheduling period': '2 sec'},
auto_terminate=['success'])
class GenerateFlowFile(Processor):
def __init__(self, file_size):
super(GenerateFlowFile, self).__init__('GenerateFlowFile',
properties={'File Size': file_size},
schedule={'scheduling period': '0 sec'},
auto_terminate=['success'])
class PutFile(Processor):
def __init__(self, output_dir):
super(PutFile, self).__init__('PutFile',
properties={'Directory': output_dir},
auto_terminate=['success', 'failure'])
def nifi_property_key(self, key):
if key == 'Output Directory':
return 'Directory'
else:
return key
class PublishKafka(Processor):
def __init__(self):
super(PublishKafka, self).__init__('PublishKafka',
properties={'Client Name': 'nghiaxlee', 'Known Brokers': 'kafka-broker:9092', 'Topic Name': 'test',
'Batch Size': '10', 'Compress Codec': 'none', 'Delivery Guarantee': '1',
'Request Timeout': '10 sec', 'Message Timeout': '12 sec'},
auto_terminate=['success'])
class PublishKafkaSSL(Processor):
def __init__(self):
super(PublishKafkaSSL, self).__init__('PublishKafka',
properties={'Client Name': 'LMN', 'Known Brokers': 'kafka-broker:9093',
'Topic Name': 'test', 'Batch Size': '10',
'Compress Codec': 'none', 'Delivery Guarantee': '1',
'Request Timeout': '10 sec', 'Message Timeout': '12 sec',
'Security CA': '/tmp/resources/certs/ca-cert',
'Security Cert': '/tmp/resources/certs/client_LMN_client.pem',
'Security Pass Phrase': 'abcdefgh',
'Security Private Key': '/tmp/resources/certs/client_LMN_client.key',
'Security Protocol': 'ssl'},
auto_terminate=['success'])
class InputPort(Connectable):
def __init__(self, name=None, remote_process_group=None):
super(InputPort, self).__init__(name=name)
self.remote_process_group = remote_process_group
class RemoteProcessGroup(object):
def __init__(self, url,
name=None):
self.uuid = uuid.uuid4()
if name is None:
self.name = str(self.uuid)
else:
self.name = name
self.url = url
class ControllerService(object):
def __init__(self, name=None, properties=None):
self.id = str(uuid.uuid4())
if name is None:
self.name = str(uuid.uuid4())
logging.info('Controller service name was not provided; using generated name \'%s\'', self.name)
else:
self.name = name
if properties is None:
properties = {}
self.properties = properties
class SSLContextService(ControllerService):
def __init__(self, name=None, cert=None, key=None, ca_cert=None):
super(SSLContextService, self).__init__(name=name)
self.service_class = 'SSLContextService'
if cert is not None:
self.properties['Client Certificate'] = cert
if key is not None:
self.properties['Private Key'] = key
if ca_cert is not None:
self.properties['CA Certificate'] = ca_cert
def minifi_flow_yaml(connectable, root=None, visited=None):
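    """
    Recursively serializes a flow graph into a MiNiFi C++ config.yml structure,
    starting at 'connectable'. When called without 'root' (the top-level call),
    the assembled document is returned as a YAML string.
    """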
if visited is None:
visited = []
if root is None:
res = {
'Flow Controller': {
'name': 'MiNiFi Flow'
},
'Processors': [],
'Connections': [],
'Remote Processing Groups': [],
'Controller Services': []
}
else:
res = root
visited.append(connectable)
if hasattr(connectable, 'name'):
connectable_name = connectable.name
else:
connectable_name = str(connectable.uuid)
if isinstance(connectable, InputPort):
group = connectable.remote_process_group
res_group = None
for res_group_candidate in res['Remote Processing Groups']:
assert isinstance(res_group_candidate, dict)
if res_group_candidate['id'] == str(group.uuid):
res_group = res_group_candidate
if res_group is None:
res_group = {
'name': group.name,
'id': str(group.uuid),
'url': group.url,
'timeout': '30 sec',
'yield period': '3 sec',
'Input Ports': []
}
res['Remote Processing Groups'].append(res_group)
res_group['Input Ports'].append({
'id': str(connectable.uuid),
'name': connectable.name,
'max concurrent tasks': 1,
'Properties': {}
})
if isinstance(connectable, Processor):
res['Processors'].append({
'name': connectable_name,
'id': str(connectable.uuid),
'class': 'org.apache.nifi.processors.standard.' + connectable.clazz,
'scheduling strategy': connectable.schedule['scheduling strategy'],
'scheduling period': connectable.schedule['scheduling period'],
'penalization period': connectable.schedule['penalization period'],
'yield period': connectable.schedule['yield period'],
'run duration nanos': connectable.schedule['run duration nanos'],
'Properties': connectable.properties,
'auto-terminated relationships list': connectable.auto_terminate
})
for svc in connectable.controller_services:
if svc in visited:
continue
visited.append(svc)
res['Controller Services'].append({
'name': svc.name,
'id': svc.id,
'class': svc.service_class,
'Properties': svc.properties
})
for conn_name in connectable.connections:
conn_procs = connectable.connections[conn_name]
if isinstance(conn_procs, list):
for proc in conn_procs:
res['Connections'].append({
'name': str(uuid.uuid4()),
'source id': str(connectable.uuid),
'source relationship name': conn_name,
'destination id': str(proc.uuid),
'drop empty': ("true" if proc.drop_empty_flowfiles else "false")
})
if proc not in visited:
minifi_flow_yaml(proc, res, visited)
else:
res['Connections'].append({
'name': str(uuid.uuid4()),
'source id': str(connectable.uuid),
'source relationship name': conn_name,
'destination id': str(conn_procs.uuid)
})
if conn_procs not in visited:
minifi_flow_yaml(conn_procs, res, visited)
if root is None:
return yaml.dump(res, default_flow_style=False)
def nifi_flow_xml(connectable, nifi_version=None, root=None, visited=None):
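    """
    Recursively serializes a flow graph into an Apache NiFi flow.xml document,
    starting at 'connectable'. When called without 'root' (the top-level call),
    the assembled XML is returned as a string.
    """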
if visited is None:
visited = []
position = Element('position')
position.set('x', '0.0')
position.set('y', '0.0')
comment = Element('comment')
styles = Element('styles')
bend_points = Element('bendPoints')
label_index = Element('labelIndex')
label_index.text = '1'
z_index = Element('zIndex')
z_index.text = '0'
if root is None:
res = Element('flowController')
max_timer_driven_thread_count = Element('maxTimerDrivenThreadCount')
max_timer_driven_thread_count.text = '10'
res.append(max_timer_driven_thread_count)
max_event_driven_thread_count = Element('maxEventDrivenThreadCount')
max_event_driven_thread_count.text = '5'
res.append(max_event_driven_thread_count)
root_group = Element('rootGroup')
root_group_id = Element('id')
root_group_id_text = str(uuid.uuid4())
root_group_id.text = root_group_id_text
root_group.append(root_group_id)
root_group_name = Element('name')
root_group_name.text = root_group_id_text
root_group.append(root_group_name)
res.append(root_group)
root_group.append(position)
root_group.append(comment)
res.append(Element('controllerServices'))
res.append(Element('reportingTasks'))
res.set('encoding-version', '1.2')
else:
res = root
visited.append(connectable)
if hasattr(connectable, 'name'):
connectable_name_text = connectable.name
else:
connectable_name_text = str(connectable.uuid)
if isinstance(connectable, InputPort):
input_port = Element('inputPort')
input_port_id = Element('id')
input_port_id.text = str(connectable.uuid)
input_port.append(input_port_id)
input_port_name = Element('name')
input_port_name.text = connectable_name_text
input_port.append(input_port_name)
input_port.append(position)
input_port.append(comment)
input_port_scheduled_state = Element('scheduledState')
input_port_scheduled_state.text = 'RUNNING'
input_port.append(input_port_scheduled_state)
input_port_max_concurrent_tasks = Element('maxConcurrentTasks')
input_port_max_concurrent_tasks.text = '1'
input_port.append(input_port_max_concurrent_tasks)
next( res.iterfind('rootGroup') ).append(input_port)
if isinstance(connectable, Processor):
conn_destination = Element('processor')
proc_id = Element('id')
proc_id.text = str(connectable.uuid)
conn_destination.append(proc_id)
proc_name = Element('name')
proc_name.text = connectable_name_text
conn_destination.append(proc_name)
conn_destination.append(position)
conn_destination.append(styles)
conn_destination.append(comment)
proc_class = Element('class')
proc_class.text = 'org.apache.nifi.processors.standard.' + connectable.clazz
conn_destination.append(proc_class)
proc_bundle = Element('bundle')
proc_bundle_group = Element('group')
proc_bundle_group.text = 'org.apache.nifi'
proc_bundle.append(proc_bundle_group)
proc_bundle_artifact = Element('artifact')
proc_bundle_artifact.text = 'nifi-standard-nar'
proc_bundle.append(proc_bundle_artifact)
proc_bundle_version = Element('version')
proc_bundle_version.text = nifi_version
proc_bundle.append(proc_bundle_version)
conn_destination.append(proc_bundle)
proc_max_concurrent_tasks = Element('maxConcurrentTasks')
proc_max_concurrent_tasks.text = '1'
conn_destination.append(proc_max_concurrent_tasks)
proc_scheduling_period = Element('schedulingPeriod')
proc_scheduling_period.text = connectable.schedule['scheduling period']
conn_destination.append(proc_scheduling_period)
proc_penalization_period = Element('penalizationPeriod')
proc_penalization_period.text = connectable.schedule['penalization period']
conn_destination.append(proc_penalization_period)
proc_yield_period = Element('yieldPeriod')
proc_yield_period.text = connectable.schedule['yield period']
conn_destination.append(proc_yield_period)
proc_bulletin_level = Element('bulletinLevel')
proc_bulletin_level.text = 'WARN'
conn_destination.append(proc_bulletin_level)
proc_loss_tolerant = Element('lossTolerant')
proc_loss_tolerant.text = 'false'
conn_destination.append(proc_loss_tolerant)
proc_scheduled_state = Element('scheduledState')
proc_scheduled_state.text = 'RUNNING'
conn_destination.append(proc_scheduled_state)
proc_scheduling_strategy = Element('schedulingStrategy')
proc_scheduling_strategy.text = connectable.schedule['scheduling strategy']
conn_destination.append(proc_scheduling_strategy)
proc_execution_node = Element('executionNode')
proc_execution_node.text = 'ALL'
conn_destination.append(proc_execution_node)
proc_run_duration_nanos = Element('runDurationNanos')
proc_run_duration_nanos.text = str(connectable.schedule['run duration nanos'])
conn_destination.append(proc_run_duration_nanos)
for property_key, property_value in connectable.properties.items():
proc_property = Element('property')
proc_property_name = Element('name')
proc_property_name.text = connectable.nifi_property_key(property_key)
proc_property.append(proc_property_name)
proc_property_value = Element('value')
proc_property_value.text = property_value
proc_property.append(proc_property_value)
conn_destination.append(proc_property)
for auto_terminate_rel in connectable.auto_terminate:
proc_auto_terminated_relationship = Element('autoTerminatedRelationship')
proc_auto_terminated_relationship.text = auto_terminate_rel
conn_destination.append(proc_auto_terminated_relationship)
next( res.iterfind('rootGroup') ).append(conn_destination)
""" res.iterfind('rootGroup').next().append(conn_destination) """
for svc in connectable.controller_services:
if svc in visited:
continue
visited.append(svc)
controller_service = Element('controllerService')
controller_service_id = Element('id')
controller_service_id.text = str(svc.id)
controller_service.append(controller_service_id)
controller_service_name = Element('name')
controller_service_name.text = svc.name
controller_service.append(controller_service_name)
controller_service.append(comment)
controller_service_class = Element('class')
            controller_service_class.text = svc.service_class
controller_service.append(controller_service_class)
controller_service_bundle = Element('bundle')
controller_service_bundle_group = Element('group')
controller_service_bundle_group.text = svc.group
controller_service_bundle.append(controller_service_bundle_group)
controller_service_bundle_artifact = Element('artifact')
controller_service_bundle_artifact.text = svc.artifact
controller_service_bundle.append(controller_service_bundle_artifact)
controller_service_bundle_version = Element('version')
controller_service_bundle_version.text = nifi_version
controller_service_bundle.append(controller_service_bundle_version)
controller_service.append(controller_service_bundle)
controller_enabled = Element('enabled')
            controller_enabled.text = 'true'
controller_service.append(controller_enabled)
            for property_key, property_value in svc.properties.items():
controller_service_property = Element('property')
controller_service_property_name = Element('name')
controller_service_property_name.text = property_key
controller_service_property.append(controller_service_property_name)
controller_service_property_value = Element('value')
controller_service_property_value.text = property_value
controller_service_property.append(controller_service_property_value)
controller_service.append(controller_service_property)
next( res.iterfind('rootGroup') ).append(controller_service)
""" res.iterfind('rootGroup').next().append(controller_service)"""
for conn_name in connectable.connections:
conn_destinations = connectable.connections[conn_name]
if isinstance(conn_destinations, list):
for conn_destination in conn_destinations:
connection = nifi_flow_xml_connection(res,
bend_points,
conn_name,
connectable,
label_index,
conn_destination,
z_index)
next( res.iterfind('rootGroup') ).append(connection)
""" res.iterfind('rootGroup').next().append(connection) """
if conn_destination not in visited:
nifi_flow_xml(conn_destination, nifi_version, res, visited)
else:
connection = nifi_flow_xml_connection(res,
bend_points,
conn_name,
connectable,
label_index,
conn_destinations,
z_index)
next( res.iterfind('rootGroup') ).append(connection)
""" res.iterfind('rootGroup').next().append(connection) """
if conn_destinations not in visited:
nifi_flow_xml(conn_destinations, nifi_version, res, visited)
if root is None:
return ('<?xml version="1.0" encoding="UTF-8" standalone="no"?>'
+ "\n"
+ elementTree.tostring(res, encoding='utf-8').decode('utf-8'))
def nifi_flow_xml_connection(res, bend_points, conn_name, connectable, label_index, destination, z_index):
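    """
    Builds a single <connection> element between 'connectable' and 'destination'
    for the NiFi flow.xml, using the root group of 'res' as the source and
    destination group.
    """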
connection = Element('connection')
connection_id = Element('id')
connection_id.text = str(uuid.uuid4())
connection.append(connection_id)
connection_name = Element('name')
connection.append(connection_name)
connection.append(bend_points)
connection.append(label_index)
connection.append(z_index)
connection_source_id = Element('sourceId')
connection_source_id.text = str(connectable.uuid)
connection.append(connection_source_id)
connection_source_group_id = Element('sourceGroupId')
connection_source_group_id.text = next( res.iterfind('rootGroup/id') ).text
"""connection_source_group_id.text = res.iterfind('rootGroup/id').next().text"""
connection.append(connection_source_group_id)
connection_source_type = Element('sourceType')
if isinstance(connectable, Processor):
connection_source_type.text = 'PROCESSOR'
elif isinstance(connectable, InputPort):
connection_source_type.text = 'INPUT_PORT'
else:
raise Exception('Unexpected source type: %s' % type(connectable))
connection.append(connection_source_type)
connection_destination_id = Element('destinationId')
connection_destination_id.text = str(destination.uuid)
connection.append(connection_destination_id)
connection_destination_group_id = Element('destinationGroupId')
connection_destination_group_id.text = next(res.iterfind('rootGroup/id')).text
""" connection_destination_group_id.text = res.iterfind('rootGroup/id').next().text """
connection.append(connection_destination_group_id)
connection_destination_type = Element('destinationType')
if isinstance(destination, Processor):
connection_destination_type.text = 'PROCESSOR'
elif isinstance(destination, InputPort):
connection_destination_type.text = 'INPUT_PORT'
else:
raise Exception('Unexpected destination type: %s' % type(destination))
connection.append(connection_destination_type)
connection_relationship = Element('relationship')
if not isinstance(connectable, InputPort):
connection_relationship.text = conn_name
connection.append(connection_relationship)
connection_max_work_queue_size = Element('maxWorkQueueSize')
connection_max_work_queue_size.text = '10000'
connection.append(connection_max_work_queue_size)
connection_max_work_queue_data_size = Element('maxWorkQueueDataSize')
connection_max_work_queue_data_size.text = '1 GB'
connection.append(connection_max_work_queue_data_size)
connection_flow_file_expiration = Element('flowFileExpiration')
connection_flow_file_expiration.text = '0 sec'
connection.append(connection_flow_file_expiration)
return connection