blob: 55a48959db673d9f29a627e0c3379e717f987b1e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.serialization;
import org.apache.nifi.bundle.BundleCoordinate;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
import org.apache.nifi.connectable.Funnel;
import org.apache.nifi.connectable.Port;
import org.apache.nifi.connectable.Position;
import org.apache.nifi.connectable.Size;
import org.apache.nifi.controller.FlowController;
import org.apache.nifi.controller.ProcessorNode;
import org.apache.nifi.controller.ReportingTaskNode;
import org.apache.nifi.controller.Template;
import org.apache.nifi.controller.label.Label;
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.controller.service.ControllerServiceState;
import org.apache.nifi.encrypt.PropertyEncryptor;
import org.apache.nifi.flowfile.FlowFilePrioritizer;
import org.apache.nifi.groups.ProcessGroup;
import org.apache.nifi.groups.RemoteProcessGroup;
import org.apache.nifi.parameter.Parameter;
import org.apache.nifi.parameter.ParameterContext;
import org.apache.nifi.parameter.ParameterContextManager;
import org.apache.nifi.parameter.ParameterDescriptor;
import org.apache.nifi.persistence.TemplateSerializer;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.registry.VariableDescriptor;
import org.apache.nifi.registry.VariableRegistry;
import org.apache.nifi.registry.flow.FlowRegistry;
import org.apache.nifi.registry.flow.FlowRegistryClient;
import org.apache.nifi.registry.flow.VersionControlInformation;
import org.apache.nifi.remote.PublicPort;
import org.apache.nifi.remote.RemoteGroupPort;
import org.apache.nifi.security.xml.XmlUtils;
import org.apache.nifi.util.CharacterFilterUtils;
import org.apache.nifi.util.StringUtils;
import org.w3c.dom.DOMException;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.ParserConfigurationException;
import javax.xml.transform.OutputKeys;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerException;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.TransformerFactoryConfigurationError;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
/**
* Serializes a Flow Controller as XML to an output stream.
*
* NOT THREAD-SAFE.
*/
public class StandardFlowSerializer implements FlowSerializer<Document> {
private static final String MAX_ENCODING_VERSION = "1.4";
private final PropertyEncryptor encryptor;
/**
 * Creates a serializer bound to the given encryptor.
 *
 * @param encryptor the encryptor stored on this instance; it is used by the
 *                  serialization methods of this class when writing sensitive values
 */
public StandardFlowSerializer(final PropertyEncryptor encryptor) {
this.encryptor = encryptor;
}
/**
 * Builds a DOM document describing the given controller's flow.
 *
 * The document has a {@code flowController} root element containing, in order: the
 * thread-count settings, flow registries, parameter contexts, the root process group,
 * the root-level controller services and the reporting tasks.
 *
 * @param controller the controller whose state is written into the document
 * @param scheduledStateLookup used to resolve the scheduled state written for components
 * @return the populated document
 * @throws FlowSerializationException if the document cannot be created or populated
 */
@Override
public Document transform(final FlowController controller, final ScheduledStateLookup scheduledStateLookup) throws FlowSerializationException {
    try {
        // Start from a brand new, empty document.
        final Document document = XmlUtils.createSafeDocumentBuilder(true).newDocument();

        // The root element records the encoding version of the serialized form.
        final Element flowControllerElement = document.createElement("flowController");
        flowControllerElement.setAttribute("encoding-version", MAX_ENCODING_VERSION);
        document.appendChild(flowControllerElement);

        addTextElement(flowControllerElement, "maxTimerDrivenThreadCount", controller.getMaxTimerDrivenThreadCount());
        addTextElement(flowControllerElement, "maxEventDrivenThreadCount", controller.getMaxEventDrivenThreadCount());

        // Flow registries
        final Element registries = document.createElement("registries");
        flowControllerElement.appendChild(registries);
        addFlowRegistries(registries, controller.getFlowRegistryClient());

        // Parameter contexts
        final Element parameterContexts = document.createElement("parameterContexts");
        flowControllerElement.appendChild(parameterContexts);
        addParameterContexts(parameterContexts, controller.getFlowManager().getParameterContextManager());

        // The root process group, which recursively contains the rest of the flow
        addProcessGroup(flowControllerElement, controller.getFlowManager().getRootGroup(), "rootGroup", scheduledStateLookup);

        // Root-level controller services
        final Element controllerServices = document.createElement("controllerServices");
        flowControllerElement.appendChild(controllerServices);
        for (final ControllerServiceNode rootService : controller.getFlowManager().getRootControllerServices()) {
            addControllerService(controllerServices, rootService);
        }

        // Reporting tasks
        final Element reportingTasks = document.createElement("reportingTasks");
        flowControllerElement.appendChild(reportingTasks);
        for (final ReportingTaskNode reportingTask : controller.getAllReportingTasks()) {
            addReportingTask(reportingTasks, reportingTask, encryptor);
        }

        return document;
    } catch (final ParserConfigurationException | DOMException | TransformerFactoryConfigurationError | IllegalArgumentException e) {
        throw new FlowSerializationException(e);
    }
}
/**
 * Writes the given flow document to the output stream as indented XML.
 *
 * The stream is wrapped in a {@link BufferedOutputStream} for the transformation and is
 * explicitly flushed afterwards so that no bytes remain in the buffer. The stream is not
 * closed; that remains the caller's responsibility.
 *
 * @param flowConfiguration the document to write
 * @param os the stream that receives the XML
 * @throws FlowSerializationException if the transformation or the final flush fails
 */
@Override
public void serialize(final Document flowConfiguration, final OutputStream os) throws FlowSerializationException {
    try {
        final BufferedOutputStream bufferedOut = new BufferedOutputStream(os);
        final DOMSource domSource = new DOMSource(flowConfiguration);
        final StreamResult streamResult = new StreamResult(bufferedOut);

        // configure the transformer and convert the DOM
        final TransformerFactory transformFactory = TransformerFactory.newInstance();
        final Transformer transformer = transformFactory.newTransformer();
        transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "2");
        transformer.setOutputProperty(OutputKeys.INDENT, "yes");

        // transform the document to byte stream
        transformer.transform(domSource, streamResult);

        // Do not rely on the Transformer implementation to flush the wrapper we created:
        // anything still sitting in the buffer would otherwise never reach 'os'.
        bufferedOut.flush();
    } catch (final DOMException | TransformerFactoryConfigurationError | IllegalArgumentException | TransformerException | IOException e) {
        throw new FlowSerializationException(e);
    }
}
/**
 * Appends one {@code parameterContext} element to the parent for every parameter context
 * known to the manager. Each element holds the context's id, name and description,
 * followed by one {@code parameter} element per parameter.
 *
 * @param parentElement the element that receives the {@code parameterContext} children
 * @param parameterContextManager the source of the parameter contexts
 */
