/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.nifi.controller.serialization;

import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Funnel;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.connectable.Position;
import org.apache.nifi.connectable.Size;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.Template;
import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.parameter.Parameter;
import org.apache.nifi.parameter.ParameterContext;
import org.apache.nifi.parameter.ParameterContextManager;
import org.apache.nifi.parameter.ParameterDescriptor;
import org.apache.nifi.persistence.TemplateSerializer;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.registry.VariableDescriptor;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.registry.flow.FlowRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.remote.PublicPort;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.security.xml.XmlUtils;
import org.apache.nifi.util.CharacterFilterUtils;
import org.apache.nifi.util.StringUtils;
import org.w3c.dom.DOMException;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;

import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.transform.OutputKeys;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerException;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.TransformerFactoryConfigurationError;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;

/**
 * Serializes a Flow Controller as XML to an output stream.
 *
 * NOT THREAD-SAFE.
 */
public class StandardFlowSerializer implements FlowSerializer<Document> {

    private static final String MAX_ENCODING_VERSION = "1.4";

    private final PropertyEncryptor encryptor;

    public StandardFlowSerializer(final PropertyEncryptor encryptor) {
        this.encryptor = encryptor;
    }


    @Override
    public Document transform(final FlowController controller, final ScheduledStateLookup scheduledStateLookup) throws FlowSerializationException {
        try {
            // create a new, empty document
            final DocumentBuilder docBuilder = XmlUtils.createSafeDocumentBuilder(true);
            final Document doc = docBuilder.newDocument();

            // populate document with controller state
            final Element rootNode = doc.createElement("flowController");
            rootNode.setAttribute("encoding-version", MAX_ENCODING_VERSION);
            doc.appendChild(rootNode);
            addTextElement(rootNode, "maxTimerDrivenThreadCount", controller.getMaxTimerDrivenThreadCount());
            addTextElement(rootNode, "maxEventDrivenThreadCount", controller.getMaxEventDrivenThreadCount());

            final Element registriesElement = doc.createElement("registries");
            rootNode.appendChild(registriesElement);
            addFlowRegistries(registriesElement, controller.getFlowRegistryClient());

            final Element parameterContextsElement = doc.createElement("parameterContexts");
            rootNode.appendChild(parameterContextsElement);
            addParameterContexts(parameterContextsElement, controller.getFlowManager().getParameterContextManager());

            addProcessGroup(rootNode, controller.getFlowManager().getRootGroup(), "rootGroup", scheduledStateLookup);

            // Add root-level controller services
            final Element controllerServicesNode = doc.createElement("controllerServices");
            rootNode.appendChild(controllerServicesNode);
            for (final ControllerServiceNode serviceNode : controller.getFlowManager().getRootControllerServices()) {
                addControllerService(controllerServicesNode, serviceNode);
            }

            final Element reportingTasksNode = doc.createElement("reportingTasks");
            rootNode.appendChild(reportingTasksNode);
            for (final ReportingTaskNode taskNode : controller.getAllReportingTasks()) {
                addReportingTask(reportingTasksNode, taskNode, encryptor);
            }

            return doc;
        } catch (final ParserConfigurationException | DOMException | TransformerFactoryConfigurationError | IllegalArgumentException e) {
            throw new FlowSerializationException(e);
        }
    }

