blob: ecc04a5f3174f305bf843860bb2eae52d61592a3 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.taverna.workflowmodel.impl;
import static java.lang.Integer.parseInt;
import static org.apache.taverna.workflowmodel.utils.Tools.getUniqueMergeInputPortName;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.taverna.annotation.Annotated;
import org.apache.taverna.annotation.AnnotationAssertion;
import org.apache.taverna.annotation.AnnotationBeanSPI;
import org.apache.taverna.annotation.AnnotationChain;
import org.apache.taverna.annotation.AnnotationRole;
import org.apache.taverna.annotation.AnnotationSourceSPI;
import org.apache.taverna.annotation.CurationEvent;
import org.apache.taverna.annotation.Person;
import org.apache.taverna.annotation.impl.AnnotationAssertionImpl;
import org.apache.taverna.annotation.impl.AnnotationChainImpl;
import org.apache.taverna.facade.WorkflowInstanceFacade;
import org.apache.taverna.facade.impl.WorkflowInstanceFacadeImpl;
import org.apache.taverna.invocation.InvocationContext;
import org.apache.taverna.reference.ExternalReferenceSPI;
import org.apache.taverna.workflowmodel.CompoundEdit;
import org.apache.taverna.workflowmodel.Condition;
import org.apache.taverna.workflowmodel.Configurable;
import org.apache.taverna.workflowmodel.Dataflow;
import org.apache.taverna.workflowmodel.DataflowInputPort;
import org.apache.taverna.workflowmodel.DataflowOutputPort;
import org.apache.taverna.workflowmodel.Datalink;
import org.apache.taverna.workflowmodel.Edit;
import org.apache.taverna.workflowmodel.EditException;
import org.apache.taverna.workflowmodel.Edits;
import org.apache.taverna.workflowmodel.EventForwardingOutputPort;
import org.apache.taverna.workflowmodel.EventHandlingInputPort;
import org.apache.taverna.workflowmodel.InvalidDataflowException;
import org.apache.taverna.workflowmodel.Merge;
import org.apache.taverna.workflowmodel.MergeInputPort;
import org.apache.taverna.workflowmodel.OrderedPair;
import org.apache.taverna.workflowmodel.OutputPort;
import org.apache.taverna.workflowmodel.Processor;
import org.apache.taverna.workflowmodel.ProcessorInputPort;
import org.apache.taverna.workflowmodel.ProcessorOutputPort;
import org.apache.taverna.workflowmodel.processor.activity.AbstractActivity;
import org.apache.taverna.workflowmodel.processor.activity.Activity;
import org.apache.taverna.workflowmodel.processor.activity.ActivityInputPort;
import org.apache.taverna.workflowmodel.processor.activity.ActivityOutputPort;
import org.apache.taverna.workflowmodel.processor.activity.impl.ActivityInputPortImpl;
import org.apache.taverna.workflowmodel.processor.activity.impl.ActivityOutputPortImpl;
import org.apache.taverna.workflowmodel.processor.dispatch.DispatchLayer;
import org.apache.taverna.workflowmodel.processor.dispatch.DispatchStack;
import org.apache.taverna.workflowmodel.processor.dispatch.impl.AbstractDispatchLayerEdit;
import org.apache.taverna.workflowmodel.processor.dispatch.impl.DispatchStackImpl;
import org.apache.taverna.workflowmodel.processor.dispatch.layers.ErrorBounce;
import org.apache.taverna.workflowmodel.processor.dispatch.layers.Failover;
import org.apache.taverna.workflowmodel.processor.dispatch.layers.Invoke;
import org.apache.taverna.workflowmodel.processor.dispatch.layers.Parallelize;
import org.apache.taverna.workflowmodel.processor.dispatch.layers.Retry;
import org.apache.taverna.workflowmodel.processor.dispatch.layers.Stop;
import org.apache.taverna.workflowmodel.processor.iteration.IterationStrategy;
import org.apache.taverna.workflowmodel.processor.iteration.IterationStrategyStack;
import org.apache.taverna.workflowmodel.processor.iteration.NamedInputPortNode;
import org.apache.taverna.workflowmodel.processor.iteration.impl.IterationStrategyImpl;
import org.apache.taverna.workflowmodel.processor.iteration.impl.IterationStrategyStackImpl;
/**
* Implementation of {@link Edits}
* @author Donal Fellows (et al)
*/
public class EditsImpl implements Edits {
// ----------------------------------------------------------------
// Basic factory methods
/**
 * Creates a new dataflow instance.
 *
 * @return a freshly constructed {@link DataflowImpl}
 */
@Override
public Dataflow createDataflow() {
    Dataflow created = new DataflowImpl();
    return created;
}
/**
 * Builds a {@link DatalinkImpl} object running from the given source port
 * to the given sink port. See {@link #getConnectDatalinkEdit(Datalink)} for
 * registering the link with its two ports.
 *
 * @param source
 *            the output port the link starts at
 * @param sink
 *            the input port the link ends at
 * @return the new datalink
 */
@Override
public Datalink createDatalink(EventForwardingOutputPort source,
        EventHandlingInputPort sink) {
    Datalink link = new DatalinkImpl(source, sink);
    return link;
}
/**
 * Builds a {@link DataflowInputPortImpl} from the supplied values.
 *
 * @param name
 *            the port name
 * @param depth
 *            the port depth
 * @param granularDepth
 *            the granular depth of the port
 * @param dataflow
 *            the dataflow handed to the port's constructor
 * @return the new dataflow input port
 */
@Override
public DataflowInputPort createDataflowInputPort(String name, int depth,
        int granularDepth, Dataflow dataflow) {
    DataflowInputPort port = new DataflowInputPortImpl(name, depth,
            granularDepth, dataflow);
    return port;
}
/**
 * Builds a {@link DataflowOutputPortImpl} from the supplied values.
 *
 * @param name
 *            the port name
 * @param dataflow
 *            the dataflow handed to the port's constructor
 * @return the new dataflow output port
 */
@Override
public DataflowOutputPort createDataflowOutputPort(String name,
        Dataflow dataflow) {
    DataflowOutputPort port = new DataflowOutputPortImpl(name, dataflow);
    return port;
}
/**
 * Builds a {@link MergeInputPortImpl} belonging to the given merge.
 *
 * @param merge
 *            the merge that will own the port
 * @param name
 *            the port name
 * @param depth
 *            the port depth
 * @return the new port, or <code>null</code> when the merge is not a
 *         {@link MergeImpl}
 */
@Override
public MergeInputPort createMergeInputPort(Merge merge, String name,
        int depth) {
    if (!(merge instanceof MergeImpl)) {
        // Only this implementation's merges can own the port
        return null;
    }
    MergeImpl owner = (MergeImpl) merge;
    return new MergeInputPortImpl(owner, name, depth);
}
/**
 * Builds a {@link ProcessorOutputPortImpl} for the given processor.
 *
 * @param processor
 *            the owning processor; it is cast to {@link ProcessorImpl}
 *            without a prior type check
 * @param name
 *            the port name
 * @param depth
 *            the port depth
 * @param granularDepth
 *            the granular depth of the port
 * @return the new processor output port
 */
@Override
public ProcessorOutputPort createProcessorOutputPort(Processor processor,
        String name, int depth, int granularDepth) {
    ProcessorImpl owner = (ProcessorImpl) processor;
    return new ProcessorOutputPortImpl(owner, name, depth, granularDepth);
}
/**
 * Builds a {@link ProcessorInputPortImpl} for the given processor.
 *
 * @param processor
 *            the owning processor; it is cast to {@link ProcessorImpl}
 *            without a prior type check
 * @param name
 *            the port name
 * @param depth
 *            the port depth
 * @return the new processor input port
 */
@Override
public ProcessorInputPort createProcessorInputPort(Processor processor,
        String name, int depth) {
    ProcessorImpl owner = (ProcessorImpl) processor;
    return new ProcessorInputPortImpl(owner, name, depth);
}
/**
 * Creates a new annotation chain instance.
 *
 * @return a freshly constructed {@link AnnotationChainImpl}
 */
@Override
public AnnotationChain createAnnotationChain() {
    AnnotationChain chain = new AnnotationChainImpl();
    return chain;
}
/**
 * Creates a {@link MergeImpl} with a generated name of the form 'Merge0',
 * 'Merge1', 'Merge2', etc.
 * <p>
 * The starting index is one more than the numeric suffix of the last merge
 * in the dataflow's merge list, so a deleted merge is not taken into
 * account when generating names. If the last merge does not carry a name of
 * the form 'Merge' followed by an integer (for instance because it has been
 * renamed), the number of existing merges is used as the starting index
 * instead of failing. The index is then advanced until the generated name
 * differs from the local name of every merge already in the list.
 *
 * @param dataflow
 *            the dataflow whose existing merges determine the new name;
 *            this method does not add the new merge to it
 * @return a new merge whose name is not used by any merge currently
 *         returned by {@link Dataflow#getMerges()}
 */
@Override
public Merge createMerge(Dataflow dataflow) {
    final String prefix = "Merge";
    List<? extends Merge> merges = dataflow.getMerges();

    int index = 0; // an empty list yields 'Merge0'
    if (!merges.isEmpty()) {
        String lastName = merges.get(merges.size() - 1).getLocalName();
        // Fallback when the last name has no parsable numeric suffix
        index = merges.size();
        if (lastName != null && lastName.startsWith(prefix)) {
            try {
                index = parseInt(lastName.substring(prefix.length())) + 1;
            } catch (NumberFormatException ignored) {
                // Renamed merge (e.g. 'MergeResults'): keep the fallback
            }
        }
    }

    // Step past names that are already taken so the result is unique
    String mergeName = prefix + index;
    boolean clash;
    do {
        clash = false;
        for (Merge existing : merges) {
            if (mergeName.equals(existing.getLocalName())) {
                clash = true;
                mergeName = prefix + (++index);
                break;
            }
        }
    } while (clash);

    return new MergeImpl(mergeName);
}
/**
 * Creates a {@link ProcessorImpl} and assigns it the given name.
 *
 * @param name
 *            the name to set on the new processor
 * @return the new, named processor
 */
@Override
public Processor createProcessor(String name) {
    ProcessorImpl created = new ProcessorImpl();
    created.setName(name);
    return created;
}
/**
 * Creates a new iteration strategy instance.
 *
 * @return a freshly constructed {@link IterationStrategyImpl}
 */
@Override
public IterationStrategy createIterationStrategy() {
    IterationStrategy strategy = new IterationStrategyImpl();
    return strategy;
}
/**
 * Constructs an {@link ActivityInputPortImpl}, passing every argument
 * straight through to its constructor.
 *
 * @param portName
 *            the port name
 * @param portDepth
 *            the port depth
 * @param allowsLiteralValues
 *            forwarded unchanged to the port constructor
 * @param handledReferenceSchemes
 *            forwarded unchanged to the port constructor
 * @param translatedElementClass
 *            forwarded unchanged to the port constructor
 * @return the new activity input port
 */
@Override
public ActivityInputPort createActivityInputPort(
        String portName,
        int portDepth,
        boolean allowsLiteralValues,
        List<Class<? extends ExternalReferenceSPI>> handledReferenceSchemes,
        Class<?> translatedElementClass) {
    ActivityInputPort port = new ActivityInputPortImpl(portName,
            portDepth, allowsLiteralValues, handledReferenceSchemes,
            translatedElementClass);
    return port;
}
/**
 * Constructs an {@link ActivityOutputPortImpl} from the supplied values.
 *
 * @param portName
 *            the port name
 * @param portDepth
 *            the port depth
 * @param portGranularDepth
 *            the granular depth of the port
 * @return the new activity output port
 */
@Override
public ActivityOutputPort createActivityOutputPort(String portName, int portDepth,
        int portGranularDepth) {
    ActivityOutputPort port = new ActivityOutputPortImpl(portName,
            portDepth, portGranularDepth);
    return port;
}
/**
 * Constructs a {@link WorkflowInstanceFacadeImpl} for the given dataflow.
 *
 * @param dataflow
 *            the dataflow handed to the facade constructor
 * @param context
 *            the invocation context handed to the facade constructor
 * @param parentProcess
 *            the parent process string handed to the facade constructor
 * @return the new facade
 * @throws InvalidDataflowException
 *             propagated from the facade constructor
 */
@Override
public WorkflowInstanceFacade createWorkflowInstanceFacade(
        Dataflow dataflow, InvocationContext context, String parentProcess)
        throws InvalidDataflowException {
    WorkflowInstanceFacade facade = new WorkflowInstanceFacadeImpl(
            dataflow, context, parentProcess);
    return facade;
}
// ----------------------------------------------------------------
// Edits (structured transformation) factory methods
/**
 * Builds an edit whose action adds the given processor to the dataflow via
 * {@code DataflowImpl.addProcessor}.
 *
 * @param dataflow
 *            the dataflow to be edited
 * @param processor
 *            the processor to add; must be a {@link ProcessorImpl}
 * @return the edit
 * @throws RuntimeException
 *             if the processor is not a {@link ProcessorImpl}
 */
@Override
public Edit<Dataflow> getAddProcessorEdit(Dataflow dataflow,
        Processor processor) {
    if (processor instanceof ProcessorImpl) {
        final ProcessorImpl processorToAdd = (ProcessorImpl) processor;
        return new AbstractDataflowEdit(dataflow) {
            @Override
            protected void doEditAction(DataflowImpl target)
                    throws EditException {
                target.addProcessor(processorToAdd);
            }
        };
    }
    throw new RuntimeException(
            "The Processor is of the wrong implmentation,"
                    + " it should be of type ProcessorImpl");
}
/**
 * Builds an edit whose action adds the given merge to the dataflow via
 * {@code DataflowImpl.addMerge}.
 *
 * @param dataflow
 *            the dataflow to be edited
 * @param merge
 *            the merge to add; must be a {@link MergeImpl}
 * @return the edit
 * @throws RuntimeException
 *             if the merge is not a {@link MergeImpl}
 */
@Override
public Edit<Dataflow> getAddMergeEdit(Dataflow dataflow, Merge merge) {
    if (merge instanceof MergeImpl) {
        final MergeImpl mergeToAdd = (MergeImpl) merge;
        return new AbstractDataflowEdit(dataflow) {
            @Override
            protected void doEditAction(DataflowImpl target)
                    throws EditException {
                target.addMerge(mergeToAdd);
            }
        };
    }
    throw new RuntimeException(
            "The Merge is of the wrong implmentation, "
                    + "it should be of type MergeImpl");
}
/**
 * Builds an edit that inserts a dispatch layer into a dispatch stack. The
 * action adds the layer at the given position; the undo action removes the
 * same layer again.
 *
 * @param stack
 *            the dispatch stack to be edited
 * @param layer
 *            the layer to insert
 * @param position
 *            the position passed to {@code DispatchStackImpl.addLayer}
 * @return the edit
 */
@Override
public Edit<DispatchStack> getAddDispatchLayerEdit(DispatchStack stack,
        final DispatchLayer<?> layer, final int position) {
    return new AbstractDispatchLayerEdit(stack) {
        @Override
        protected void undoEditAction(DispatchStackImpl layers) {
            layers.removeLayer(layer);
        }

        @Override
        protected void doEditAction(DispatchStackImpl layers)
                throws EditException {
            layers.addLayer(layer, position);
        }
    };
}
/**
 * Builds an edit whose action appends an activity to the processor's
 * activity list. The action fails with an {@link EditException} if the
 * list already contains the activity.
 *
 * @param processor
 *            the processor to be edited
 * @param activity
 *            the activity to append
 * @return the edit
 */
@Override
public Edit<Processor> getAddActivityEdit(Processor processor,
        final Activity<?> activity) {
    return new AbstractProcessorEdit(processor) {
        @Override
        protected void doEditAction(ProcessorImpl owner)
                throws EditException {
            if (owner.activityList.contains(activity)) {
                throw new EditException(
                        "Cannot add a duplicate activity to processor");
            }
            owner.activityList.add(activity);
        }
    };
}
/**
 * Builds an edit whose action adds an input port to a processor and
 * registers a matching {@link NamedInputPortNode} with every strategy in
 * the processor's iteration stack. The action fails with an
 * {@link EditException} if the processor already has an input port with
 * the same name.
 *
 * @param processor
 *            the processor to be edited
 * @param port
 *            the port to add; it is cast to {@link ProcessorInputPortImpl}
 *            when the action runs
 * @return the edit
 */
@Override
public Edit<Processor> getAddProcessorInputPortEdit(Processor processor,
        final ProcessorInputPort port) {
    return new AbstractProcessorEdit(processor) {
        @Override
        protected void doEditAction(ProcessorImpl owner)
                throws EditException {
            boolean nameTaken = owner.getInputPortWithName(port.getName()) != null;
            if (nameTaken) {
                throw new EditException(
                        "Attempt to create duplicate input port with name '"
                                + port.getName() + "'");
            }
            owner.inputPorts.add((ProcessorInputPortImpl) port);
            /*
             * Each iteration strategy gets its own node for the new port.
             * The node's desired drill depth defaults to the depth of the
             * input port, so this does not by itself trigger iteration
             * staging unless the depth is later altered on the iteration
             * strategy.
             */
            for (IterationStrategyImpl strategy : owner.iterationStack
                    .getStrategies()) {
                NamedInputPortNode node = new NamedInputPortNode(
                        port.getName(), port.getDepth());
                strategy.addInput(node);
                strategy.connectDefault(node);
            }
        }
    };
}
/**
 * Builds an edit whose action adds an output port to a processor. The
 * action fails with an {@link EditException} if the processor already has
 * an output port with the same name.
 *
 * @param processor
 *            the processor to be edited
 * @param port
 *            the port to add; it is cast to
 *            {@link ProcessorOutputPortImpl} when the action runs
 * @return the edit
 */
@Override
public Edit<Processor> getAddProcessorOutputPortEdit(Processor processor,
        final ProcessorOutputPort port) {
    return new AbstractProcessorEdit(processor) {
        @Override
        protected void doEditAction(ProcessorImpl owner)
                throws EditException {
            boolean nameTaken = owner.getOutputPortWithName(port.getName()) != null;
            if (nameTaken) {
                throw new EditException("Duplicate output port name");
            }
            owner.outputPorts.add((ProcessorOutputPortImpl) port);
        }
    };
}
/**
 * Builds an edit whose action asks the dataflow to create an input port
 * via {@code DataflowImpl.createInputPort}.
 *
 * @param dataflow
 *            the dataflow to be edited
 * @param portName
 *            the name of the port to create
 * @param portDepth
 *            the depth of the port to create
 * @param granularDepth
 *            the granular depth of the port to create
 * @return the edit
 */
@Override
public Edit<Dataflow> getCreateDataflowInputPortEdit(Dataflow dataflow,
        final String portName, final int portDepth, final int granularDepth) {
    Edit<Dataflow> edit = new AbstractDataflowEdit(dataflow) {
        @Override
        protected void doEditAction(DataflowImpl target)
                throws EditException {
            target.createInputPort(portName, portDepth, granularDepth);
        }
    };
    return edit;
}
/**
 * Builds an edit whose action asks the dataflow to create an output port
 * via {@code DataflowImpl.createOutputPort}.
 *
 * @param dataflow
 *            the dataflow to be edited
 * @param portName
 *            the name of the port to create
 * @return the edit
 */
@Override
public Edit<Dataflow> getCreateDataflowOutputPortEdit(Dataflow dataflow,
        final String portName) {
    Edit<Dataflow> edit = new AbstractDataflowEdit(dataflow) {
        @Override
        protected void doEditAction(DataflowImpl target)
                throws EditException {
            target.createOutputPort(portName);
        }
    };
    return edit;
}
/**
 * Builds an edit that removes a dispatch layer from a dispatch stack. The
 * action records the value returned by
 * {@code DispatchStackImpl.removeLayer}, and the undo action re-adds the
 * layer using that recorded value as the position.
 *
 * @param stack
 *            the dispatch stack to be edited
 * @param layer
 *            the layer to remove
 * @return the edit
 */
@Override
public Edit<DispatchStack> getDeleteDispatchLayerEdit(DispatchStack stack,
        final DispatchLayer<?> layer) {
    return new AbstractDispatchLayerEdit(stack) {
        /** Value returned by removeLayer, reused as position on undo. */
        private int removedFrom;

        @Override
        protected void undoEditAction(DispatchStackImpl layers) {
            layers.addLayer(layer, removedFrom);
        }

        @Override
        protected void doEditAction(DispatchStackImpl layers) {
            removedFrom = layers.removeLayer(layer);
        }
    };
}
/**
 * Builds an edit whose action sets a new name on a merge.
 *
 * @param merge
 *            the merge to be renamed
 * @param newName
 *            the name to set
 * @return the edit
 */
@Override
public Edit<Merge> getRenameMergeEdit(Merge merge, final String newName) {
    Edit<Merge> edit = new AbstractMergeEdit(merge) {
        @Override
        protected void doEditAction(MergeImpl target) {
            target.setName(newName);
        }
    };
    return edit;
}
/**
 * Builds an edit whose action sets a new name on a processor.
 *
 * @param processor
 *            the processor to be renamed
 * @param newName
 *            the name to set
 * @return the edit
 */
@Override
public Edit<Processor> getRenameProcessorEdit(Processor processor,
        final String newName) {
    Edit<Processor> edit = new AbstractProcessorEdit(processor) {
        @Override
        protected void doEditAction(ProcessorImpl target) {
            target.setName(newName);
        }
    };
    return edit;
}
/**
 * Builds an edit whose action sets a new name on a dataflow input port.
 *
 * @param dataflowInputPort
 *            the port to be renamed
 * @param newName
 *            the name to set
 * @return the edit
 */
@Override
public Edit<DataflowInputPort> getRenameDataflowInputPortEdit(
        DataflowInputPort dataflowInputPort, final String newName) {
    Edit<DataflowInputPort> edit = new AbstractDataflowInputPortEdit(
            dataflowInputPort) {
        @Override
        protected void doEditAction(DataflowInputPortImpl target) {
            target.setName(newName);
        }
    };
    return edit;
}
/**
 * Builds an edit whose action sets a new name on a dataflow output port.
 *
 * @param dataflowOutputPort
 *            the port to be renamed
 * @param newName
 *            the name to set
 * @return the edit
 */
@Override
public Edit<DataflowOutputPort> getRenameDataflowOutputPortEdit(
        DataflowOutputPort dataflowOutputPort, final String newName) {
    Edit<DataflowOutputPort> edit = new AbstractDataflowOutputPortEdit(
            dataflowOutputPort) {
        @Override
        protected void doEditAction(DataflowOutputPortImpl target) {
            target.setName(newName);
        }
    };
    return edit;
}
/**
 * Builds an edit whose action sets the depth of a dataflow input port.
 *
 * @param dataflowInputPort
 *            the port to be edited
 * @param depth
 *            the depth to set
 * @return the edit
 */
@Override
public Edit<DataflowInputPort> getChangeDataflowInputPortDepthEdit(
        DataflowInputPort dataflowInputPort, final int depth) {
    Edit<DataflowInputPort> edit = new AbstractDataflowInputPortEdit(
            dataflowInputPort) {
        @Override
        protected void doEditAction(DataflowInputPortImpl target) {
            target.setDepth(depth);
        }
    };
    return edit;
}
/**
 * Builds an edit whose action sets the granular depth of a dataflow input
 * port.
 *
 * @param dataflowInputPort
 *            the port to be edited
 * @param granularDepth
 *            the granular depth to set
 * @return the edit
 */
@Override
public Edit<DataflowInputPort> getChangeDataflowInputPortGranularDepthEdit(
        DataflowInputPort dataflowInputPort, final int granularDepth) {
    Edit<DataflowInputPort> edit = new AbstractDataflowInputPortEdit(
            dataflowInputPort) {
        @Override
        protected void doEditAction(DataflowInputPortImpl target) {
            target.setGranularDepth(granularDepth);
        }
    };
    return edit;
}
/**
 * Builds an edit whose action links a named output port of a processor to
 * a target input port. The action looks up the first output port with the
 * given name, creates a {@link DatalinkImpl} from it to the target, adds
 * the link to the output port's outgoing links and, when the target is an
 * {@link AbstractEventHandlingInputPort}, sets it as the target's incoming
 * link. The action fails with an {@link EditException} if no output port
 * has the requested name.
 *
 * @param processor
 *            the processor owning the output port
 * @param outputPortName
 *            the name of the output port to connect from
 * @param targetPort
 *            the input port to connect to
 * @return the edit
 */
@Override
public Edit<Processor> getConnectProcessorOutputEdit(Processor processor,
        final String outputPortName, final EventHandlingInputPort targetPort) {
    return new AbstractProcessorEdit(processor) {
        @Override
        protected void doEditAction(ProcessorImpl owner) throws EditException {
            // Locate the first output port carrying the requested name
            BasicEventForwardingOutputPort match = null;
            for (BasicEventForwardingOutputPort candidate : owner.outputPorts) {
                if (candidate.getName().equals(outputPortName)) {
                    match = candidate;
                    break;
                }
            }
            if (match == null) {
                throw new EditException("Cannot locate output port with name '"
                        + outputPortName + "'");
            }

            // Wire the link into both ends
            DatalinkImpl link = new DatalinkImpl(match, targetPort);
            match.addOutgoingLink(link);
            if (targetPort instanceof AbstractEventHandlingInputPort) {
                AbstractEventHandlingInputPort receiver =
                        (AbstractEventHandlingInputPort) targetPort;
                receiver.setIncomingLink(link);
            }
        }
    };
}
/**
 * Builds an edit whose action registers a datalink with its two ends: the
 * link is added to the source's outgoing links when the source is a
 * {@link BasicEventForwardingOutputPort}, and set as the sink's incoming
 * link when the sink is an {@link AbstractEventHandlingInputPort}. Ends of
 * any other type are left untouched.
 *
 * @param datalink
 *            the link to connect
 * @return the edit
 */
@Override
public Edit<Datalink> getConnectDatalinkEdit(Datalink datalink) {
    return new AbstractDatalinkEdit(datalink) {
        @Override
        protected void doEditAction(DatalinkImpl link) throws EditException {
            EventForwardingOutputPort from = link.getSource();
            EventHandlingInputPort to = link.getSink();
            if (from instanceof BasicEventForwardingOutputPort) {
                BasicEventForwardingOutputPort sender =
                        (BasicEventForwardingOutputPort) from;
                sender.addOutgoingLink(link);
            }
            if (to instanceof AbstractEventHandlingInputPort) {
                AbstractEventHandlingInputPort receiver =
                        (AbstractEventHandlingInputPort) to;
                receiver.setIncomingLink(link);
            }
        }
    };
}
/**
 * Builds an edit that adds an annotation assertion to an annotation chain.
 * Applying the edit adds the assertion while holding the chain's monitor
 * and yields the chain; the chain is also the edit's subject.
 *
 * @param annotationChain
 *            the chain to be edited; must be an
 *            {@link AnnotationChainImpl}
 * @param annotationAssertion
 *            the assertion to add
 * @return the edit
 * @throws RuntimeException
 *             if the chain is not an {@link AnnotationChainImpl}
 */
@Override
public Edit<AnnotationChain> getAddAnnotationAssertionEdit(
        AnnotationChain annotationChain,
        final AnnotationAssertion<?> annotationAssertion) {
    if (annotationChain instanceof AnnotationChainImpl) {
        final AnnotationChainImpl chain = (AnnotationChainImpl) annotationChain;
        return new EditSupport<AnnotationChain>() {
            @Override
            public Object getSubject() {
                return chain;
            }

            @Override
            public AnnotationChain applyEdit() {
                synchronized (chain) {
                    chain.addAnnotationAssertion(annotationAssertion);
                }
                return chain;
            }
        };
    }
    throw new RuntimeException(
            "Object being edited must be instance of AnnotationChainImpl");
}
/**
 * Builds an edit that routes a link from a source port to a sink port
 * through a merge. The action creates a new, uniquely named input port on
 * the merge, connects the source port to it, and connects the merge's
 * output port to the sink port unless the merge output already has a link
 * to that sink.
 *
 * @param merge
 *            a Merge instance
 * @param sourcePort
 *            the source port from which a link is to be created.
 * @param sinkPort
 *            the sink port to which the link is to be created.
 * @return a new edit constructed from the provided parameters.
 * @throws RuntimeException
 *             if the source port or the sink port is <code>null</code>
 */
@Override
public Edit<Merge> getConnectMergedDatalinkEdit(Merge merge,
        final EventForwardingOutputPort sourcePort,
        final EventHandlingInputPort sinkPort) {
    if (sourcePort == null) {
        throw new RuntimeException("The sourceport cannot be null");
    }
    if (sinkPort == null) {
        throw new RuntimeException("The sinkport cannot be null");
    }
    return new AbstractMergeEdit(merge) {
        @Override
        protected void doEditAction(MergeImpl target) throws EditException {
            // Validate the existing outgoing link before changing anything
            boolean outputUnlinked = needToCreateDatalink(target);

            String baseName = sourcePort.getName() + "To"
                    + target.getLocalName() + "_input";
            String portName = getUniqueMergeInputPortName(target, baseName, 0);
            MergeInputPortImpl newInput = new MergeInputPortImpl(target,
                    portName, sinkPort.getDepth());
            target.addInputPort(newInput);

            Datalink inbound = createDatalink(sourcePort, newInput);
            getConnectDatalinkEdit(inbound).doEdit();
            if (outputUnlinked) {
                Datalink outbound = createDatalink(target.getOutputPort(),
                        sinkPort);
                getConnectDatalinkEdit(outbound).doEdit();
            }
        }

        /**
         * Tells whether the merge output still has to be linked to the
         * sink port, rejecting a merge whose output has more than one link
         * or a single link going to a different sink.
         */
        private boolean needToCreateDatalink(MergeImpl mergeImpl)
                throws EditException {
            Collection<? extends Datalink> existing = mergeImpl
                    .getOutputPort().getOutgoingLinks();
            int linkCount = existing.size();
            if (linkCount > 1) {
                throw new EditException(
                        "The merge instance cannot have more that 1 outgoing Datalink");
            }
            if (linkCount == 0) {
                return true;
            }
            Datalink only = existing.iterator().next();
            if (only.getSink() != sinkPort) {
                throw new EditException(
                        "Cannot add a different sinkPort to a Merge that already has one defined");
            }
            return false;
        }
    };
}
/**
 * Builds an edit whose action creates a control link (condition) between
 * two processors, recording it on both of them. The action fails with an
 * {@link EditException} if the controlling processor already has a
 * condition whose target is the same target processor instance.
 *
 * @param control
 *            the controlling processor
 * @param target
 *            the controlled processor
 * @return the edit
 */
@Override
public Edit<OrderedPair<Processor>> getCreateConditionEdit(
        Processor control, Processor target) {
    return new AbstractBinaryProcessorEdit(control, target) {
        @Override
        protected void doEditAction(ProcessorImpl controller,
                ProcessorImpl controlled) throws EditException {
            ConditionImpl link = new ConditionImpl(controller, controlled);
            // Refuse a second link between the same pair of processors
            for (Condition existing : controller.controlledConditions) {
                if (existing.getTarget() == controlled) {
                    throw new EditException(
                            "Attempt to create duplicate control link");
                }
            }
            controller.controlledConditions.add(link);
            controlled.conditions.add(link);
        }
    };
}
/**
 * Builds an edit whose action removes the control link (condition) between
 * two processors. The first condition of the controlling processor whose
 * target is the given target instance is removed from both processors. The
 * action fails with an {@link EditException} if no such condition exists.
 *
 * @param control
 *            the controlling processor
 * @param target
 *            the controlled processor
 * @return the edit
 */
@Override
public Edit<OrderedPair<Processor>> getRemoveConditionEdit(
        Processor control, Processor target) {
    return new AbstractBinaryProcessorEdit(control, target) {
        @Override
        protected void doEditAction(ProcessorImpl controller,
                ProcessorImpl controlled) throws EditException {
            // Find the link first; mutate the lists only after the loop
            ConditionImpl found = null;
            for (ConditionImpl candidate : controller.controlledConditions) {
                if (candidate.getTarget() == controlled) {
                    found = candidate;
                    break;
                }
            }
            if (found == null) {
                throw new EditException(
                        "Can't remove a control link as it doesn't exist");
            }
            controller.controlledConditions.remove(found);
            controlled.conditions.remove(found);
        }
    };
}
/**
 * Builds an edit whose action sets the annotation bean of an annotation
 * assertion.
 *
 * @param annotationAssertion
 *            the assertion to be edited
 * @param bean
 *            the bean to set
 * @return the edit
 */
@Override
@SuppressWarnings({ "rawtypes", "unchecked" })
public Edit<AnnotationAssertion<AnnotationBeanSPI>> getAddAnnotationBean(
        AnnotationAssertion annotationAssertion,
        final AnnotationBeanSPI bean) {
    return new AbstractAnnotationEdit(annotationAssertion) {
        @Override
        protected void doEditAction(AnnotationAssertionImpl target)
                throws EditException {
            target.setAnnotationBean(bean);
        }
    };
}
/**
 * Builds an edit whose action adds a curation event to an annotation
 * assertion.
 *
 * @param annotationAssertion
 *            the assertion to be edited
 * @param curationEvent
 *            the curation event to add
 * @return the edit
 */
@Override
@SuppressWarnings({ "rawtypes", "unchecked" })
public Edit<AnnotationAssertion<AnnotationBeanSPI>> getAddCurationEvent(
        AnnotationAssertion annotationAssertion,
        final CurationEvent curationEvent) {
    return new AbstractAnnotationEdit(annotationAssertion) {
        @Override
        protected void doEditAction(AnnotationAssertionImpl target)
                throws EditException {
            target.addCurationEvent(curationEvent);
        }
    };
}
/**
 * Builds an edit whose action sets the annotation role of an annotation
 * assertion.
 *
 * @param annotationAssertion
 *            the assertion to be edited
 * @param role
 *            the role to set
 * @return the edit
 */
@Override
@SuppressWarnings({ "rawtypes", "unchecked" })
public Edit<AnnotationAssertion<AnnotationBeanSPI>> getAddAnnotationRole(
        AnnotationAssertion annotationAssertion,
        final AnnotationRole role) {
    return new AbstractAnnotationEdit(annotationAssertion) {
        @Override
        protected void doEditAction(AnnotationAssertionImpl target)
                throws EditException {
            target.setAnnotationRole(role);
        }
    };
}
/**
 * Builds an edit whose action sets the annotation source of an annotation
 * assertion.
 *
 * @param annotationAssertion
 *            the assertion to be edited
 * @param source
 *            the annotation source to set
 * @return the edit
 */
@Override
@SuppressWarnings({ "rawtypes", "unchecked" })
public Edit<AnnotationAssertion<AnnotationBeanSPI>> getAddAnnotationSource(
        AnnotationAssertion annotationAssertion,
        final AnnotationSourceSPI source) {
    return new AbstractAnnotationEdit(annotationAssertion) {
        @Override
        protected void doEditAction(AnnotationAssertionImpl target)
                throws EditException {
            target.setAnnotationSource(source);
        }
    };
}
/**
 * Builds an edit whose action adds a creator to an annotation assertion.
 *
 * @param annotationAssertion
 *            the assertion to be edited
 * @param person
 *            the creator to add
 * @return the edit
 */
@Override
@SuppressWarnings({ "rawtypes", "unchecked" })
public Edit<AnnotationAssertion<AnnotationBeanSPI>> getAddCreator(
        AnnotationAssertion annotationAssertion, final Person person) {
    return new AbstractAnnotationEdit(annotationAssertion) {
        @Override
        protected void doEditAction(AnnotationAssertionImpl target)
                throws EditException {
            target.addCreator(person);
        }
    };
}
/**
 * Builds a compound edit that attaches an annotation bean to an annotated
 * object. A new assertion and a new chain are created, and the compound
 * edit is given three steps in order: set the bean on the assertion, add
 * the assertion to the chain, and add the chain to the annotated object.
 *
 * @param annotated
 *            the object to annotate
 * @param annotation
 *            the annotation bean to attach
 * @return a {@link CompoundEdit} over the three steps
 */
@Override
public Edit<?> getAddAnnotationChainEdit(Annotated<?> annotated,
        AnnotationBeanSPI annotation) {
    AnnotationAssertion<?> newAssertion = new AnnotationAssertionImpl();
    AnnotationChain newChain = new AnnotationChainImpl();

    Edit<?> setBean = getAddAnnotationBean(newAssertion, annotation);
    Edit<?> addAssertion = getAddAnnotationAssertionEdit(newChain,
            newAssertion);
    Edit<?> addChain = annotated.getAddAnnotationEdit(newChain);

    List<Edit<?>> steps = new ArrayList<>();
    steps.add(setBean);
    steps.add(addAssertion);
    steps.add(addChain);
    return new CompoundEdit(steps);
}
/**
 * Builds an edit whose action sets the local name of a dataflow.
 *
 * @param dataflow
 *            the dataflow to be edited
 * @param newName
 *            the local name to set
 * @return the edit
 */
@Override
public Edit<Dataflow> getUpdateDataflowNameEdit(Dataflow dataflow,
        final String newName) {
    Edit<Dataflow> edit = new AbstractDataflowEdit(dataflow) {
        @Override
        protected void doEditAction(DataflowImpl target) {
            target.setLocalName(newName);
        }
    };
    return edit;
}
/**
 * Builds an edit whose action sets the identifier of a dataflow.
 *
 * @param dataflow
 *            the dataflow to be edited
 * @param newId
 *            the identifier to set
 * @return the edit
 */
@Override
public Edit<Dataflow> getUpdateDataflowInternalIdentifierEdit(
        Dataflow dataflow, final String newId) {
    Edit<Dataflow> edit = new AbstractDataflowEdit(dataflow) {
        @Override
        protected void doEditAction(DataflowImpl target) {
            target.setIdentifier(newId);
        }
    };
    return edit;
}
/**
 * Builds an edit whose action detaches a datalink from its two ends: the
 * link is removed from the source's outgoing links when the source is a
 * {@link BasicEventForwardingOutputPort}, and the sink's incoming link is
 * cleared when the sink is an {@link AbstractEventHandlingInputPort}. When
 * the sink is a {@link MergeInputPortImpl}, that port is additionally
 * removed from its merge.
 *
 * @param datalink
 *            the link to disconnect
 * @return the edit
 */
@Override
public Edit<Datalink> getDisconnectDatalinkEdit(Datalink datalink) {
    return new AbstractDatalinkEdit(datalink) {
        @Override
        protected void doEditAction(DatalinkImpl link) throws EditException {
            EventForwardingOutputPort from = link.getSource();
            EventHandlingInputPort to = link.getSink();
            if (from instanceof BasicEventForwardingOutputPort) {
                BasicEventForwardingOutputPort sender =
                        (BasicEventForwardingOutputPort) from;
                sender.removeOutgoingLink(link);
            }
            if (to instanceof AbstractEventHandlingInputPort) {
                AbstractEventHandlingInputPort receiver =
                        (AbstractEventHandlingInputPort) to;
                receiver.setIncomingLink(null);
            }
            if (to instanceof MergeInputPortImpl) {
                // The merge input port is dropped along with its link
                MergeInputPortImpl mergeInput = (MergeInputPortImpl) to;
                MergeImpl owner = (MergeImpl) mergeInput.getMerge();
                owner.removeInputPort(mergeInput);
            }
        }
    };
}
/**
 * Builds an edit whose action removes an input port from a dataflow via
 * {@code DataflowImpl.removeDataflowInputPort}.
 *
 * @param dataflow
 *            the dataflow to be edited
 * @param dataflowInputPort
 *            the port to remove
 * @return the edit
 */
@Override
public Edit<Dataflow> getRemoveDataflowInputPortEdit(Dataflow dataflow,
        final DataflowInputPort dataflowInputPort) {
    Edit<Dataflow> edit = new AbstractDataflowEdit(dataflow) {
        @Override
        protected void doEditAction(DataflowImpl target) throws EditException {
            target.removeDataflowInputPort(dataflowInputPort);
        }
    };
    return edit;
}
/**
 * Builds an edit whose action removes an output port from a dataflow via
 * {@code DataflowImpl.removeDataflowOutputPort}.
 *
 * @param dataflow
 *            the dataflow to be edited
 * @param dataflowOutputPort
 *            the port to remove
 * @return the edit
 */
@Override
public Edit<Dataflow> getRemoveDataflowOutputPortEdit(Dataflow dataflow,
        final DataflowOutputPort dataflowOutputPort) {
    Edit<Dataflow> edit = new AbstractDataflowEdit(dataflow) {
        @Override
        protected void doEditAction(DataflowImpl target) throws EditException {
            target.removeDataflowOutputPort(dataflowOutputPort);
        }
    };
    return edit;
}
/**
 * Builds an edit whose action removes a processor from a dataflow via
 * {@code DataflowImpl.removeProcessor}.
 *
 * @param dataflow
 *            the dataflow to be edited
 * @param processor
 *            the processor to remove
 * @return the edit
 */
@Override
public Edit<Dataflow> getRemoveProcessorEdit(Dataflow dataflow,
        final Processor processor) {
    Edit<Dataflow> edit = new AbstractDataflowEdit(dataflow) {
        @Override
        protected void doEditAction(DataflowImpl target) {
            target.removeProcessor(processor);
        }
    };
    return edit;
}
/**
 * Builds an edit whose action removes a merge from a dataflow via
 * {@code DataflowImpl.removeMerge}.
 *
 * @param dataflow
 *            the dataflow to be edited
 * @param merge
 *            the merge to remove
 * @return the edit
 */
@Override
public Edit<Dataflow> getRemoveMergeEdit(Dataflow dataflow, final Merge merge) {
    Edit<Dataflow> edit = new AbstractDataflowEdit(dataflow) {
        @Override
        protected void doEditAction(DataflowImpl target) {
            target.removeMerge(merge);
        }
    };
    return edit;
}
/**
 * Builds an edit whose action adds an existing input port to a dataflow
 * via {@code DataflowImpl.addInputPort}.
 *
 * @param dataflow
 *            the dataflow to be edited
 * @param dataflowInputPort
 *            the port to add; must be a {@link DataflowInputPortImpl}
 * @return the edit
 * @throws RuntimeException
 *             if the port is not a {@link DataflowInputPortImpl}
 */
@Override
public Edit<Dataflow> getAddDataflowInputPortEdit(Dataflow dataflow,
        DataflowInputPort dataflowInputPort) {
    if (dataflowInputPort instanceof DataflowInputPortImpl) {
        final DataflowInputPortImpl port = (DataflowInputPortImpl) dataflowInputPort;
        return new AbstractDataflowEdit(dataflow) {
            @Override
            protected void doEditAction(DataflowImpl target)
                    throws EditException {
                target.addInputPort(port);
            }
        };
    }
    throw new RuntimeException(
            "The DataflowInputPort is of the wrong implmentation, "
                    + "it should be of type DataflowInputPortImpl");
}
/**
 * Builds an edit whose action adds an existing output port to a dataflow
 * via {@code DataflowImpl.addOutputPort}.
 *
 * @param dataflow
 *            the dataflow to be edited
 * @param dataflowOutputPort
 *            the port to add; must be a {@link DataflowOutputPortImpl}
 * @return the edit
 * @throws RuntimeException
 *             if the port is not a {@link DataflowOutputPortImpl}
 */
@Override
public Edit<Dataflow> getAddDataflowOutputPortEdit(Dataflow dataflow,
        final DataflowOutputPort dataflowOutputPort) {
    if (dataflowOutputPort instanceof DataflowOutputPortImpl) {
        // Type already verified, so the cast can be done once up front
        final DataflowOutputPortImpl port = (DataflowOutputPortImpl) dataflowOutputPort;
        return new AbstractDataflowEdit(dataflow) {
            @Override
            protected void doEditAction(DataflowImpl target)
                    throws EditException {
                target.addOutputPort(port);
            }
        };
    }
    throw new RuntimeException(
            "The DataflowOutputPort is of the wrong implmentation, "
                    + "it should be of type DataflowOutputPortImpl");
}
/**
 * Builds an edit whose action adds an input port to the collection
 * returned by the activity's {@code getInputPorts()}.
 *
 * @param activity
 *            the activity to be edited
 * @param activityInputPort
 *            the port to add
 * @return the edit
 */
@Override
@SuppressWarnings({ "unchecked", "rawtypes" })
public Edit<Activity<?>> getAddActivityInputPortEdit(Activity<?> activity,
        final ActivityInputPort activityInputPort) {
    return new AbstractActivityEdit(activity) {
        @Override
        protected void doEditAction(AbstractActivity target) {
            target.getInputPorts().add(activityInputPort);
        }
    };
}
/**
 * Builds an edit whose action adds an entry to the activity's input port
 * mapping. The action fails with an {@link EditException} if the mapping
 * already contains the processor port name as a key.
 *
 * @param activity
 *            the activity to be edited
 * @param processorPortName
 *            the processor port name, used as the key of the new entry
 * @param activityPortName
 *            the activity port name, used as the value of the new entry
 * @return the edit
 */
@Override
@SuppressWarnings({ "unchecked", "rawtypes" })
public Edit<Activity<?>> getAddActivityInputPortMappingEdit(
        Activity<?> activity, final String processorPortName,
        final String activityPortName) {
    return new AbstractActivityEdit(activity) {
        @Override
        protected void doEditAction(AbstractActivity activity)
                throws EditException {
            if (activity.getInputPortMapping().containsKey(
                    processorPortName)) {
                // This edit works on the INPUT mapping; the message used
                // to say "output", which pointed at the wrong map.
                throw new EditException(
                        "The input mapping for processor name:"
                                + processorPortName + " already exists");
            }
            /*
             * The input port mapping is keyed processorPort ->
             * activityPort, i.e. the reverse direction of the output port
             * mapping.
             */
            activity.getInputPortMapping().put(processorPortName,
                    activityPortName);
        }
    };
}
/**
 * Builds an edit whose action adds an output port to the collection
 * returned by the activity's {@code getOutputPorts()}.
 *
 * @param activity
 *            the activity to be edited
 * @param activityOutputPort
 *            the port to add
 * @return the edit
 */
@Override
@SuppressWarnings({ "unchecked", "rawtypes" })
public Edit<Activity<?>> getAddActivityOutputPortEdit(Activity<?> activity,
        final ActivityOutputPort activityOutputPort) {
    return new AbstractActivityEdit(activity) {
        @Override
        protected void doEditAction(AbstractActivity target) {
            target.getOutputPorts().add(activityOutputPort);
        }
    };
}
/**
 * Builds an edit whose action adds an entry to the activity's output port
 * mapping. The action fails with an {@link EditException} if the mapping
 * already contains the activity port name as a key.
 *
 * @param activity
 *            the activity to be edited
 * @param processorPortName
 *            the processor port name, used as the value of the new entry
 * @param activityPortName
 *            the activity port name, used as the key of the new entry
 * @return the edit
 */
@Override
@SuppressWarnings({ "unchecked", "rawtypes" })
public Edit<Activity<?>> getAddActivityOutputPortMappingEdit(
        Activity<?> activity, final String processorPortName,
        final String activityPortName) {
    return new AbstractActivityEdit(activity) {
        @Override
        protected void doEditAction(AbstractActivity target)
                throws EditException {
            Map<String, String> outputMapping = target.getOutputPortMapping();
            boolean alreadyMapped = outputMapping.containsKey(activityPortName);
            if (alreadyMapped) {
                throw new EditException("The mapping starting with:"
                        + activityPortName + " already exists");
            }
            /*
             * The output port mapping is keyed activityPort ->
             * processorPort, i.e. the reverse direction of the input port
             * mapping.
             */
            outputMapping.put(activityPortName, processorPortName);
        }
    };
}
/**
 * Builds an edit whose action removes an input port from the collection
 * returned by the activity's {@code getInputPorts()}.
 *
 * @param activity
 *            the activity to be edited
 * @param activityInputPort
 *            the port to remove
 * @return the edit
 */
@Override
@SuppressWarnings({ "unchecked", "rawtypes" })
public Edit<Activity<?>> getRemoveActivityInputPortEdit(
        Activity<?> activity, final ActivityInputPort activityInputPort) {
    return new AbstractActivityEdit(activity) {
        @Override
        protected void doEditAction(AbstractActivity target)
                throws EditException {
            target.getInputPorts().remove(activityInputPort);
        }
    };
}
/**
 * Builds an edit whose action removes the entry keyed by the processor
 * port name from the activity's input port mapping. The action fails with
 * an {@link EditException} if the mapping has no such key.
 *
 * @param activity
 *            the activity to be edited
 * @param processorPortName
 *            the key of the mapping entry to remove
 * @return the edit
 */
@Override
@SuppressWarnings({ "unchecked", "rawtypes" })
public Edit<Activity<?>> getRemoveActivityInputPortMappingEdit(
        Activity<?> activity, final String processorPortName) {
    return new AbstractActivityEdit(activity) {
        @Override
        protected void doEditAction(AbstractActivity target)
                throws EditException {
            boolean mapped = target.getInputPortMapping().containsKey(
                    processorPortName);
            if (!mapped) {
                throw new EditException(
                        "The input port mapping for the processor port name:"
                                + processorPortName + " doesn't exist");
            }
            target.getInputPortMapping().remove(processorPortName);
        }
    };
}
/**
 * Builds an edit whose action removes an output port from the collection
 * returned by the activity's {@code getOutputPorts()}.
 *
 * @param activity
 *            the activity to be edited
 * @param activityOutputPort
 *            the port to remove
 * @return the edit
 */
@Override
@SuppressWarnings({ "unchecked", "rawtypes" })
public Edit<Activity<?>> getRemoveActivityOutputPortEdit(
        Activity<?> activity, final ActivityOutputPort activityOutputPort) {
    return new AbstractActivityEdit(activity) {
        @Override
        protected void doEditAction(AbstractActivity target)
                throws EditException {
            target.getOutputPorts().remove(activityOutputPort);
        }
    };
}
/**
 * Builds an edit whose action removes the entry keyed by the given name
 * from the activity's output port mapping. The action fails with an
 * {@link EditException} if the mapping has no such key.
 * <p>
 * NOTE(review): {@link #getAddActivityOutputPortMappingEdit} keys the
 * output mapping by activity port name, whereas this edit looks the key up
 * with <code>processorPortName</code>. The two agree only when both names
 * are equal -- confirm which key is intended.
 *
 * @param activity
 *            the activity to be edited
 * @param processorPortName
 *            the name used as the key of the mapping entry to remove
 * @return the edit
 */
@Override
@SuppressWarnings({ "unchecked", "rawtypes" })
public Edit<Activity<?>> getRemoveActivityOutputPortMappingEdit(
        Activity<?> activity, final String processorPortName) {
    return new AbstractActivityEdit(activity) {
        @Override
        protected void doEditAction(AbstractActivity target)
                throws EditException {
            boolean mapped = target.getOutputPortMapping().containsKey(
                    processorPortName);
            if (!mapped) {
                throw new EditException(
                        "The output port mapping for the processor port name:"
                                + processorPortName + " doesn't exist");
            }
            target.getOutputPortMapping().remove(processorPortName);
        }
    };
}
/**
 * Builds an edit whose action adds an existing input port to a merge via
 * {@code MergeImpl.addInputPort}.
 *
 * @param merge
 *            the merge to be edited
 * @param mergeInputPort
 *            the port to add; must be a {@link MergeInputPortImpl}
 * @return the edit
 * @throws RuntimeException
 *             if the port is not a {@link MergeInputPortImpl}
 */
@Override
public Edit<Merge> getAddMergeInputPortEdit(Merge merge,
        MergeInputPort mergeInputPort) {
    if (mergeInputPort instanceof MergeInputPortImpl) {
        final MergeInputPortImpl port = (MergeInputPortImpl) mergeInputPort;
        return new AbstractMergeEdit(merge) {
            @Override
            protected void doEditAction(MergeImpl target) {
                target.addInputPort(port);
            }
        };
    }
    throw new RuntimeException(
            "The MergeInputPort is of the wrong implmentation,"
                    + " it should be of type MergeInputPortImpl");
}
/**
 * Builds a {@link ConfigureEdit} for the given activity and configuration
 * bean, constructed with {@code Activity.class} as its first argument.
 *
 * @param activity
 *            the activity to be configured
 * @param configurationBean
 *            the configuration to hand to the edit
 * @return the edit
 */
@Override
@SuppressWarnings({ "rawtypes", "unchecked" })
public <T> Edit<Activity<?>> getConfigureActivityEdit(Activity<T> activity,
        T configurationBean) {
    Edit<Activity<?>> edit = new ConfigureEdit(Activity.class, activity,
            configurationBean);
    return edit;
}
/**
 * Builds an edit whose action removes an input port from a processor. The
 * port's name is first removed from every strategy in the processor's
 * iteration stack, then the port is removed from the processor's input
 * ports. The action fails with an {@link EditException} if the processor
 * has no input port with that name.
 *
 * @param processor
 *            the processor to be edited
 * @param port
 *            the port to remove
 * @return the edit
 */
@Override
public Edit<Processor> getRemoveProcessorInputPortEdit(Processor processor,
        final ProcessorInputPort port) {
    return new AbstractProcessorEdit(processor) {
        @Override
        protected void doEditAction(ProcessorImpl owner) throws EditException {
            boolean missing = owner.getInputPortWithName(port.getName()) == null;
            if (missing) {
                throw new EditException("The processor doesn't have a port named:"
                        + port.getName());
            }
            for (IterationStrategyImpl strategy : owner.iterationStack
                    .getStrategies()) {
                strategy.removeInputByName(port.getName());
            }
            owner.inputPorts.remove(port);
        }
    };
}
/**
 * Builds an edit whose action removes an output port from a processor. The
 * action fails with an {@link EditException} if the processor has no
 * output port with that name.
 *
 * @param processor
 *            the processor to be edited
 * @param port
 *            the port to remove
 * @return the edit
 */
@Override
public Edit<Processor> getRemoveProcessorOutputPortEdit(
        Processor processor, final ProcessorOutputPort port) {
    return new AbstractProcessorEdit(processor) {
        @Override
        protected void doEditAction(ProcessorImpl owner) throws EditException {
            boolean missing = owner.getOutputPortWithName(port.getName()) == null;
            if (missing) {
                throw new EditException("The processor doesn't have a port named:"
                        + port.getName());
            }
            owner.outputPorts.remove(port);
        }
    };
}
/**
 * Creates an edit that brings the ports of a processor in line with the
 * ports of the first activity in its activity list.
 * <p>
 * When applied, the edit compares processor ports and activity ports by
 * name and collects the following sub-edits, which are then performed
 * together through a single {@code CompoundEdit}:
 * <ul>
 * <li>processor ports with no activity port of the same name are
 * disconnected from their links, removed, and their activity port mapping
 * (if one is registered under the port name) is removed;</li>
 * <li>processor ports whose depth (for outputs: depth or granular depth)
 * differs from the same-named activity port are disconnected and removed;
 * if such a port had links, a replacement port is created from the
 * activity port's values and the links are re-created against it.</li>
 * </ul>
 *
 * @param processor
 *            the processor whose ports are to be reconciled
 * @return an edit whose application returns the same processor
 */
@Override
public Edit<Processor> getMapProcessorPortsForActivityEdit(
final Processor processor) {
return new EditSupport<Processor>() {
@Override
public Processor applyEdit() throws EditException {
List<Edit<?>> edits = new ArrayList<>();
// Only the first activity is considered.
// NOTE(review): get(0) is not guarded; presumably this fails with an
// IndexOutOfBoundsException when the activity list is empty - confirm
// that callers guarantee at least one activity.
Activity<?> a = processor.getActivityList().get(0);
// All four comparisons are taken up front, before any edit is performed.
List<ProcessorInputPort> inputPortsForRemoval = determineInputPortsForRemoval(
processor, a);
List<ProcessorOutputPort> outputPortsForRemoval = determineOutputPortsForRemoval(
processor, a);
Map<ProcessorInputPort, ActivityInputPort> changedInputPorts = determineChangedInputPorts(
processor, a);
Map<ProcessorOutputPort, ActivityOutputPort> changedOutputPorts = determineChangedOutputPorts(
processor, a);
// Input ports that no longer exist on the activity: disconnect, remove
// the port, then remove its activity mapping if one exists.
for (ProcessorInputPort ip : inputPortsForRemoval) {
if (ip.getIncomingLink() != null)
edits.add(getDisconnectDatalinkEdit(ip
.getIncomingLink()));
edits.add(getRemoveProcessorInputPortEdit(processor, ip));
if (a.getInputPortMapping().containsKey(ip.getName()))
edits.add(getRemoveActivityInputPortMappingEdit(a, ip
.getName()));
}
// Output ports that no longer exist on the activity: same treatment,
// but an output port may have several outgoing links.
for (ProcessorOutputPort op : outputPortsForRemoval) {
if (!op.getOutgoingLinks().isEmpty())
for (Datalink link : op.getOutgoingLinks())
edits.add(getDisconnectDatalinkEdit(link));
edits.add(getRemoveProcessorOutputPortEdit(processor, op));
if (a.getOutputPortMapping().containsKey(op.getName()))
edits.add(getRemoveActivityOutputPortMappingEdit(a, op
.getName()));
}
// Input ports whose depth changed: the old port is always removed; a
// replacement is only created (and reconnected to the original link
// source) when the old port had an incoming link.
// NOTE(review): a changed port without a link is removed and not
// re-created by this edit.
for (ProcessorInputPort ip : changedInputPorts.keySet()) {
Datalink incomingLink = ip.getIncomingLink();
if (incomingLink != null)
edits.add(getDisconnectDatalinkEdit(incomingLink));
edits.add(getRemoveProcessorInputPortEdit(processor, ip));
if (incomingLink != null) {
ActivityInputPort aip = changedInputPorts.get(ip);
ProcessorInputPort pip = createProcessorInputPort(processor,
aip.getName(), aip.getDepth());
edits.add(getAddProcessorInputPortEdit(processor, pip));
edits.add(getConnectDatalinkEdit(new DatalinkImpl(
incomingLink.getSource(), pip)));
}
}
// Output ports whose depth or granular depth changed: as above, the
// replacement port is only created when there were outgoing links, and
// each former sink is linked to the replacement.
for (ProcessorOutputPort op : changedOutputPorts.keySet()) {
Set<? extends Datalink> outgoingLinks = op.getOutgoingLinks();
for (Datalink link : outgoingLinks)
edits.add(getDisconnectDatalinkEdit(link));
edits.add(getRemoveProcessorOutputPortEdit(processor, op));
if (!outgoingLinks.isEmpty()) {
ActivityOutputPort aop = changedOutputPorts.get(op);
ProcessorOutputPort pop = createProcessorOutputPort(
processor, aop.getName(), aop.getDepth(),
aop.getGranularDepth());
edits.add(getAddProcessorOutputPortEdit(processor,
pop));
for (Datalink link : outgoingLinks)
edits.add(getConnectDatalinkEdit(createDatalink(
pop, link.getSink())));
}
}
// Perform the collected edits in the order they were added.
new CompoundEdit(edits).doEdit();
return processor;
}
@Override
public Object getSubject() {
return processor;
}
// Returns the processor input ports for which the activity has no input
// port with the same name.
private List<ProcessorInputPort> determineInputPortsForRemoval(Processor p,
Activity<?> a) {
List<ProcessorInputPort> result = new ArrayList<>();
processorPorts: for (ProcessorInputPort pPort : p.getInputPorts()) {
for (ActivityInputPort aPort : a.getInputPorts())
if (aPort.getName().equals(pPort.getName()))
// A name match means the port is kept; move on to the next port.
continue processorPorts;
result.add(pPort);
}
return result;
}
// Returns the processor output ports for which the activity has no output
// port with the same name.
private List<ProcessorOutputPort> determineOutputPortsForRemoval(
Processor p, Activity<?> a) {
List<ProcessorOutputPort> result = new ArrayList<>();
processorPorts: for (ProcessorOutputPort pPort : p.getOutputPorts()) {
for (OutputPort aPort : a.getOutputPorts())
if (aPort.getName().equals(pPort.getName()))
// A name match means the port is kept; move on to the next port.
continue processorPorts;
result.add(pPort);
}
return result;
}
// Maps each processor input port to the same-named activity input port
// when their depths differ. Ports with equal depth are not included.
private Map<ProcessorInputPort, ActivityInputPort> determineChangedInputPorts(
Processor p, Activity<?> a) {
Map<ProcessorInputPort, ActivityInputPort> result = new HashMap<>();
for (ProcessorInputPort pPort : p.getInputPorts())
for (ActivityInputPort aPort : a.getInputPorts())
if (aPort.getName().equals(pPort.getName())) {
if (pPort.getDepth() != aPort.getDepth())
result.put(pPort, aPort);
// Only the first activity port with a matching name is examined.
break;
}
return result;
}
// Maps each processor output port to the same-named activity output port
// when their depth or granular depth differs.
private Map<ProcessorOutputPort, ActivityOutputPort> determineChangedOutputPorts(
Processor p, Activity<?> a) {
Map<ProcessorOutputPort, ActivityOutputPort> result = new HashMap<>();
for (ProcessorOutputPort pPort : p.getOutputPorts())
for (OutputPort aPort : a.getOutputPorts())
if (aPort.getName().equals(pPort.getName())) {
if ((pPort.getDepth() != aPort.getDepth())
|| (pPort.getGranularDepth() != aPort
.getGranularDepth()))
// NOTE(review): unchecked downcast - assumes every output port
// of an activity is an ActivityOutputPort; confirm.
result.put(pPort, (ActivityOutputPort) aPort);
// Only the first activity port with a matching name is examined.
break;
}
return result;
}
};
}
/**
 * Value passed to the {@code Parallelize} constructor when the default
 * dispatch stack is built (presumably its maximum number of concurrent
 * jobs - confirm against {@code Parallelize}).
 */
private static final int DEFAULT_MAX_JOBS = 1;
/**
 * Builds an edit that populates the dispatch stack of a processor with the
 * default layers, added at indices 0 to 5 in this order: Parallelize,
 * ErrorBounce, Failover, Retry, Stop, Invoke.
 * <p>
 * The individual layer edits are created eagerly by this method; they are
 * only performed (as one {@code CompoundEdit}) when the returned edit is
 * performed.
 *
 * @param processor
 *            the processor whose dispatch stack is to be filled; it is cast
 *            to {@code ProcessorImpl} to obtain the stack
 * @return an edit on the processor that performs all layer additions
 */
@Override
public Edit<Processor> getDefaultDispatchStackEdit(Processor processor) {
	final DispatchStackImpl dispatchStack = ((ProcessorImpl) processor)
			.getDispatchStack();
	final List<Edit<?>> layerEdits = new ArrayList<>();
	// Index 0 is the top level layer (Parallelize); the rest follow below it.
	layerEdits.add(getAddDispatchLayerEdit(dispatchStack,
			new Parallelize(DEFAULT_MAX_JOBS), 0));
	layerEdits.add(getAddDispatchLayerEdit(dispatchStack, new ErrorBounce(), 1));
	layerEdits.add(getAddDispatchLayerEdit(dispatchStack, new Failover(), 2));
	layerEdits.add(getAddDispatchLayerEdit(dispatchStack, new Retry(), 3));
	layerEdits.add(getAddDispatchLayerEdit(dispatchStack, new Stop(), 4));
	layerEdits.add(getAddDispatchLayerEdit(dispatchStack, new Invoke(), 5));
	final Edit<?> allLayersEdit = new CompoundEdit(layerEdits);
	return new AbstractProcessorEdit(processor) {
		@Override
		protected void doEditAction(ProcessorImpl target)
				throws EditException {
			allLayersEdit.doEdit();
		}
	};
}
/**
 * Builds an edit that replaces the iteration strategy stack of a processor.
 *
 * @param processor
 *            the processor handed to the {@code AbstractProcessorEdit}
 * @param iterationStrategyStack
 *            the new stack; must be an {@code IterationStrategyStackImpl}
 * @return an edit which, when performed, assigns the stack to the
 *         processor implementation's {@code iterationStack} field
 * @throws RuntimeException
 *             if the stack is not an {@code IterationStrategyStackImpl}
 */
@Override
public Edit<Processor> getSetIterationStrategyStackEdit(
		Processor processor,
		final IterationStrategyStack iterationStrategyStack) {
	final boolean supported = iterationStrategyStack instanceof IterationStrategyStackImpl;
	if (!supported) {
		throw new RuntimeException(
				"Unknown implementation of iteration strategy "
						+ iterationStrategyStack);
	}
	// Safe after the check above; cast once instead of inside the edit.
	final IterationStrategyStackImpl replacement = (IterationStrategyStackImpl) iterationStrategyStack;
	return new AbstractProcessorEdit(processor) {
		@Override
		protected void doEditAction(ProcessorImpl target)
				throws EditException {
			target.iterationStack = replacement;
		}
	};
}
/**
 * Wraps a configurable object and a configuration bean in a
 * {@code ConfigureEdit} that is typed against {@code Configurable.class}.
 *
 * @param configurable
 *            the object passed to the {@code ConfigureEdit}
 * @param configBean
 *            the configuration bean passed to the {@code ConfigureEdit}
 * @return the newly created edit
 */
@Override
@SuppressWarnings({ "unchecked", "rawtypes" })
public <T> Edit<? extends Configurable<T>> getConfigureEdit(
		Configurable<T> configurable, T configBean) {
	// Raw type on purpose; the warnings are suppressed at method level.
	final ConfigureEdit configureEdit = new ConfigureEdit(Configurable.class,
			configurable, configBean);
	return configureEdit;
}
/**
 * Builds an edit that reorders the input ports of a merge.
 *
 * @param merge
 *            the merge handed to the {@code AbstractMergeEdit}
 * @param reorderedMergeInputPortList
 *            the port list passed to {@code reorderInputPorts} when the
 *            edit is performed
 * @return the reordering edit
 */
@Override
public Edit<Merge> getReorderMergeInputPortsEdit(Merge merge,
		final List<MergeInputPort> reorderedMergeInputPortList) {
	final List<MergeInputPort> newOrder = reorderedMergeInputPortList;
	return new AbstractMergeEdit(merge) {
		@Override
		protected void doEditAction(MergeImpl target) {
			target.reorderInputPorts(newOrder);
		}
	};
}
/**
 * Builds an edit that removes an activity from a processor.
 *
 * @param processor
 *            the processor handed to the {@code AbstractProcessorEdit}
 * @param activity
 *            the activity to remove from the processor's activity list
 * @return the removal edit
 */
@Override
public Edit<Processor> getRemoveActivityEdit(Processor processor,
		final Activity<?> activity) {
	final Activity<?> toRemove = activity;
	return new AbstractProcessorEdit(processor) {
		@Override
		protected void doEditAction(ProcessorImpl target) {
			target.activityList.remove(toRemove);
		}
	};
}
/**
 * Builds an edit that adds an iteration strategy to an iteration strategy
 * stack.
 *
 * @param iterationStrategyStack
 *            the stack to modify; must be an
 *            {@code IterationStrategyStackImpl}
 * @param strategy
 *            the strategy passed to {@code addStrategy} when the edit is
 *            applied
 * @return an edit whose subject and result are the given stack
 * @throws RuntimeException
 *             if the stack is not an {@code IterationStrategyStackImpl}
 */
@Override
public Edit<IterationStrategyStack> getAddIterationStrategyEdit(
		IterationStrategyStack iterationStrategyStack,
		final IterationStrategy strategy) {
	if (iterationStrategyStack instanceof IterationStrategyStackImpl) {
		final IterationStrategyStackImpl stackImpl = (IterationStrategyStackImpl) iterationStrategyStack;
		return new EditSupport<IterationStrategyStack>() {
			@Override
			public IterationStrategyStack applyEdit() {
				stackImpl.addStrategy(strategy);
				return stackImpl;
			}

			@Override
			public IterationStrategyStack getSubject() {
				return stackImpl;
			}
		};
	}
	throw new RuntimeException(
			"Object being edited must be instance of IterationStrategyStackImpl");
}
/**
 * Builds an edit that adds a named input port node to an iteration
 * strategy.
 *
 * @param iterationStrategy
 *            the strategy to modify; must be an
 *            {@code IterationStrategyImpl}
 * @param namedInputPortNode
 *            the node passed to {@code addInput} when the edit is applied
 * @return an edit whose subject and result are the given strategy
 * @throws RuntimeException
 *             if the strategy is not an {@code IterationStrategyImpl}
 */
@Override
public Edit<IterationStrategy> getAddIterationStrategyInputNodeEdit(
		IterationStrategy iterationStrategy,
		final NamedInputPortNode namedInputPortNode) {
	if (iterationStrategy instanceof IterationStrategyImpl) {
		final IterationStrategyImpl strategyImpl = (IterationStrategyImpl) iterationStrategy;
		return new EditSupport<IterationStrategy>() {
			@Override
			public IterationStrategy applyEdit() throws EditException {
				strategyImpl.addInput(namedInputPortNode);
				return strategyImpl;
			}

			@Override
			public IterationStrategy getSubject() {
				return strategyImpl;
			}
		};
	}
	throw new RuntimeException(
			"Object being edited must be instance of IterationStrategyImpl");
}
/**
 * Builds an edit that clears an iteration strategy stack.
 *
 * @param iterationStrategyStack
 *            the stack to clear; must be an
 *            {@code IterationStrategyStackImpl}
 * @return an edit whose subject and result are the given stack
 * @throws RuntimeException
 *             if the stack is not an {@code IterationStrategyStackImpl}
 */
@Override
public Edit<IterationStrategyStack> getClearIterationStrategyStackEdit(
		final IterationStrategyStack iterationStrategyStack) {
	final boolean supported = iterationStrategyStack instanceof IterationStrategyStackImpl;
	if (!supported) {
		throw new RuntimeException(
				"Object being edited must be instance of IterationStrategyStackImpl");
	}
	// Same object as the parameter, just viewed through its implementation type.
	final IterationStrategyStackImpl stackImpl = (IterationStrategyStackImpl) iterationStrategyStack;
	return new EditSupport<IterationStrategyStack>() {
		@Override
		public IterationStrategyStack applyEdit() throws EditException {
			stackImpl.clear();
			return stackImpl;
		}

		@Override
		public IterationStrategyStack getSubject() {
			return stackImpl;
		}
	};
}
}