private void addParameterContexts(final Element parentElement, final ParameterContextManager parameterContextManager) {
    for (final ParameterContext context : parameterContextManager.getParameterContexts()) {
        final Document ownerDocument = parentElement.getOwnerDocument();
        final Element contextElement = ownerDocument.createElement("parameterContext");
        parentElement.appendChild(contextElement);

        addStringElement(contextElement, "id", context.getIdentifier());
        addStringElement(contextElement, "name", context.getName());
        addStringElement(contextElement, "description", context.getDescription());

        for (final Parameter contextParameter : context.getParameters().values()) {
            addParameter(contextElement, contextParameter);
        }
    }
}
/**
 * Appends a {@code parameter} element describing the given parameter.
 *
 * The element always contains {@code name}, {@code description} and {@code sensitive}.
 * A {@code value} child is written only when the parameter has a non-null value; for a
 * sensitive parameter that value is encrypted and wrapped in ENC_PREFIX / ENC_SUFFIX.
 *
 * @param parentElement the element that receives the {@code parameter} child
 * @param parameter the parameter to write
 */
private void addParameter(final Element parentElement, final Parameter parameter) {
    final Element parameterElement = parentElement.getOwnerDocument().createElement("parameter");
    parentElement.appendChild(parameterElement);

    final ParameterDescriptor descriptor = parameter.getDescriptor();
    addStringElement(parameterElement, "name", descriptor.getName());
    addStringElement(parameterElement, "description", descriptor.getDescription());
    addStringElement(parameterElement, "sensitive", String.valueOf(descriptor.isSensitive()));

    // Read the value exactly once so the null check and the value that is written
    // always refer to the same object.
    final String parameterValue = parameter.getValue();
    if (parameterValue != null) {
        final String serializedValue = descriptor.isSensitive()
                ? ENC_PREFIX + encryptor.encrypt(parameterValue) + ENC_SUFFIX
                : parameterValue;
        addStringElement(parameterElement, "value", serializedValue);
    }
}
/**
 * Appends one {@code flowRegistry} element per registry identifier reported by the
 * client. Each element holds the registry's id, name, url and description.
 *
 * @param parentElement the element that receives the {@code flowRegistry} children
 * @param registryClient the client used to enumerate and look up registries
 */
private void addFlowRegistries(final Element parentElement, final FlowRegistryClient registryClient) {
    for (final String identifier : registryClient.getRegistryIdentifiers()) {
        final FlowRegistry registry = registryClient.getFlowRegistry(identifier);

        final Document ownerDocument = parentElement.getOwnerDocument();
        final Element flowRegistryElement = ownerDocument.createElement("flowRegistry");
        parentElement.appendChild(flowRegistryElement);

        addStringElement(flowRegistryElement, "id", registry.getIdentifier());
        addStringElement(flowRegistryElement, "name", registry.getName());
        addStringElement(flowRegistryElement, "url", registry.getURL());
        addStringElement(flowRegistryElement, "description", registry.getDescription());
    }
}
/**
 * Appends a child element with the given name whose text content is the given value
 * after invalid XML characters have been filtered out.
 *
 * @param parentElement the element that receives the new child
 * @param elementName the tag name of the new child
 * @param value the text content to write
 */
private void addStringElement(final Element parentElement, final String elementName, final String value) {
    // The String overload of addTextElement performs exactly these steps:
    // create the element, set its filtered text content, append it to the parent.
    addTextElement(parentElement, elementName, value);
}
/**
 * Appends a {@code size} element whose {@code width} and {@code height} attributes
 * are taken from the given size.
 *
 * @param parentElement the element that receives the {@code size} child
 * @param size the dimensions to write
 */
