# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import uuid
import json

from ..core.Processor import Processor
from ..core.InputPort import InputPort
from ..core.OutputPort import OutputPort
from ..core.Funnel import Funnel


class Minifi_flow_json_serializer:
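    # Serializes an in-memory flow graph (processors, ports, funnels and controller
    # services) into MiNiFi's JSON flow configuration format.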
def serialize(self, start_nodes, controllers, parameter_context_name: str, parameter_contexts):
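        # Skeleton of the output document: a top-level list of parameter contexts and a
        # single root process group that holds every flow component.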
res = {
'parameterContexts': [],
'rootGroup': {
'name': 'MiNiFi Flow',
'processors': [],
'funnels': [],
'connections': [],
'remoteProcessGroups': [],
'controllerServices': [],
'inputPorts': [],
'outputPorts': []
}
}
visited = []
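        # Parameter contexts live at the top level of the document; the root group only
        # references the selected context by name.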
if parameter_context_name:
res['rootGroup']['parameterContextName'] = parameter_context_name
if parameter_contexts:
for context_name in parameter_contexts:
res['parameterContexts'].append({
'identifier': str(uuid.uuid4()),
'name': context_name,
'parameters': []
})
for parameter in parameter_contexts[context_name]:
res['parameterContexts'][-1]['parameters'].append({
'name': parameter.name,
'description': '',
'sensitive': False,
'value': parameter.value
})
for node in start_nodes:
self.serialize_node(node, res['rootGroup'], visited)
for controller in controllers:
self.serialize_controller(controller, res['rootGroup'])
        return json.dumps(res)

    def serialize_node(self, connectable, root, visited):
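        # Walks the flow graph starting from 'connectable', emitting the node itself and then
        # every node reachable through its connections. 'visited' guards against revisiting
        # nodes in cyclic flows.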
visited.append(connectable)
if hasattr(connectable, 'name'):
connectable_name = connectable.name
else:
connectable_name = str(connectable.uuid)
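        # Input ports without a remote process group are local input ports of the root group;
        # otherwise the port is listed under its remote process group (created on first use).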
if isinstance(connectable, InputPort):
group = connectable.remote_process_group
if group is None:
root['inputPorts'].append({
'name': connectable_name,
'identifier': str(connectable.instance_id),
'properties': connectable.properties
})
else:
res_group = None
for res_group_candidate in root['remoteProcessGroups']:
assert isinstance(res_group_candidate, dict)
if res_group_candidate['identifier'] == str(group.uuid):
res_group = res_group_candidate
if res_group is None:
res_group = {
'name': group.name,
'identifier': str(group.uuid),
'targetUri': group.url,
'communicationsTimeout': '30 sec',
'yieldDuration': '3 sec',
'transportProtocol': group.transport_protocol,
'inputPorts': []
}
root['remoteProcessGroups'].append(res_group)
res_group['inputPorts'].append({
'identifier': str(connectable.instance_id),
'name': connectable.name,
'useCompression': connectable.use_compression,
'properties': connectable.properties
})
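        # Output ports follow the same pattern: local ones go into the root group, remote ones
        # under the matching remote process group entry.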
if isinstance(connectable, OutputPort):
group = connectable.remote_process_group
if group is None:
root['outputPorts'].append({
'name': connectable_name,
'identifier': str(connectable.instance_id),
'properties': connectable.properties
})
else:
res_group = None
for res_group_candidate in root['remoteProcessGroups']:
assert isinstance(res_group_candidate, dict)
if res_group_candidate['identifier'] == str(group.uuid):
res_group = res_group_candidate
if res_group is None:
res_group = {
'name': group.name,
'identifier': str(group.uuid),
'targetUri': group.url,
'communicationsTimeout': '30 sec',
'yieldDuration': '3 sec',
'outputPorts': []
}
root['remoteProcessGroups'].append(res_group)
res_group['outputPorts'].append({
'identifier': str(connectable.instance_id),
'name': connectable.name,
'properties': connectable.properties
})
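        # Processors carry their scheduling settings from the 'schedule' dict; any controller
        # services they reference are serialized alongside them (once each).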
if isinstance(connectable, Processor):
root['processors'].append({
'name': connectable_name,
'identifier': str(connectable.uuid),
'type': connectable.class_prefix + connectable.clazz,
'schedulingStrategy': connectable.schedule['scheduling strategy'],
'schedulingPeriod': connectable.schedule['scheduling period'],
'penaltyDuration': connectable.schedule['penalization period'],
'yieldDuration': connectable.schedule['yield period'],
'runDurationMillis': connectable.schedule['run duration nanos'],
'properties': connectable.properties,
'autoTerminatedRelationships': connectable.auto_terminate,
'concurrentlySchedulableTaskCount': connectable.max_concurrent_tasks
})
for svc in connectable.controller_services:
if svc in visited:
continue
visited.append(svc)
self.serialize_controller(svc, root)
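        # Funnels only need an identifier in the flow document.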
if isinstance(connectable, Funnel):
root['funnels'].append({
'identifier': str(connectable.uuid)
})
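        # One connection entry is emitted per downstream node. Connections originating from a
        # funnel carry no 'selectedRelationships'; all others select the relationship they were
        # registered under. Unvisited downstream nodes are serialized recursively.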
for conn_name in connectable.connections:
conn_procs = connectable.connections[conn_name]
if not isinstance(conn_procs, list):
conn_procs = [conn_procs]
for proc in conn_procs:
root['connections'].append({
'name': str(uuid.uuid4()),
'source': {'id': str(connectable.id_for_connection())},
'destination': {'id': str(proc.id_for_connection())}
})
                if all(str(connectable.uuid) != x['identifier'] for x in root['funnels']):
root['connections'][-1]['selectedRelationships'] = [conn_name]
if proc not in visited:
                    self.serialize_node(proc, root, visited)

    def serialize_controller(self, controller, root):
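        # Controller services are appended to the root group's service list, falling back to
        # the uuid when the service has no name.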
        if hasattr(controller, 'name'):
            controller_name = controller.name
        else:
            controller_name = str(controller.uuid)
        root['controllerServices'].append({
            'name': controller_name,
'identifier': controller.id,
'type': controller.service_class,
'properties': controller.properties
})
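        # A single linked service is referenced by its plain name; multiple services are stored
        # as a list of {'value': name} entries under the 'Linked Services' property.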
if controller.linked_services:
if len(controller.linked_services) == 1:
root['controllerServices'][-1]['properties']['Linked Services'] = controller.linked_services[0].name
else:
                root['controllerServices'][-1]['properties']['Linked Services'] = [{'value': service.name} for service in controller.linked_services]