    @Override
    public void serialize(final Document flowConfiguration, final OutputStream os) throws FlowSerializationException {
        try {
            final DOMSource domSource = new DOMSource(flowConfiguration);
            final StreamResult streamResult = new StreamResult(new BufferedOutputStream(os));

            // configure the transformer and convert the DOM
            final TransformerFactory transformFactory = TransformerFactory.newInstance();
            final Transformer transformer = transformFactory.newTransformer();
            transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "2");
            transformer.setOutputProperty(OutputKeys.INDENT, "yes");

            // transform the document to byte stream
            transformer.transform(domSource, streamResult);

        } catch (final DOMException | TransformerFactoryConfigurationError | IllegalArgumentException | TransformerException e) {
            throw new FlowSerializationException(e);
        }
    }

    private void addParameterContexts(final Element parentElement, final ParameterContextManager parameterContextManager) {
        for (final ParameterContext parameterContext : parameterContextManager.getParameterContexts()) {
            final Element parameterContextElement = parentElement.getOwnerDocument().createElement("parameterContext");
            parentElement.appendChild(parameterContextElement);

            addStringElement(parameterContextElement, "id", parameterContext.getIdentifier());
            addStringElement(parameterContextElement, "name", parameterContext.getName());
            addStringElement(parameterContextElement, "description", parameterContext.getDescription());

            for (final Parameter parameter : parameterContext.getParameters().values()) {
                addParameter(parameterContextElement, parameter);
            }
        }
    }

    private void addParameter(final Element parentElement, final Parameter parameter) {
        final Element parameterElement = parentElement.getOwnerDocument().createElement("parameter");
        parentElement.appendChild(parameterElement);

        final ParameterDescriptor descriptor = parameter.getDescriptor();
        addStringElement(parameterElement, "name", descriptor.getName());
        addStringElement(parameterElement, "description", descriptor.getDescription());
        addStringElement(parameterElement, "sensitive", String.valueOf(descriptor.isSensitive()));

        if (parameter.getValue() != null) {
            if (descriptor.isSensitive()) {
                final String parameterValue = parameter.getValue();
                addStringElement(parameterElement, "value", parameterValue == null ? null : ENC_PREFIX + encryptor.encrypt(parameterValue) + ENC_SUFFIX);
            } else {
                addStringElement(parameterElement, "value", parameter.getValue());
            }
        }
    }

    private void addFlowRegistries(final Element parentElement, final FlowRegistryClient registryClient) {
        for (final String registryId : registryClient.getRegistryIdentifiers()) {
            final FlowRegistry flowRegistry = registryClient.getFlowRegistry(registryId);

            final Element registryElement = parentElement.getOwnerDocument().createElement("flowRegistry");
            parentElement.appendChild(registryElement);

            addStringElement(registryElement, "id", flowRegistry.getIdentifier());
            addStringElement(registryElement, "name", flowRegistry.getName());
            addStringElement(registryElement, "url", flowRegistry.getURL());
            addStringElement(registryElement, "description", flowRegistry.getDescription());
        }
    }

    private void addStringElement(final Element parentElement, final String elementName, final String value) {
        final Element childElement = parentElement.getOwnerDocument().createElement(elementName);
        childElement.setTextContent(CharacterFilterUtils.filterInvalidXmlCharacters(value));
        parentElement.appendChild(childElement);
    }

    private void addSize(final Element parentElement, final Size size) {
        final Element element = parentElement.getOwnerDocument().createElement("size");
        element.setAttribute("width", String.valueOf(size.getWidth()));
        element.setAttribute("height", String.valueOf(size.getHeight()));
        parentElement.appendChild(element);
    }

    private void addPosition(final Element parentElement, final Position position) {
        addPosition(parentElement, position, "position");
    }

    private void addPosition(final Element parentElement, final Position position, final String elementName) {
        final Element element = parentElement.getOwnerDocument().createElement(elementName);
        element.setAttribute("x", String.valueOf(position.getX()));
        element.setAttribute("y", String.valueOf(position.getY()));
        parentElement.appendChild(element);
    }

    private void addProcessGroup(final Element parentElement, final ProcessGroup group, final String elementName, final ScheduledStateLookup scheduledStateLookup) {
        final Document doc = parentElement.getOwnerDocument();
        final Element element = doc.createElement(elementName);
        parentElement.appendChild(element);
        addTextElement(element, "id", group.getIdentifier());
        addTextElement(element, "versionedComponentId", group.getVersionedComponentId());
        addTextElement(element, "name", group.getName());
        addPosition(element, group.getPosition());
        addTextElement(element, "comment", group.getComments());
        addTextElement(element, "flowfileConcurrency", group.getFlowFileConcurrency().name());
        addTextElement(element, "flowfileOutboundPolicy", group.getFlowFileOutboundPolicy().name());

        final VersionControlInformation versionControlInfo = group.getVersionControlInformation();
        if (versionControlInfo != null) {
            final Element versionControlInfoElement = doc.createElement("versionControlInformation");
            addTextElement(versionControlInfoElement, "registryId", versionControlInfo.getRegistryIdentifier());
            addTextElement(versionControlInfoElement, "bucketId", versionControlInfo.getBucketIdentifier());
            addTextElement(versionControlInfoElement, "bucketName", versionControlInfo.getBucketName());
            addTextElement(versionControlInfoElement, "flowId", versionControlInfo.getFlowIdentifier());
            addTextElement(versionControlInfoElement, "flowName", versionControlInfo.getFlowName());
            addTextElement(versionControlInfoElement, "flowDescription", versionControlInfo.getFlowDescription());
            addTextElement(versionControlInfoElement, "version", versionControlInfo.getVersion());
            element.appendChild(versionControlInfoElement);
        }

        for (final ProcessorNode processor : group.getProcessors()) {
            addProcessor(element, processor, scheduledStateLookup);
        }

        for (final Port port : group.getInputPorts()) {
            if (port instanceof PublicPort) {
                addPublicPort(element, (PublicPort) port, "inputPort", scheduledStateLookup);
            } else {
                addPort(element, port, "inputPort", scheduledStateLookup);
            }
        }

        for (final Port port : group.getOutputPorts()) {
            if (port instanceof PublicPort) {
                addPublicPort(element, (PublicPort) port, "outputPort", scheduledStateLookup);
            } else {
                addPort(element, port, "outputPort", scheduledStateLookup);
            }
        }

        for (final Label label : group.getLabels()) {
            addLabel(element, label);
        }

        for (final Funnel funnel : group.getFunnels()) {
            addFunnel(element, funnel);
        }

        for (final ProcessGroup childGroup : group.getProcessGroups()) {
            addProcessGroup(element, childGroup, "processGroup", scheduledStateLookup);
        }

        for (final RemoteProcessGroup remoteRef : group.getRemoteProcessGroups()) {
            addRemoteProcessGroup(element, remoteRef, scheduledStateLookup);
        }

        for (final Connection connection : group.getConnections()) {
            addConnection(element, connection);
        }

        for (final ControllerServiceNode service : group.getControllerServices(false)) {
            addControllerService(element, service);
        }

        for (final Template template : group.getTemplates()) {
            addTemplate(element, template);
        }

        final VariableRegistry variableRegistry = group.getVariableRegistry();
        for (final Map.Entry<VariableDescriptor, String> entry : variableRegistry.getVariableMap().entrySet()) {
            addVariable(element, entry.getKey().getName(), entry.getValue());
        }

        final ParameterContext parameterContext = group.getParameterContext();
        if (parameterContext != null) {
            addTextElement(element, "parameterContextId", parameterContext.getIdentifier());
        }
    }

    private static void addVariable(final Element parentElement, final String variableName, final String variableValue) {
        final Element variableElement = parentElement.getOwnerDocument().createElement("variable");
        variableElement.setAttribute("name", CharacterFilterUtils.filterInvalidXmlCharacters(variableName));
        variableElement.setAttribute("value", CharacterFilterUtils.filterInvalidXmlCharacters(variableValue));
        parentElement.appendChild(variableElement);
    }

    private static void addBundle(final Element parentElement, final BundleCoordinate coordinate) {
        // group
        final Element groupElement = parentElement.getOwnerDocument().createElement("group");
        groupElement.setTextContent(CharacterFilterUtils.filterInvalidXmlCharacters(coordinate.getGroup()));

        // artifact
        final Element artifactElement = parentElement.getOwnerDocument().createElement("artifact");
        artifactElement.setTextContent(CharacterFilterUtils.filterInvalidXmlCharacters(coordinate.getId()));

        // version
        final Element versionElement = parentElement.getOwnerDocument().createElement("version");
        versionElement.setTextContent(CharacterFilterUtils.filterInvalidXmlCharacters(coordinate.getVersion()));

        // bundle
        final Element bundleElement = parentElement.getOwnerDocument().createElement("bundle");
        bundleElement.appendChild(groupElement);
        bundleElement.appendChild(artifactElement);
        bundleElement.appendChild(versionElement);

        parentElement.appendChild(bundleElement);
    }

    private void addStyle(final Element parentElement, final Map<String, String> style) {
        final Element element = parentElement.getOwnerDocument().createElement("styles");

        for (final Map.Entry<String, String> entry : style.entrySet()) {
            final Element styleElement = parentElement.getOwnerDocument().createElement("style");
            styleElement.setAttribute("name", entry.getKey());
            styleElement.setTextContent(entry.getValue());
            element.appendChild(styleElement);
        }

        parentElement.appendChild(element);
    }

    private void addLabel(final Element parentElement, final Label label) {
        final Document doc = parentElement.getOwnerDocument();
        final Element element = doc.createElement("label");
        parentElement.appendChild(element);
        addTextElement(element, "id", label.getIdentifier());
        addTextElement(element, "versionedComponentId", label.getVersionedComponentId());

        addPosition(element, label.getPosition());
        addSize(element, label.getSize());
        addStyle(element, label.getStyle());

        addTextElement(element, "value", label.getValue());
        parentElement.appendChild(element);
    }

    private void addFunnel(final Element parentElement, final Funnel funnel) {
        final Document doc = parentElement.getOwnerDocument();
        final Element element = doc.createElement("funnel");
        parentElement.appendChild(element);
        addTextElement(element, "id", funnel.getIdentifier());
        addTextElement(element, "versionedComponentId", funnel.getVersionedComponentId());
        addPosition(element, funnel.getPosition());
    }

    private void addRemoteProcessGroup(final Element parentElement, final RemoteProcessGroup remoteRef, final ScheduledStateLookup scheduledStateLookup) {
        final Document doc = parentElement.getOwnerDocument();
        final Element element = doc.createElement("remoteProcessGroup");
        parentElement.appendChild(element);
        addTextElement(element, "id", remoteRef.getIdentifier());
        addTextElement(element, "versionedComponentId", remoteRef.getVersionedComponentId());
        addTextElement(element, "name", remoteRef.getName());
        addPosition(element, remoteRef.getPosition());
        addTextElement(element, "comment", remoteRef.getComments());
        addTextElement(element, "url", remoteRef.getTargetUri());
        addTextElement(element, "urls", remoteRef.getTargetUris());
        addTextElement(element, "timeout", remoteRef.getCommunicationsTimeout());
        addTextElement(element, "yieldPeriod", remoteRef.getYieldDuration());
        addTextElement(element, "transmitting", String.valueOf(remoteRef.isTransmitting()));
        addTextElement(element, "transportProtocol", remoteRef.getTransportProtocol().name());
        addTextElement(element, "proxyHost", remoteRef.getProxyHost());
        if (remoteRef.getProxyPort() != null) {
            addTextElement(element, "proxyPort", remoteRef.getProxyPort());
        }
        addTextElement(element, "proxyUser", remoteRef.getProxyUser());
        if (!StringUtils.isEmpty(remoteRef.getProxyPassword())) {
            final String value = ENC_PREFIX + encryptor.encrypt(remoteRef.getProxyPassword()) + ENC_SUFFIX;
            addTextElement(element, "proxyPassword", value);
        }
        if (remoteRef.getNetworkInterface() != null) {
            addTextElement(element, "networkInterface", remoteRef.getNetworkInterface());
        }

        for (final RemoteGroupPort port : remoteRef.getInputPorts()) {
            if (port.hasIncomingConnection()) {
                addRemoteGroupPort(element, port, "inputPort", scheduledStateLookup);
            }
        }

        for (final RemoteGroupPort port : remoteRef.getOutputPorts()) {
            if (!port.getConnections().isEmpty()) {
                addRemoteGroupPort(element, port, "outputPort", scheduledStateLookup);
            }
        }

        parentElement.appendChild(element);
    }

    private void addRemoteGroupPort(final Element parentElement, final RemoteGroupPort port, final String elementName, final ScheduledStateLookup scheduledStateLookup) {
        final Document doc = parentElement.getOwnerDocument();
        final Element element = doc.createElement(elementName);
        parentElement.appendChild(element);
        addTextElement(element, "id", port.getIdentifier());
        addTextElement(element, "versionedComponentId", port.getVersionedComponentId());
        addTextElement(element, "name", port.getName());
        addPosition(element, port.getPosition());
        addTextElement(element, "comments", port.getComments());
        addTextElement(element, "scheduledState", scheduledStateLookup.getScheduledState(port).name());
        addTextElement(element, "targetId", port.getTargetIdentifier());
        addTextElement(element, "maxConcurrentTasks", port.getMaxConcurrentTasks());
        addTextElement(element, "useCompression", String.valueOf(port.isUseCompression()));
        final Integer batchCount = port.getBatchCount();
        if (batchCount != null && batchCount > 0) {
            addTextElement(element, "batchCount", batchCount);
        }
        final String batchSize = port.getBatchSize();
        if (batchSize != null && batchSize.length() > 0) {
            addTextElement(element, "batchSize", batchSize);
        }
        final String batchDuration = port.getBatchDuration();
        if (batchDuration != null && batchDuration.length() > 0) {
            addTextElement(element, "batchDuration", batchDuration);
        }

        parentElement.appendChild(element);
    }

    private void addPort(final Element parentElement, final Port port, final String elementName, final ScheduledStateLookup scheduledStateLookup) {
        final Document doc = parentElement.getOwnerDocument();
        final Element element = doc.createElement(elementName);
        parentElement.appendChild(element);
        addTextElement(element, "id", port.getIdentifier());
        addTextElement(element, "versionedComponentId", port.getVersionedComponentId());
        addTextElement(element, "name", port.getName());
        addPosition(element, port.getPosition());
        addTextElement(element, "comments", port.getComments());
        addTextElement(element, "scheduledState", scheduledStateLookup.getScheduledState(port).name());

        parentElement.appendChild(element);
    }

    private void addPublicPort(final Element parentElement, final PublicPort port, final String elementName, final ScheduledStateLookup scheduledStateLookup) {
        final Document doc = parentElement.getOwnerDocument();
        final Element element = doc.createElement(elementName);
        parentElement.appendChild(element);
        addTextElement(element, "id", port.getIdentifier());
        addTextElement(element, "versionedComponentId", port.getVersionedComponentId());
        addTextElement(element, "name", port.getName());
        addPosition(element, port.getPosition());
        addTextElement(element, "comments", port.getComments());
        addTextElement(element, "scheduledState", scheduledStateLookup.getScheduledState(port).name());
        addTextElement(element, "maxConcurrentTasks", String.valueOf(port.getMaxConcurrentTasks()));
        addTextElement(element, "allowRemoteAccess", Boolean.TRUE.toString());
        for (final String user : port.getUserAccessControl()) {
            addTextElement(element, "userAccessControl", user);
        }
        for (final String group : port.getGroupAccessControl()) {
            addTextElement(element, "groupAccessControl", group);
        }

        parentElement.appendChild(element);
    }

    private void addProcessor(final Element parentElement, final ProcessorNode processor, final ScheduledStateLookup scheduledStateLookup) {
        final Document doc = parentElement.getOwnerDocument();
        final Element element = doc.createElement("processor");
        parentElement.appendChild(element);
        addTextElement(element, "id", processor.getIdentifier());
        addTextElement(element, "versionedComponentId", processor.getVersionedComponentId());
        addTextElement(element, "name", processor.getName());

        addPosition(element, processor.getPosition());
        addStyle(element, processor.getStyle());

        addTextElement(element, "comment", processor.getComments());
        addTextElement(element, "class", processor.getCanonicalClassName());

        addBundle(element, processor.getBundleCoordinate());

        addTextElement(element, "maxConcurrentTasks", processor.getMaxConcurrentTasks());
        addTextElement(element, "schedulingPeriod", processor.getSchedulingPeriod());
        addTextElement(element, "penalizationPeriod", processor.getPenalizationPeriod());
        addTextElement(element, "yieldPeriod", processor.getYieldPeriod());
        addTextElement(element, "bulletinLevel", processor.getBulletinLevel().toString());
        addTextElement(element, "lossTolerant", String.valueOf(processor.isLossTolerant()));
        addTextElement(element, "scheduledState", scheduledStateLookup.getScheduledState(processor).name());
        addTextElement(element, "schedulingStrategy", processor.getSchedulingStrategy().name());
        addTextElement(element, "executionNode", processor.getExecutionNode().name());
        addTextElement(element, "runDurationNanos", processor.getRunDuration(TimeUnit.NANOSECONDS));

        addConfiguration(element, processor.getRawPropertyValues(), processor.getAnnotationData(), encryptor);

        for (final Relationship rel : processor.getAutoTerminatedRelationships()) {
            addTextElement(element, "autoTerminatedRelationship", rel.getName());
        }
    }

    private static void addConfiguration(final Element element, final Map<PropertyDescriptor, String> properties, final String annotationData, final PropertyEncryptor encryptor) {
        final Document doc = element.getOwnerDocument();
        for (final Map.Entry<PropertyDescriptor, String> entry : properties.entrySet()) {
            final PropertyDescriptor descriptor = entry.getKey();
            String value = entry.getValue();

            if (value == null) {
                value = descriptor.getDefaultValue();
            }

            if (value != null && descriptor.isSensitive()) {
                value = ENC_PREFIX + encryptor.encrypt(value) + ENC_SUFFIX;
            }

            final Element propElement = doc.createElement("property");
            addTextElement(propElement, "name", descriptor.getName());
            if (value != null) {
                addTextElement(propElement, "value", value);
            }

            element.appendChild(propElement);
        }

        if (annotationData != null) {
            addTextElement(element, "annotationData", annotationData);
        }
    }

    private void addConnection(final Element parentElement, final Connection connection) {
        final Document doc = parentElement.getOwnerDocument();
        final Element element = doc.createElement("connection");
        parentElement.appendChild(element);
        addTextElement(element, "id", connection.getIdentifier());
        addTextElement(element, "versionedComponentId", connection.getVersionedComponentId());
        addTextElement(element, "name", connection.getName());

        final Element bendPointsElement = doc.createElement("bendPoints");
        element.appendChild(bendPointsElement);
        for (final Position bendPoint : connection.getBendPoints()) {
            addPosition(bendPointsElement, bendPoint, "bendPoint");
        }

        addTextElement(element, "labelIndex", connection.getLabelIndex());
        addTextElement(element, "zIndex", connection.getZIndex());

        final String sourceId = connection.getSource().getIdentifier();
        final ConnectableType sourceType = connection.getSource().getConnectableType();
        final String sourceGroupId;
        if (sourceType == ConnectableType.REMOTE_OUTPUT_PORT) {
            sourceGroupId = ((RemoteGroupPort) connection.getSource()).getRemoteProcessGroup().getIdentifier();
        } else {
            sourceGroupId = connection.getSource().getProcessGroup().getIdentifier();
        }

        final ConnectableType destinationType = connection.getDestination().getConnectableType();
        final String destinationId = connection.getDestination().getIdentifier();
        final String destinationGroupId;
        if (destinationType == ConnectableType.REMOTE_INPUT_PORT) {
            destinationGroupId = ((RemoteGroupPort) connection.getDestination()).getRemoteProcessGroup().getIdentifier();
        } else {
            destinationGroupId = connection.getDestination().getProcessGroup().getIdentifier();
        }

        addTextElement(element, "sourceId", sourceId);
        addTextElement(element, "sourceGroupId", sourceGroupId);
        addTextElement(element, "sourceType", sourceType.toString());

        addTextElement(element, "destinationId", destinationId);
        addTextElement(element, "destinationGroupId", destinationGroupId);
        addTextElement(element, "destinationType", destinationType.toString());

        for (final Relationship relationship : connection.getRelationships()) {
            addTextElement(element, "relationship", relationship.getName());
        }

        addTextElement(element, "maxWorkQueueSize", connection.getFlowFileQueue().getBackPressureObjectThreshold());
        addTextElement(element, "maxWorkQueueDataSize", connection.getFlowFileQueue().getBackPressureDataSizeThreshold());

        addTextElement(element, "flowFileExpiration", connection.getFlowFileQueue().getFlowFileExpiration());
        for (final FlowFilePrioritizer comparator : connection.getFlowFileQueue().getPriorities()) {
            final String className = comparator.getClass().getCanonicalName();
            addTextElement(element, "queuePrioritizerClass", className);
        }

        addTextElement(element, "loadBalanceStrategy", connection.getFlowFileQueue().getLoadBalanceStrategy().name());
        addTextElement(element, "partitioningAttribute", connection.getFlowFileQueue().getPartitioningAttribute());
        addTextElement(element, "loadBalanceCompression", connection.getFlowFileQueue().getLoadBalanceCompression().name());

        parentElement.appendChild(element);
    }

    public void addControllerService(final Element element, final ControllerServiceNode serviceNode) {
        final Element serviceElement = element.getOwnerDocument().createElement("controllerService");
        addTextElement(serviceElement, "id", serviceNode.getIdentifier());
        addTextElement(serviceElement, "versionedComponentId", serviceNode.getVersionedComponentId());
        addTextElement(serviceElement, "name", serviceNode.getName());
        addTextElement(serviceElement, "comment", serviceNode.getComments());
        addTextElement(serviceElement, "class", serviceNode.getCanonicalClassName());

        addBundle(serviceElement, serviceNode.getBundleCoordinate());

        final ControllerServiceState state = serviceNode.getState();
        final boolean enabled = (state == ControllerServiceState.ENABLED || state == ControllerServiceState.ENABLING);
        addTextElement(serviceElement, "enabled", String.valueOf(enabled));

        addConfiguration(serviceElement, serviceNode.getRawPropertyValues(), serviceNode.getAnnotationData(), encryptor);

        element.appendChild(serviceElement);
    }

    public static void addReportingTask(final Element element, final ReportingTaskNode taskNode, final PropertyEncryptor encryptor) {
        final Element taskElement = element.getOwnerDocument().createElement("reportingTask");
        addTextElement(taskElement, "id", taskNode.getIdentifier());
        addTextElement(taskElement, "name", taskNode.getName());
        addTextElement(taskElement, "comment", taskNode.getComments());
        addTextElement(taskElement, "class", taskNode.getCanonicalClassName());

        addBundle(taskElement, taskNode.getBundleCoordinate());

        addTextElement(taskElement, "schedulingPeriod", taskNode.getSchedulingPeriod());
        addTextElement(taskElement, "scheduledState", taskNode.getScheduledState().name());
        addTextElement(taskElement, "schedulingStrategy", taskNode.getSchedulingStrategy().name());

        addConfiguration(taskElement, taskNode.getRawPropertyValues(), taskNode.getAnnotationData(), encryptor);

        element.appendChild(taskElement);
    }

    private static void addTextElement(final Element element, final String name, final long value) {
        addTextElement(element, name, String.valueOf(value));
    }

    private static void addTextElement(final Element element, final String name, final String value) {
        final Document doc = element.getOwnerDocument();
        final Element toAdd = doc.createElement(name);
        toAdd.setTextContent(CharacterFilterUtils.filterInvalidXmlCharacters(value)); // value should already be filtered, but just in case ensure there are no invalid xml characters
        element.appendChild(toAdd);
    }

    private static void addTextElement(final Element element, final String name, final Optional<String> value) {
        if (!value.isPresent()) {
            return;
        }

        final Document doc = element.getOwnerDocument();
        final Element toAdd = doc.createElement(name);
        toAdd.setTextContent(CharacterFilterUtils.filterInvalidXmlCharacters(value.get())); // value should already be filtered, but just in case ensure there are no invalid xml characters
        element.appendChild(toAdd);
    }

    public static void addTemplate(final Element element, final Template template) {
        try {
            final byte[] serialized = TemplateSerializer.serialize(template.getDetails());

            final DocumentBuilder docBuilder = XmlUtils.createSafeDocumentBuilder(true);
            final Document document;
            try (final InputStream in = new ByteArrayInputStream(serialized)) {
                document = docBuilder.parse(in);
            }

            final Node templateNode = element.getOwnerDocument().importNode(document.getDocumentElement(), true);
            element.appendChild(templateNode);
        } catch (final Exception e) {
            throw new FlowSerializationException(e);
        }
    }
}