private void addSize(final Element parentElement, final Size size) {
    final Document ownerDocument = parentElement.getOwnerDocument();
    final Element sizeElement = ownerDocument.createElement("size");

    final String width = String.valueOf(size.getWidth());
    sizeElement.setAttribute("width", width);
    final String height = String.valueOf(size.getHeight());
    sizeElement.setAttribute("height", height);

    parentElement.appendChild(sizeElement);
}
/**
 * Appends the given position as an element named {@code position}.
 *
 * @param parentElement the element that receives the new child
 * @param position the coordinates to write
 */
private void addPosition(final Element parentElement, final Position position) {
    final String defaultElementName = "position";
    addPosition(parentElement, position, defaultElementName);
}
/**
 * Appends an element with the given name whose {@code x} and {@code y} attributes are
 * taken from the given position.
 *
 * @param parentElement the element that receives the new child
 * @param position the coordinates to write
 * @param elementName the tag name of the new child
 */
private void addPosition(final Element parentElement, final Position position, final String elementName) {
    final Document ownerDocument = parentElement.getOwnerDocument();
    final Element positionElement = ownerDocument.createElement(elementName);

    final String x = String.valueOf(position.getX());
    positionElement.setAttribute("x", x);
    final String y = String.valueOf(position.getY());
    positionElement.setAttribute("y", y);

    parentElement.appendChild(positionElement);
}
/**
 * Appends an element describing the given process group and, recursively, everything
 * it contains.
 *
 * Children are written in this order: the group's own settings, optional version
 * control information, processors, input ports, output ports, labels, funnels, child
 * process groups, remote process groups, connections, controller services, templates,
 * variables and finally the id of the bound parameter context (if any).
 *
 * @param parentElement the element that receives the new group element
 * @param group the process group to write
 * @param elementName the tag name used for the group element
 * @param scheduledStateLookup used to resolve the scheduled state written for components
 */
private void addProcessGroup(final Element parentElement, final ProcessGroup group, final String elementName, final ScheduledStateLookup scheduledStateLookup) {
    final Element groupElement = parentElement.getOwnerDocument().createElement(elementName);
    parentElement.appendChild(groupElement);

    // Settings of the group itself
    addTextElement(groupElement, "id", group.getIdentifier());
    addTextElement(groupElement, "versionedComponentId", group.getVersionedComponentId());
    addTextElement(groupElement, "name", group.getName());
    addPosition(groupElement, group.getPosition());
    addTextElement(groupElement, "comment", group.getComments());
    addTextElement(groupElement, "flowfileConcurrency", group.getFlowFileConcurrency().name());
    addTextElement(groupElement, "flowfileOutboundPolicy", group.getFlowFileOutboundPolicy().name());
    addTextElement(groupElement, "defaultFlowFileExpiration", group.getDefaultFlowFileExpiration());
    addTextElement(groupElement, "defaultBackPressureObjectThreshold", group.getDefaultBackPressureObjectThreshold());
    addTextElement(groupElement, "defaultBackPressureDataSizeThreshold", group.getDefaultBackPressureDataSizeThreshold());

    // Version control information is only present for groups that have it
    final VersionControlInformation versionControl = group.getVersionControlInformation();
    if (versionControl != null) {
        addVersionControlInformation(groupElement, versionControl);
    }

    for (final ProcessorNode processorNode : group.getProcessors()) {
        addProcessor(groupElement, processorNode, scheduledStateLookup);
    }

    // Public ports carry extra access-control details, so they have their own writer
    for (final Port inputPort : group.getInputPorts()) {
        if (inputPort instanceof PublicPort) {
            addPublicPort(groupElement, (PublicPort) inputPort, "inputPort", scheduledStateLookup);
        } else {
            addPort(groupElement, inputPort, "inputPort", scheduledStateLookup);
        }
    }

    for (final Port outputPort : group.getOutputPorts()) {
        if (outputPort instanceof PublicPort) {
            addPublicPort(groupElement, (PublicPort) outputPort, "outputPort", scheduledStateLookup);
        } else {
            addPort(groupElement, outputPort, "outputPort", scheduledStateLookup);
        }
    }

    for (final Label groupLabel : group.getLabels()) {
        addLabel(groupElement, groupLabel);
    }

    for (final Funnel groupFunnel : group.getFunnels()) {
        addFunnel(groupElement, groupFunnel);
    }

    // Recurse into child groups
    for (final ProcessGroup child : group.getProcessGroups()) {
        addProcessGroup(groupElement, child, "processGroup", scheduledStateLookup);
    }

    for (final RemoteProcessGroup remoteGroup : group.getRemoteProcessGroups()) {
        addRemoteProcessGroup(groupElement, remoteGroup, scheduledStateLookup);
    }

    for (final Connection groupConnection : group.getConnections()) {
        addConnection(groupElement, groupConnection);
    }

    for (final ControllerServiceNode groupService : group.getControllerServices(false)) {
        addControllerService(groupElement, groupService);
    }

    for (final Template groupTemplate : group.getTemplates()) {
        addTemplate(groupElement, groupTemplate);
    }

    final VariableRegistry registry = group.getVariableRegistry();
    for (final Map.Entry<VariableDescriptor, String> variable : registry.getVariableMap().entrySet()) {
        addVariable(groupElement, variable.getKey().getName(), variable.getValue());
    }

    final ParameterContext boundContext = group.getParameterContext();
    if (boundContext != null) {
        addTextElement(groupElement, "parameterContextId", boundContext.getIdentifier());
    }
}

