blob: defb168918a15b815860e67418c5a93d9ceb6e9f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.stateless;
import org.apache.nifi.flow.Bundle;
import org.apache.nifi.flow.ComponentType;
import org.apache.nifi.flow.ConnectableComponent;
import org.apache.nifi.flow.ConnectableComponentType;
import org.apache.nifi.flow.ControllerServiceAPI;
import org.apache.nifi.flow.PortType;
import org.apache.nifi.flow.Position;
import org.apache.nifi.flow.ScheduledState;
import org.apache.nifi.flow.VersionedComponent;
import org.apache.nifi.flow.VersionedConnection;
import org.apache.nifi.flow.VersionedControllerService;
import org.apache.nifi.registry.flow.VersionedFlowSnapshot;
import org.apache.nifi.registry.flow.VersionedParameterContext;
import org.apache.nifi.flow.VersionedPort;
import org.apache.nifi.flow.VersionedProcessGroup;
import org.apache.nifi.flow.VersionedProcessor;
import org.apache.nifi.flow.VersionedPropertyDescriptor;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
public class VersionedFlowBuilder {
private VersionedFlowSnapshot flowSnapshot;
private VersionedProcessGroup rootGroup;
public VersionedFlowBuilder() {
rootGroup = new VersionedProcessGroup();
rootGroup.setIdentifier("root");
rootGroup.setName("root");
flowSnapshot = new VersionedFlowSnapshot();
flowSnapshot.setParameterContexts(new HashMap<>());
flowSnapshot.setFlowContents(rootGroup);
}
public VersionedProcessGroup createProcessGroup(final String name) {
return createProcessGroup(name, rootGroup);
}
public VersionedProcessGroup createProcessGroup(final String name, final VersionedProcessGroup group) {
final VersionedProcessGroup childGroup = new VersionedProcessGroup();
childGroup.setIdentifier(UUID.randomUUID().toString());
childGroup.setName(name);
childGroup.setGroupIdentifier(group.getIdentifier());
group.getProcessGroups().add(childGroup);
return childGroup;
}
public VersionedProcessor createProcessor(final Bundle bundle, final String type) {
return createProcessor(bundle, type, rootGroup);
}
public VersionedPort createOutputPort(final String portName) {
return createOutputPort(portName, rootGroup);
}
public VersionedPort createOutputPort(final String portName, final VersionedProcessGroup group) {
final VersionedPort port = new VersionedPort();
port.setAllowRemoteAccess(false);
port.setComponentType(ComponentType.OUTPUT_PORT);
port.setConcurrentlySchedulableTaskCount(1);
port.setGroupIdentifier(group.getIdentifier());
port.setIdentifier(UUID.randomUUID().toString());
port.setName(portName);
port.setPosition(new Position(0, 0));
port.setScheduledState(ScheduledState.ENABLED);
port.setType(PortType.OUTPUT_PORT);
group.getOutputPorts().add(port);
return port;
}
public VersionedPort createInputPort(final String portName) {
return createInputPort(portName, rootGroup);
}
public VersionedPort createInputPort(final String portName, final VersionedProcessGroup group) {
final VersionedPort port = new VersionedPort();
port.setAllowRemoteAccess(false);
port.setComponentType(ComponentType.INPUT_PORT);
port.setConcurrentlySchedulableTaskCount(1);
port.setGroupIdentifier(group.getIdentifier());
port.setIdentifier(UUID.randomUUID().toString());
port.setName(portName);
port.setPosition(new Position(0, 0));
port.setScheduledState(ScheduledState.ENABLED);
port.setType(PortType.INPUT_PORT);
group.getInputPorts().add(port);
return port;
}
public VersionedProcessor createSimpleProcessor(final String simpleType) {
return createSimpleProcessor(simpleType, rootGroup);
}
public VersionedProcessor createSimpleProcessor(final String simpleType, final VersionedProcessGroup group) {
return createProcessor(StatelessSystemIT.SYSTEM_TEST_EXTENSIONS_BUNDLE, "org.apache.nifi.processors.tests.system." + simpleType, group);
}
public VersionedProcessor createProcessor(final Bundle bundle, final String type, final VersionedProcessGroup group) {
final VersionedProcessor processor = new VersionedProcessor();
processor.setBundle(bundle);
processor.setIdentifier(UUID.randomUUID().toString());
processor.setName(substringAfterLast(type, "."));
processor.setAutoTerminatedRelationships(Collections.emptySet());
processor.setConcurrentlySchedulableTaskCount(1);
processor.setBulletinLevel("WARN");
processor.setComponentType(ComponentType.PROCESSOR);
processor.setExecutionNode("ALL");
processor.setGroupIdentifier(group.getIdentifier());
processor.setPenaltyDuration("30 sec");
processor.setPosition(new Position(0, 0));
processor.setProperties(Collections.emptyMap());
processor.setPropertyDescriptors(Collections.emptyMap());
processor.setRunDurationMillis(0L);
processor.setScheduledState(ScheduledState.ENABLED);
processor.setSchedulingPeriod("0 sec");
processor.setStyle(Collections.emptyMap());
processor.setType(type);
processor.setYieldDuration("1 sec");
processor.setSchedulingStrategy("TIMER_DRIVEN");
group.getProcessors().add(processor);
return processor;
}
public VersionedConnection createConnection(final VersionedComponent source, final VersionedComponent destination, final String relationship) {
return createConnection(source, destination, Collections.singleton(relationship));
}
public VersionedConnection createConnection(final VersionedComponent source, final VersionedComponent destination, final Set<String> relationships) {
return createConnection(source, destination, relationships, rootGroup);
}
public VersionedConnection createConnection(final VersionedComponent source, final VersionedComponent destination, final String relationship, final VersionedProcessGroup group) {
return createConnection(source, destination, Collections.singleton(relationship), group);
}
public VersionedConnection createConnection(final VersionedComponent source, final VersionedComponent destination, final Set<String> relationships, final VersionedProcessGroup group) {
final VersionedConnection connection = new VersionedConnection();
connection.setBackPressureDataSizeThreshold("0 GB");
connection.setBackPressureObjectThreshold(0L);
connection.setBends(Collections.emptyList());
connection.setComponentType(ComponentType.CONNECTION);
connection.setDestination(createConnectableComponent(destination));
connection.setFlowFileExpiration("0 sec");
connection.setIdentifier(UUID.randomUUID().toString());
connection.setGroupIdentifier(group.getIdentifier());
connection.setLabelIndex(1);
connection.setPosition(new Position(0, 0));
connection.setPrioritizers(Collections.emptyList());
connection.setSelectedRelationships(relationships);
connection.setSource(createConnectableComponent(source));
connection.setzIndex(1L);
group.getConnections().add(connection);
return connection;
}
private ConnectableComponent createConnectableComponent(final VersionedComponent component) {
final ConnectableComponent connectable = new ConnectableComponent();
connectable.setGroupId(component.getGroupIdentifier());
connectable.setId(component.getIdentifier());
connectable.setName(component.getName());
connectable.setType(ConnectableComponentType.valueOf(component.getComponentType().name()));
return connectable;
}
public VersionedParameterContext createParameterContext(final String name) {
if (flowSnapshot.getParameterContexts().containsKey(name)) {
throw new IllegalStateException("Parameter Context already exists with name " + name);
}
final VersionedParameterContext parameterContext = new VersionedParameterContext();
parameterContext.setName(name);
parameterContext.setParameters(new HashSet<>());
flowSnapshot.getParameterContexts().put(name, parameterContext);
return parameterContext;
}
public VersionedControllerService createSimpleControllerService(final String simpleType, final String apiSimpleType) {
return createSimpleControllerService(simpleType, apiSimpleType, rootGroup);
}
public VersionedControllerService createSimpleControllerService(final String simpleType, final String apiSimpleType, final VersionedProcessGroup group) {
return createControllerService(StatelessSystemIT.SYSTEM_TEST_EXTENSIONS_BUNDLE, "org.apache.nifi.cs.tests.system." + simpleType,
StatelessSystemIT.SYSTEM_TEST_EXTENSIONS_BUNDLE, "org.apache.nifi.cs.tests.system." + apiSimpleType, group);
}
public VersionedControllerService createControllerService(final Bundle bundle, final String type, final Bundle apiBundle, final String apiType, final VersionedProcessGroup group) {
final VersionedControllerService service = new VersionedControllerService();
service.setBundle(bundle);
service.setComponentType(ComponentType.CONTROLLER_SERVICE);
service.setControllerServiceApis(Collections.singletonList(createControllerServiceAPI(apiBundle, apiType)));
service.setGroupIdentifier(group.getIdentifier());
service.setIdentifier(UUID.randomUUID().toString());
service.setName(type);
service.setPosition(new Position(0, 0));
service.setProperties(new HashMap<>());
service.setPropertyDescriptors(new HashMap<>());
service.setType(type);
group.getControllerServices().add(service);
return service;
}
private ControllerServiceAPI createControllerServiceAPI(final Bundle bundle, final String type) {
final ControllerServiceAPI serviceApi = new ControllerServiceAPI();
serviceApi.setBundle(bundle);
serviceApi.setType(type);
return serviceApi;
}
public void addControllerServiceReference(final VersionedControllerService referencingComponent, final VersionedControllerService referenced, final String propertyName) {
final Map<String, String> existingProperties = referencingComponent.getProperties();
final Map<String, String> updatedProperties = new HashMap<>(existingProperties);
updatedProperties.put(propertyName, referenced.getIdentifier());
referencingComponent.setProperties(updatedProperties);
final Map<String, VersionedPropertyDescriptor> existingDescriptors = referencingComponent.getPropertyDescriptors();
final Map<String, VersionedPropertyDescriptor> updatedDescriptors = new HashMap<>(existingDescriptors);
final VersionedPropertyDescriptor versionedPropertyDescriptor = new VersionedPropertyDescriptor();
versionedPropertyDescriptor.setDisplayName(propertyName);
versionedPropertyDescriptor.setName(propertyName);
versionedPropertyDescriptor.setIdentifiesControllerService(true);
updatedDescriptors.put(propertyName, versionedPropertyDescriptor);
referencingComponent.setPropertyDescriptors(updatedDescriptors);
}
public void addControllerServiceReference(final VersionedProcessor referencingComponent, final VersionedControllerService referenced, final String propertyName) {
final Map<String, String> existingProperties = referencingComponent.getProperties();
final Map<String, String> updatedProperties = new HashMap<>(existingProperties);
updatedProperties.put(propertyName, referenced.getIdentifier());
referencingComponent.setProperties(updatedProperties);
final Map<String, VersionedPropertyDescriptor> existingDescriptors = referencingComponent.getPropertyDescriptors();
final Map<String, VersionedPropertyDescriptor> updatedDescriptors = new HashMap<>(existingDescriptors);
final VersionedPropertyDescriptor versionedPropertyDescriptor = new VersionedPropertyDescriptor();
versionedPropertyDescriptor.setDisplayName(propertyName);
versionedPropertyDescriptor.setName(propertyName);
versionedPropertyDescriptor.setIdentifiesControllerService(true);
updatedDescriptors.put(propertyName, versionedPropertyDescriptor);
referencingComponent.setPropertyDescriptors(updatedDescriptors);
}
public VersionedProcessGroup getRootGroup() {
return rootGroup;
}
public VersionedFlowSnapshot getFlowSnapshot() {
return flowSnapshot;
}
private static String substringAfterLast(final String input, final String delimiter) {
final int loc = input.lastIndexOf(delimiter);
if (loc < 0) {
return input;
}
return input.substring(loc + delimiter.length());
}
}