/**
 * Appends a {@code versionControlInformation} element populated from the given
 * version control information.
 *
 * @param groupElement the process group element that receives the new child
 * @param versionControlInfo the version control details to write
 */
private static void addVersionControlInformation(final Element groupElement, final VersionControlInformation versionControlInfo) {
    final Element infoElement = groupElement.getOwnerDocument().createElement("versionControlInformation");
    addTextElement(infoElement, "registryId", versionControlInfo.getRegistryIdentifier());
    addTextElement(infoElement, "bucketId", versionControlInfo.getBucketIdentifier());
    addTextElement(infoElement, "bucketName", versionControlInfo.getBucketName());
    addTextElement(infoElement, "flowId", versionControlInfo.getFlowIdentifier());
    addTextElement(infoElement, "flowName", versionControlInfo.getFlowName());
    addTextElement(infoElement, "flowDescription", versionControlInfo.getFlowDescription());
    addTextElement(infoElement, "version", versionControlInfo.getVersion());
    groupElement.appendChild(infoElement);
}
/**
 * Appends a {@code variable} element whose {@code name} and {@code value} attributes
 * hold the given strings with invalid XML characters filtered out.
 *
 * @param parentElement the element that receives the {@code variable} child
 * @param variableName the variable's name
 * @param variableValue the variable's value
 */
private static void addVariable(final Element parentElement, final String variableName, final String variableValue) {
    final Document ownerDocument = parentElement.getOwnerDocument();
    final Element variable = ownerDocument.createElement("variable");

    final String filteredName = CharacterFilterUtils.filterInvalidXmlCharacters(variableName);
    variable.setAttribute("name", filteredName);
    final String filteredValue = CharacterFilterUtils.filterInvalidXmlCharacters(variableValue);
    variable.setAttribute("value", filteredValue);

    parentElement.appendChild(variable);
}
/**
 * Appends a {@code bundle} element containing {@code group}, {@code artifact} and
 * {@code version} children taken from the given coordinate. Each text value has
 * invalid XML characters filtered out.
 *
 * @param parentElement the element that receives the {@code bundle} child
 * @param coordinate the bundle coordinate to write
 */
private static void addBundle(final Element parentElement, final BundleCoordinate coordinate) {
    final Document ownerDocument = parentElement.getOwnerDocument();

    final Element group = ownerDocument.createElement("group");
    group.setTextContent(CharacterFilterUtils.filterInvalidXmlCharacters(coordinate.getGroup()));

    final Element artifact = ownerDocument.createElement("artifact");
    artifact.setTextContent(CharacterFilterUtils.filterInvalidXmlCharacters(coordinate.getId()));

    final Element version = ownerDocument.createElement("version");
    version.setTextContent(CharacterFilterUtils.filterInvalidXmlCharacters(coordinate.getVersion()));

    // Assemble the bundle element and attach it to the parent
    final Element bundle = ownerDocument.createElement("bundle");
    bundle.appendChild(group);
    bundle.appendChild(artifact);
    bundle.appendChild(version);
    parentElement.appendChild(bundle);
}
/**
 * Appends a {@code styles} element containing one {@code style} child per map entry.
 * The entry key becomes the {@code name} attribute and the entry value becomes the
 * text content.
 *
 * Names and values are passed through
 * {@link CharacterFilterUtils#filterInvalidXmlCharacters(String)}, consistent with the
 * other element writers in this class, so that a style containing characters that are
 * not legal in XML cannot corrupt the serialized flow.
 *
 * @param parentElement the element that receives the {@code styles} child
 * @param style the style entries to write
 */
private void addStyle(final Element parentElement, final Map<String, String> style) {
    final Document doc = parentElement.getOwnerDocument();
    final Element element = doc.createElement("styles");

    for (final Map.Entry<String, String> entry : style.entrySet()) {
        final Element styleElement = doc.createElement("style");
        styleElement.setAttribute("name", CharacterFilterUtils.filterInvalidXmlCharacters(entry.getKey()));
        styleElement.setTextContent(CharacterFilterUtils.filterInvalidXmlCharacters(entry.getValue()));
        element.appendChild(styleElement);
    }

    parentElement.appendChild(element);
}
/**
 * Appends a {@code label} element holding the label's id, versioned component id,
 * position, size, styles and value.
 *
 * @param parentElement the element that receives the {@code label} child
 * @param label the label to write
 */
private void addLabel(final Element parentElement, final Label label) {
    final Document doc = parentElement.getOwnerDocument();
    final Element element = doc.createElement("label");
    // The element is attached once, up front; re-appending an already attached node
    // only detaches and re-attaches it at the same (last) position.
    parentElement.appendChild(element);

    addTextElement(element, "id", label.getIdentifier());
    addTextElement(element, "versionedComponentId", label.getVersionedComponentId());
    addPosition(element, label.getPosition());
    addSize(element, label.getSize());
    addStyle(element, label.getStyle());
    addTextElement(element, "value", label.getValue());
}
/**
 * Appends a {@code funnel} element holding the funnel's id, versioned component id
 * and position.
 *
 * @param parentElement the element that receives the {@code funnel} child
 * @param funnel the funnel to write
 */
private void addFunnel(final Element parentElement, final Funnel funnel) {
    final Element funnelElement = parentElement.getOwnerDocument().createElement("funnel");
    parentElement.appendChild(funnelElement);

    addTextElement(funnelElement, "id", funnel.getIdentifier());
    addTextElement(funnelElement, "versionedComponentId", funnel.getVersionedComponentId());
    addPosition(funnelElement, funnel.getPosition());
}
/**
 * Appends a {@code remoteProcessGroup} element describing the given remote process group.
 *
 * Optional settings are written only when present: {@code proxyPort} and
 * {@code networkInterface} when non-null, and {@code proxyPassword} when non-empty
 * (encrypted and wrapped in ENC_PREFIX / ENC_SUFFIX). Input ports are written only when
 * they have an incoming connection, output ports only when they have at least one
 * connection.
 *
 * @param parentElement the element that receives the {@code remoteProcessGroup} child
 * @param remoteRef the remote process group to write
 * @param scheduledStateLookup used to resolve the scheduled state written for the ports
 */
private void addRemoteProcessGroup(final Element parentElement, final RemoteProcessGroup remoteRef, final ScheduledStateLookup scheduledStateLookup) {
    final Document doc = parentElement.getOwnerDocument();
    final Element element = doc.createElement("remoteProcessGroup");
    // Attached once, up front; a second append of the same node would be a no-op move.
    parentElement.appendChild(element);

    addTextElement(element, "id", remoteRef.getIdentifier());
    addTextElement(element, "versionedComponentId", remoteRef.getVersionedComponentId());
    addTextElement(element, "name", remoteRef.getName());
    addPosition(element, remoteRef.getPosition());
    addTextElement(element, "comment", remoteRef.getComments());
    addTextElement(element, "url", remoteRef.getTargetUri());
    addTextElement(element, "urls", remoteRef.getTargetUris());
    addTextElement(element, "timeout", remoteRef.getCommunicationsTimeout());
    addTextElement(element, "yieldPeriod", remoteRef.getYieldDuration());
    addTextElement(element, "transmitting", String.valueOf(remoteRef.isTransmitting()));
    addTextElement(element, "transportProtocol", remoteRef.getTransportProtocol().name());

    // Proxy settings
    addTextElement(element, "proxyHost", remoteRef.getProxyHost());
    if (remoteRef.getProxyPort() != null) {
        addTextElement(element, "proxyPort", remoteRef.getProxyPort());
    }
    addTextElement(element, "proxyUser", remoteRef.getProxyUser());
    if (!StringUtils.isEmpty(remoteRef.getProxyPassword())) {
        // The password is never written in clear text
        final String value = ENC_PREFIX + encryptor.encrypt(remoteRef.getProxyPassword()) + ENC_SUFFIX;
        addTextElement(element, "proxyPassword", value);
    }

    if (remoteRef.getNetworkInterface() != null) {
        addTextElement(element, "networkInterface", remoteRef.getNetworkInterface());
    }

    // Only ports that take part in a connection are written
    for (final RemoteGroupPort port : remoteRef.getInputPorts()) {
        if (port.hasIncomingConnection()) {
            addRemoteGroupPort(element, port, "inputPort", scheduledStateLookup);
        }
    }

    for (final RemoteGroupPort port : remoteRef.getOutputPorts()) {
        if (!port.getConnections().isEmpty()) {
            addRemoteGroupPort(element, port, "outputPort", scheduledStateLookup);
        }
    }
}
/**
 * Appends an element describing a port of a remote process group.
 *
 * The batch settings are optional: {@code batchCount} is written only when it is
 * non-null and greater than zero, {@code batchSize} and {@code batchDuration} only
 * when they are non-null and non-empty.
 *
 * @param parentElement the element that receives the port element
 * @param port the remote group port to write
 * @param elementName the tag name used for the port element
 * @param scheduledStateLookup used to resolve the scheduled state written for the port
 */
private void addRemoteGroupPort(final Element parentElement, final RemoteGroupPort port, final String elementName, final ScheduledStateLookup scheduledStateLookup) {
    final Document doc = parentElement.getOwnerDocument();
    final Element element = doc.createElement(elementName);
    // Attached once, up front; a second append of the same node would be a no-op move.
    parentElement.appendChild(element);

    addTextElement(element, "id", port.getIdentifier());
    addTextElement(element, "versionedComponentId", port.getVersionedComponentId());
    addTextElement(element, "name", port.getName());
    addPosition(element, port.getPosition());
    addTextElement(element, "comments", port.getComments());
    addTextElement(element, "scheduledState", scheduledStateLookup.getScheduledState(port).name());
    addTextElement(element, "targetId", port.getTargetIdentifier());
    addTextElement(element, "maxConcurrentTasks", port.getMaxConcurrentTasks());
    addTextElement(element, "useCompression", String.valueOf(port.isUseCompression()));

    final Integer batchCount = port.getBatchCount();
    if (batchCount != null && batchCount > 0) {
        addTextElement(element, "batchCount", batchCount);
    }

    final String batchSize = port.getBatchSize();
    if (batchSize != null && batchSize.length() > 0) {
        addTextElement(element, "batchSize", batchSize);
    }

    final String batchDuration = port.getBatchDuration();
    if (batchDuration != null && batchDuration.length() > 0) {
        addTextElement(element, "batchDuration", batchDuration);
    }
}
/**
 * Appends an element describing the given port: id, versioned component id, name,
 * position, comments and scheduled state.
 *
 * @param parentElement the element that receives the port element
 * @param port the port to write
 * @param elementName the tag name used for the port element
 * @param scheduledStateLookup used to resolve the scheduled state written for the port
 */
private void addPort(final Element parentElement, final Port port, final String elementName, final ScheduledStateLookup scheduledStateLookup) {
    final Document doc = parentElement.getOwnerDocument();
    final Element element = doc.createElement(elementName);
    // Attached once, up front; a second append of the same node would be a no-op move.
    parentElement.appendChild(element);

    addTextElement(element, "id", port.getIdentifier());
    addTextElement(element, "versionedComponentId", port.getVersionedComponentId());
    addTextElement(element, "name", port.getName());
    addPosition(element, port.getPosition());
    addTextElement(element, "comments", port.getComments());
    addTextElement(element, "scheduledState", scheduledStateLookup.getScheduledState(port).name());
}
/**
 * Appends an element describing the given public port.
 *
 * In addition to the basic port fields this writes {@code maxConcurrentTasks}, an
 * {@code allowRemoteAccess} element that is always {@code true}, and one
 * {@code userAccessControl} / {@code groupAccessControl} element per configured user
 * and group.
 *
 * @param parentElement the element that receives the port element
 * @param port the public port to write
 * @param elementName the tag name used for the port element
 * @param scheduledStateLookup used to resolve the scheduled state written for the port
 */
private void addPublicPort(final Element parentElement, final PublicPort port, final String elementName, final ScheduledStateLookup scheduledStateLookup) {
    final Document doc = parentElement.getOwnerDocument();
    final Element element = doc.createElement(elementName);
    // Attached once, up front; a second append of the same node would be a no-op move.
    parentElement.appendChild(element);

    addTextElement(element, "id", port.getIdentifier());
    addTextElement(element, "versionedComponentId", port.getVersionedComponentId());
    addTextElement(element, "name", port.getName());
    addPosition(element, port.getPosition());
    addTextElement(element, "comments", port.getComments());
    addTextElement(element, "scheduledState", scheduledStateLookup.getScheduledState(port).name());
    addTextElement(element, "maxConcurrentTasks", String.valueOf(port.getMaxConcurrentTasks()));
    addTextElement(element, "allowRemoteAccess", Boolean.TRUE.toString());

    for (final String user : port.getUserAccessControl()) {
        addTextElement(element, "userAccessControl", user);
    }

    for (final String group : port.getGroupAccessControl()) {
        addTextElement(element, "groupAccessControl", group);
    }
}
/**
 * Appends a {@code processor} element describing the given processor: its identity,
 * position, styles, class and bundle, scheduling settings, configured properties with
 * annotation data, and one {@code autoTerminatedRelationship} element per
 * auto-terminated relationship.
 *
 * @param parentElement the element that receives the {@code processor} child
 * @param processor the processor to write
 * @param scheduledStateLookup used to resolve the scheduled state written for the processor
 */
private void addProcessor(final Element parentElement, final ProcessorNode processor, final ScheduledStateLookup scheduledStateLookup) {
    final Element processorElement = parentElement.getOwnerDocument().createElement("processor");
    parentElement.appendChild(processorElement);

    // Identity and presentation
    addTextElement(processorElement, "id", processor.getIdentifier());
    addTextElement(processorElement, "versionedComponentId", processor.getVersionedComponentId());
    addTextElement(processorElement, "name", processor.getName());
    addPosition(processorElement, processor.getPosition());
    addStyle(processorElement, processor.getStyle());
    addTextElement(processorElement, "comment", processor.getComments());

    // Implementation class and the bundle it comes from
    addTextElement(processorElement, "class", processor.getCanonicalClassName());
    addBundle(processorElement, processor.getBundleCoordinate());

    // Scheduling and runtime settings
    addTextElement(processorElement, "maxConcurrentTasks", processor.getMaxConcurrentTasks());
    addTextElement(processorElement, "schedulingPeriod", processor.getSchedulingPeriod());
    addTextElement(processorElement, "penalizationPeriod", processor.getPenalizationPeriod());
    addTextElement(processorElement, "yieldPeriod", processor.getYieldPeriod());
    addTextElement(processorElement, "bulletinLevel", processor.getBulletinLevel().toString());
    addTextElement(processorElement, "lossTolerant", String.valueOf(processor.isLossTolerant()));
    addTextElement(processorElement, "scheduledState", scheduledStateLookup.getScheduledState(processor).name());
    addTextElement(processorElement, "schedulingStrategy", processor.getSchedulingStrategy().name());
    addTextElement(processorElement, "executionNode", processor.getExecutionNode().name());
    addTextElement(processorElement, "runDurationNanos", processor.getRunDuration(TimeUnit.NANOSECONDS));

    // Properties and annotation data
    addConfiguration(processorElement, processor.getRawPropertyValues(), processor.getAnnotationData(), encryptor);

    for (final Relationship autoTerminated : processor.getAutoTerminatedRelationships()) {
        addTextElement(processorElement, "autoTerminatedRelationship", autoTerminated.getName());
    }
}
/**
 * Appends one {@code property} element per entry of the given property map, followed by
 * an {@code annotationData} element when annotation data is present.
 *
 * For each property, a null configured value falls back to the descriptor's default
 * value. If the resulting value is non-null and the descriptor is sensitive, the value is
 * encrypted and wrapped in ENC_PREFIX / ENC_SUFFIX. A property whose resulting value is
 * null is written with a {@code name} child only.
 *
 * @param element the element that receives the property and annotation data children
 * @param properties the properties to write, keyed by descriptor
 * @param annotationData the annotation data to write, or null for none
 * @param encryptor the encryptor applied to sensitive property values
 */
private static void addConfiguration(final Element element, final Map<PropertyDescriptor, String> properties, final String annotationData, final PropertyEncryptor encryptor) {
    final Document ownerDocument = element.getOwnerDocument();

    for (final Map.Entry<PropertyDescriptor, String> property : properties.entrySet()) {
        final PropertyDescriptor propertyDescriptor = property.getKey();

        // Resolve the value to write: configured value first, default as fallback
        final String configuredValue = property.getValue();
        final String effectiveValue = configuredValue == null ? propertyDescriptor.getDefaultValue() : configuredValue;

        final String serializedValue;
        if (effectiveValue == null) {
            serializedValue = null;
        } else if (propertyDescriptor.isSensitive()) {
            serializedValue = ENC_PREFIX + encryptor.encrypt(effectiveValue) + ENC_SUFFIX;
        } else {
            serializedValue = effectiveValue;
        }

        final Element propertyElement = ownerDocument.createElement("property");
        addTextElement(propertyElement, "name", propertyDescriptor.getName());
        if (serializedValue != null) {
            addTextElement(propertyElement, "value", serializedValue);
        }
        element.appendChild(propertyElement);
    }

    if (annotationData != null) {
        addTextElement(element, "annotationData", annotationData);
    }
}
/**
 * Appends a {@code connection} element describing the given connection: its identity,
 * bend points, label and z indexes, source and destination (id, group id, type),
 * relationships, queue thresholds and expiration, prioritizer classes and load-balance
 * settings.
 *
 * For a remote output port source or a remote input port destination, the group id
 * written is the identifier of the port's remote process group; otherwise it is the
 * identifier of the component's process group.
 *
 * @param parentElement the element that receives the {@code connection} child
 * @param connection the connection to write
 */
private void addConnection(final Element parentElement, final Connection connection) {
    final Document doc = parentElement.getOwnerDocument();
    final Element element = doc.createElement("connection");
    // Attached once, up front; a second append of the same node would be a no-op move.
    parentElement.appendChild(element);

    addTextElement(element, "id", connection.getIdentifier());
    addTextElement(element, "versionedComponentId", connection.getVersionedComponentId());
    addTextElement(element, "name", connection.getName());

    final Element bendPointsElement = doc.createElement("bendPoints");
    element.appendChild(bendPointsElement);
    for (final Position bendPoint : connection.getBendPoints()) {
        addPosition(bendPointsElement, bendPoint, "bendPoint");
    }

    addTextElement(element, "labelIndex", connection.getLabelIndex());
    addTextElement(element, "zIndex", connection.getZIndex());

    // Resolve the source side
    final String sourceId = connection.getSource().getIdentifier();
    final ConnectableType sourceType = connection.getSource().getConnectableType();
    final String sourceGroupId;
    if (sourceType == ConnectableType.REMOTE_OUTPUT_PORT) {
        sourceGroupId = ((RemoteGroupPort) connection.getSource()).getRemoteProcessGroup().getIdentifier();
    } else {
        sourceGroupId = connection.getSource().getProcessGroup().getIdentifier();
    }

    // Resolve the destination side
    final ConnectableType destinationType = connection.getDestination().getConnectableType();
    final String destinationId = connection.getDestination().getIdentifier();
    final String destinationGroupId;
    if (destinationType == ConnectableType.REMOTE_INPUT_PORT) {
        destinationGroupId = ((RemoteGroupPort) connection.getDestination()).getRemoteProcessGroup().getIdentifier();
    } else {
        destinationGroupId = connection.getDestination().getProcessGroup().getIdentifier();
    }

    addTextElement(element, "sourceId", sourceId);
    addTextElement(element, "sourceGroupId", sourceGroupId);
    addTextElement(element, "sourceType", sourceType.toString());

    addTextElement(element, "destinationId", destinationId);
    addTextElement(element, "destinationGroupId", destinationGroupId);
    addTextElement(element, "destinationType", destinationType.toString());

    for (final Relationship relationship : connection.getRelationships()) {
        addTextElement(element, "relationship", relationship.getName());
    }

    // Queue settings
    addTextElement(element, "maxWorkQueueSize", connection.getFlowFileQueue().getBackPressureObjectThreshold());
    addTextElement(element, "maxWorkQueueDataSize", connection.getFlowFileQueue().getBackPressureDataSizeThreshold());
    addTextElement(element, "flowFileExpiration", connection.getFlowFileQueue().getFlowFileExpiration());

    for (final FlowFilePrioritizer comparator : connection.getFlowFileQueue().getPriorities()) {
        final String className = comparator.getClass().getCanonicalName();
        addTextElement(element, "queuePrioritizerClass", className);
    }

    addTextElement(element, "loadBalanceStrategy", connection.getFlowFileQueue().getLoadBalanceStrategy().name());
    addTextElement(element, "partitioningAttribute", connection.getFlowFileQueue().getPartitioningAttribute());
    addTextElement(element, "loadBalanceCompression", connection.getFlowFileQueue().getLoadBalanceCompression().name());
}
/**
 * Appends a {@code controllerService} element describing the given service: identity,
 * comment, class, bundle, enabled flag and configured properties with annotation data.
 *
 * The {@code enabled} flag is written as {@code true} when the service state is
 * ENABLED or ENABLING, and {@code false} otherwise.
 *
 * @param element the element that receives the {@code controllerService} child
 * @param serviceNode the controller service to write
 */
public void addControllerService(final Element element, final ControllerServiceNode serviceNode) {
    final Document ownerDocument = element.getOwnerDocument();
    final Element controllerService = ownerDocument.createElement("controllerService");

    addTextElement(controllerService, "id", serviceNode.getIdentifier());
    addTextElement(controllerService, "versionedComponentId", serviceNode.getVersionedComponentId());
    addTextElement(controllerService, "name", serviceNode.getName());
    addTextElement(controllerService, "comment", serviceNode.getComments());
    addTextElement(controllerService, "class", serviceNode.getCanonicalClassName());
    addBundle(controllerService, serviceNode.getBundleCoordinate());

    // A service that is still in the process of enabling is persisted as enabled
    final ControllerServiceState currentState = serviceNode.getState();
    final boolean persistAsEnabled = currentState == ControllerServiceState.ENABLED || currentState == ControllerServiceState.ENABLING;
    addTextElement(controllerService, "enabled", String.valueOf(persistAsEnabled));

    addConfiguration(controllerService, serviceNode.getRawPropertyValues(), serviceNode.getAnnotationData(), encryptor);

    element.appendChild(controllerService);
}
/**
 * Appends a {@code reportingTask} element describing the given task: identity, comment,
 * class, bundle, scheduling settings and configured properties with annotation data.
 *
 * @param element the element that receives the {@code reportingTask} child
 * @param taskNode the reporting task to write
 * @param encryptor the encryptor applied to sensitive property values
 */
public static void addReportingTask(final Element element, final ReportingTaskNode taskNode, final PropertyEncryptor encryptor) {
    final Document ownerDocument = element.getOwnerDocument();
    final Element reportingTask = ownerDocument.createElement("reportingTask");

    addTextElement(reportingTask, "id", taskNode.getIdentifier());
    addTextElement(reportingTask, "name", taskNode.getName());
    addTextElement(reportingTask, "comment", taskNode.getComments());
    addTextElement(reportingTask, "class", taskNode.getCanonicalClassName());
    addBundle(reportingTask, taskNode.getBundleCoordinate());

    // Scheduling settings
    addTextElement(reportingTask, "schedulingPeriod", taskNode.getSchedulingPeriod());
    addTextElement(reportingTask, "scheduledState", taskNode.getScheduledState().name());
    addTextElement(reportingTask, "schedulingStrategy", taskNode.getSchedulingStrategy().name());

    addConfiguration(reportingTask, taskNode.getRawPropertyValues(), taskNode.getAnnotationData(), encryptor);

    element.appendChild(reportingTask);
}
/**
 * Appends a child element with the given name whose text content is the decimal string
 * form of the given number.
 *
 * @param element the element that receives the new child
 * @param name the tag name of the new child
 * @param value the number to write
 */
private static void addTextElement(final Element element, final String name, final long value) {
    final String text = String.valueOf(value);
    addTextElement(element, name, text);
}
/**
 * Appends a child element with the given name whose text content is the given value
 * after invalid XML characters have been filtered out.
 *
 * @param element the element that receives the new child
 * @param name the tag name of the new child
 * @param value the text content to write
 */
private static void addTextElement(final Element element, final String name, final String value) {
    final Element child = element.getOwnerDocument().createElement(name);
    // value should already be filtered, but just in case ensure there are no invalid xml characters
    final String filtered = CharacterFilterUtils.filterInvalidXmlCharacters(value);
    child.setTextContent(filtered);
    element.appendChild(child);
}
/**
 * Appends a child element with the given name when the optional holds a value; does
 * nothing when it is empty. The value is written with invalid XML characters filtered out.
 *
 * @param element the element that receives the new child
 * @param name the tag name of the new child
 * @param value the optional text content
 */
private static void addTextElement(final Element element, final String name, final Optional<String> value) {
    if (value.isPresent()) {
        // The String overload creates the element, filters the text and appends the child.
        final String text = value.get();
        addTextElement(element, name, text);
    }
}
/**
 * Serializes the given template, parses the resulting bytes as XML and appends a deep
 * copy of that document's root element to the given element.
 *
 * @param element the element that receives the imported template node
 * @param template the template whose details are serialized
 * @throws FlowSerializationException wrapping any exception raised while serializing,
 *         parsing or importing the template
 */
public static void addTemplate(final Element element, final Template template) {
    try {
        final byte[] templateBytes = TemplateSerializer.serialize(template.getDetails());
        final DocumentBuilder builder = XmlUtils.createSafeDocumentBuilder(true);

        // Parse the serialized template into its own standalone document
        final Document templateDocument;
        try (final InputStream templateStream = new ByteArrayInputStream(templateBytes)) {
            templateDocument = builder.parse(templateStream);
        }

        // A node belongs to the document that created it, so it must be imported
        // into the target document before it can be appended there.
        final Document targetDocument = element.getOwnerDocument();
        final Node importedTemplate = targetDocument.importNode(templateDocument.getDocumentElement(), true);
        element.appendChild(importedTemplate);
    } catch (final Exception e) {
        throw new FlowSerializationException(e);
    }
}
}