blob: bb5bcbe654efea04de9d213eda3ba2cba9feba32 [file] [log] [blame]
package org.apache.taverna.scufl2.api.common;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import javax.xml.bind.PropertyException;
import org.apache.taverna.scufl2.api.activity.Activity;
import org.apache.taverna.scufl2.api.annotation.Annotation;
import org.apache.taverna.scufl2.api.common.Visitor.VisitorWithPath;
import org.apache.taverna.scufl2.api.configurations.Configuration;
import org.apache.taverna.scufl2.api.container.WorkflowBundle;
import org.apache.taverna.scufl2.api.core.BlockingControlLink;
import org.apache.taverna.scufl2.api.core.ControlLink;
import org.apache.taverna.scufl2.api.core.DataLink;
import org.apache.taverna.scufl2.api.core.Processor;
import org.apache.taverna.scufl2.api.core.Workflow;
import org.apache.taverna.scufl2.api.iterationstrategy.CrossProduct;
import org.apache.taverna.scufl2.api.iterationstrategy.IterationStrategyStack;
import org.apache.taverna.scufl2.api.iterationstrategy.PortNode;
import org.apache.taverna.scufl2.api.port.ActivityPort;
import org.apache.taverna.scufl2.api.port.InputActivityPort;
import org.apache.taverna.scufl2.api.port.InputPort;
import org.apache.taverna.scufl2.api.port.InputProcessorPort;
import org.apache.taverna.scufl2.api.port.OutputActivityPort;
import org.apache.taverna.scufl2.api.port.OutputPort;
import org.apache.taverna.scufl2.api.port.OutputProcessorPort;
import org.apache.taverna.scufl2.api.port.Port;
import org.apache.taverna.scufl2.api.port.ProcessorPort;
import org.apache.taverna.scufl2.api.port.ReceiverPort;
import org.apache.taverna.scufl2.api.port.SenderPort;
import org.apache.taverna.scufl2.api.profiles.ProcessorBinding;
import org.apache.taverna.scufl2.api.profiles.ProcessorInputPortBinding;
import org.apache.taverna.scufl2.api.profiles.ProcessorOutputPortBinding;
import org.apache.taverna.scufl2.api.profiles.ProcessorPortBinding;
import org.apache.taverna.scufl2.api.profiles.Profile;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
/**
* Utility methods for dealing with SCUFL2 models
*
* @author Stian Soiland-Reyes
*/
public class Scufl2Tools {
private static final String CONSTANT_STRING = "string";
private static final String CONSTANT_VALUE_PORT = "value";
public static URI PORT_DEFINITION = URI
.create("http://ns.taverna.org.uk/2010/scufl2#portDefinition");
private static URITools uriTools = new URITools();
public static URI NESTED_WORKFLOW = URI
.create("http://ns.taverna.org.uk/2010/activity/nested-workflow");
/**
* Compare {@link ProcessorBinding}s by their
* {@link ProcessorBinding#getActivityPosition()}.
* <p>
* <b>Note:</b> this comparator imposes orderings that are inconsistent with
* equals.
*
* @author Stian Soiland-Reyes
*/
public static class BindingComparator implements
Comparator<ProcessorBinding> {
@Override
public int compare(ProcessorBinding o1, ProcessorBinding o2) {
return o1.getActivityPosition() - o2.getActivityPosition();
}
}
public List<Annotation> annotationsFor(Child<?> bean) {
WorkflowBundle bundle = findParent(WorkflowBundle.class, bean);
return annotationsFor(bean, bundle);
}
public List<Annotation> annotationsFor(WorkflowBean bean,
WorkflowBundle bundle) {
ArrayList<Annotation> annotations = new ArrayList<>();
if (bundle == null)
return annotations;
for (Annotation ann : bundle.getAnnotations())
if (ann.getTarget().equals(bean))
annotations.add(ann);
return annotations;
}
/**
* Returns the {@link Configuration} for a {@link Configurable} in the given
* {@link Profile}.
*
* @param configurable
* the <code>Configurable</code> to find a
* <code>Configuration</code> for
* @param profile
* the <code>Profile</code> to look for the
* <code>Configuration</code> in
* @return the <code>Configuration</code> for a <code>Configurable</code> in
* the given <code>Profile</code>
*/
public Configuration configurationFor(Configurable configurable,
Profile profile) {
List<Configuration> configurations = configurationsFor(configurable,
profile);
if (configurations.isEmpty())
throw new IndexOutOfBoundsException(
"Could not find configuration for " + configurable);
if (configurations.size() > 1)
throw new IllegalStateException("More than one configuration for "
+ configurable);
return configurations.get(0);
}
public Configuration configurationForActivityBoundToProcessor(
Processor processor, Profile profile) {
ProcessorBinding binding = processorBindingForProcessor(processor,
profile);
return configurationFor(binding.getBoundActivity(), profile);
}
/**
* Returns the list of {@link Configuration Configurations} for a
* {@link Configurable} in the given {@link Profile}.
*
* @param configurable
* the <code>Configurable</code> to find a
* <code>Configuration</code> for
* @param profile
* the <code>Profile</code> to look for the
* <code>Configuration</code> in
* @return the list of <code>Configurations</code> for a
* <code>Configurable</code> in the given <code>Profile</code>
*/
public List<Configuration> configurationsFor(Configurable configurable,
Profile profile) {
List<Configuration> configurations = new ArrayList<>();
for (Configuration config : profile.getConfigurations())
if (configurable.equals(config.getConfigures()))
configurations.add(config);
// Collections.sort(configurations);
return configurations;
}
@SuppressWarnings("unchecked")
public List<BlockingControlLink> controlLinksBlocking(Processor blocked) {
List<BlockingControlLink> controlLinks = new ArrayList<>();
for (ControlLink link : blocked.getParent().getControlLinks()) {
if (!(link instanceof BlockingControlLink))
continue;
BlockingControlLink blockingControlLink = (BlockingControlLink) link;
if (blockingControlLink.getBlock().equals(blocked))
controlLinks.add(blockingControlLink);
}
Collections.sort(controlLinks);
return controlLinks;
}
@SuppressWarnings("unchecked")
public List<BlockingControlLink> controlLinksWaitingFor(
Processor untilFinished) {
List<BlockingControlLink> controlLinks = new ArrayList<>();
for (ControlLink link : untilFinished.getParent().getControlLinks()) {
if (!(link instanceof BlockingControlLink))
continue;
BlockingControlLink blockingControlLink = (BlockingControlLink) link;
if (blockingControlLink.getUntilFinished().equals(untilFinished))
controlLinks.add(blockingControlLink);
}
Collections.sort(controlLinks);
return controlLinks;
}
@SuppressWarnings("unchecked")
public List<DataLink> datalinksFrom(SenderPort senderPort) {
Workflow wf = findParent(Workflow.class, (Child<Workflow>) senderPort);
List<DataLink> links = new ArrayList<>();
for (DataLink link : wf.getDataLinks())
if (link.getReceivesFrom().equals(senderPort))
links.add(link);
Collections.sort(links);
return links;
}
@SuppressWarnings("unchecked")
public List<DataLink> datalinksTo(ReceiverPort receiverPort) {
Workflow wf = findParent(Workflow.class, (Child<Workflow>) receiverPort);
List<DataLink> links = new ArrayList<>();
for (DataLink link : wf.getDataLinks())
if (link.getSendsTo().equals(receiverPort))
links.add(link);
Collections.sort(links);
return links;
}
public <T extends WorkflowBean> T findParent(Class<T> parentClass,
Child<?> child) {
WorkflowBean parent = child.getParent();
if (parent == null)
return null;
if (parentClass.isAssignableFrom(parent.getClass())) {
@SuppressWarnings("unchecked")
T foundParent = (T) parent;
return foundParent;
}
if (parent instanceof Child)
return findParent(parentClass, (Child<?>) parent);
return null;
}
public JsonNode portDefinitionFor(ActivityPort activityPort, Profile profile)
throws PropertyException {
Configuration actConfig = configurationFor(activityPort.getParent(),
profile);
JsonNode portDef = actConfig.getJson().get("portDefinition");
if (portDef == null)
return null;
URI portPath = uriTools.relativeUriForBean(activityPort,
activityPort.getParent());
// e.g. "in/input1" or "out/output2"
return portDef.get(portPath.toString());
}
public ProcessorBinding processorBindingForProcessor(Processor processor,
Profile profile) {
List<ProcessorBinding> bindings = processorBindingsForProcessor(
processor, profile);
if (bindings.isEmpty())
throw new IndexOutOfBoundsException("Could not find bindings for "
+ processor);
if (bindings.size() > 1)
throw new IllegalStateException("More than one proc binding for "
+ processor);
return bindings.get(0);
}
public List<ProcessorBinding> processorBindingsForProcessor(
Processor processor, Profile profile) {
List<ProcessorBinding> bindings = new ArrayList<>();
for (ProcessorBinding pb : profile.getProcessorBindings())
if (pb.getBoundProcessor().equals(processor))
bindings.add(pb);
Collections.sort(bindings, new BindingComparator());
return bindings;
}
public List<ProcessorBinding> processorBindingsToActivity(Activity activity) {
Profile profile = activity.getParent();
List<ProcessorBinding> bindings = new ArrayList<>();
for (ProcessorBinding pb : profile.getProcessorBindings())
if (pb.getBoundActivity().equals(activity))
bindings.add(pb);
Collections.sort(bindings, new BindingComparator());
return bindings;
}
public ProcessorInputPortBinding processorPortBindingForPort(
InputPort inputPort, Profile profile) {
return (ProcessorInputPortBinding) processorPortBindingForPortInternal(
inputPort, profile);
}
public ProcessorOutputPortBinding processorPortBindingForPort(
OutputPort outputPort, Profile profile) {
return (ProcessorOutputPortBinding) processorPortBindingForPortInternal(
outputPort, profile);
}
protected ProcessorPortBinding<?, ?> processorPortBindingForPortInternal(
Port port, Profile profile) {
List<ProcessorBinding> processorBindings;
if (port instanceof ProcessorPort) {
ProcessorPort processorPort = (ProcessorPort) port;
processorBindings = processorBindingsForProcessor(
processorPort.getParent(), profile);
} else if (port instanceof ActivityPort) {
ActivityPort activityPort = (ActivityPort) port;
processorBindings = processorBindingsToActivity(activityPort
.getParent());
} else
throw new IllegalArgumentException(
"Port must be a ProcessorPort or ActivityPort");
for (ProcessorBinding procBinding : processorBindings) {
ProcessorPortBinding<?, ?> portBinding = processorPortBindingInternalInBinding(
port, procBinding);
if (portBinding != null)
return portBinding;
}
return null;
}
protected ProcessorPortBinding<?, ?> processorPortBindingInternalInBinding(
Port port, ProcessorBinding procBinding) {
Set<? extends ProcessorPortBinding<?, ?>> portBindings;
if (port instanceof InputPort)
portBindings = procBinding.getInputPortBindings();
else
portBindings = procBinding.getOutputPortBindings();
for (ProcessorPortBinding<?, ?> portBinding : portBindings) {
if (port instanceof ProcessorPort
&& portBinding.getBoundProcessorPort().equals(port))
return portBinding;
if (port instanceof ActivityPort
&& portBinding.getBoundActivityPort().equals(port))
return portBinding;
}
return null;
}
public void setParents(WorkflowBundle bundle) {
bundle.accept(new VisitorWithPath() {
@Override
public boolean visit() {
WorkflowBean node = getCurrentNode();
if (node instanceof Child) {
@SuppressWarnings("unchecked")
Child<WorkflowBean> child = (Child<WorkflowBean>) node;
WorkflowBean parent = getCurrentPath().peek();
if (child.getParent() != parent)
child.setParent(parent);
}
return true;
}
});
}
/**
* Find processors that a given processor can connect to downstream.
* <p>
* This is calculated as all processors in the dataflow, except the
* processor itself, and any processor <em>upstream</em>, following both
* data links and conditional links.
*
* @see #possibleUpStreamProcessors(Dataflow, Processor)
* @see #splitProcessors(Collection, Processor)
*
* @param dataflow
* Dataflow from where to find processors
* @param processor
* Processor which is to be connected
* @return A set of possible downstream processors
*/
public Set<Processor> possibleDownStreamProcessors(Workflow dataflow,
Processor processor) {
ProcessorSplit splitProcessors = splitProcessors(
dataflow.getProcessors(), processor);
Set<Processor> possibles = new HashSet<>(
splitProcessors.getUnconnected());
possibles.addAll(splitProcessors.getDownStream());
return possibles;
}
/**
* Find processors that a given processor can connect to upstream.
* <p>
* This is calculated as all processors in the dataflow, except the
* processor itself, and any processor <em>downstream</em>, following both
* data links and conditional links.
*
* @see #possibleDownStreamProcessors(Dataflow, Processor)
* @see #splitProcessors(Collection, Processor)
*
* @param dataflow
* Dataflow from where to find processors
* @param processor
* Processor which is to be connected
* @return A set of possible downstream processors
*/
public Set<Processor> possibleUpStreamProcessors(Workflow dataflow,
Processor firstProcessor) {
ProcessorSplit splitProcessors = splitProcessors(
dataflow.getProcessors(), firstProcessor);
Set<Processor> possibles = new HashSet<>(
splitProcessors.getUnconnected());
possibles.addAll(splitProcessors.getUpStream());
return possibles;
}
/**
* @param processors
* @param splitPoint
* @return
*/
public ProcessorSplit splitProcessors(Collection<Processor> processors,
Processor splitPoint) {
Set<Processor> upStream = new HashSet<>();
Set<Processor> downStream = new HashSet<>();
Set<Processor> queue = new HashSet<>();
queue.add(splitPoint);
// First let's go upstream
while (!queue.isEmpty()) {
Processor processor = queue.iterator().next();
queue.remove(processor);
List<BlockingControlLink> preConditions = controlLinksBlocking(processor);
for (BlockingControlLink condition : preConditions) {
Processor upstreamProc = condition.getUntilFinished();
if (!upStream.contains(upstreamProc)) {
upStream.add(upstreamProc);
queue.add(upstreamProc);
}
}
for (InputProcessorPort inputPort : processor.getInputPorts())
for (DataLink incomingLink : datalinksTo(inputPort)) {
SenderPort source = incomingLink.getReceivesFrom();
if (!(source instanceof OutputProcessorPort))
continue;
Processor upstreamProc = ((OutputProcessorPort) source)
.getParent();
if (!upStream.contains(upstreamProc)) {
upStream.add(upstreamProc);
queue.add(upstreamProc);
}
}
}
// Our split
queue.add(splitPoint);
// Then downstream
while (!queue.isEmpty()) {
Processor processor = queue.iterator().next();
queue.remove(processor);
List<BlockingControlLink> controlledConditions = controlLinksWaitingFor(processor);
for (BlockingControlLink condition : controlledConditions) {
Processor downstreamProc = condition.getBlock();
if (!downStream.contains(downstreamProc)) {
downStream.add(downstreamProc);
queue.add(downstreamProc);
}
}
for (OutputProcessorPort outputPort : processor.getOutputPorts())
for (DataLink datalink : datalinksFrom(outputPort)) {
ReceiverPort sink = datalink.getSendsTo();
if (!(sink instanceof InputProcessorPort))
continue;
Processor downstreamProcc = ((InputProcessorPort) sink)
.getParent();
if (!downStream.contains(downstreamProcc)) {
downStream.add(downstreamProcc);
queue.add(downstreamProcc);
}
}
}
Set<Processor> undecided = new HashSet<>(processors);
undecided.remove(splitPoint);
undecided.removeAll(upStream);
undecided.removeAll(downStream);
return new ProcessorSplit(splitPoint, upStream, downStream, undecided);
}
/**
* Result bean returned from
* {@link Scufl2Tools#splitProcessors(Collection, Processor)}.
*
* @author Stian Soiland-Reyes
*/
public static class ProcessorSplit {
private final Processor splitPoint;
private final Set<Processor> upStream;
private final Set<Processor> downStream;
private final Set<Processor> unconnected;
/**
* Processor that was used as a split point.
*
* @return Split point processor
*/
public Processor getSplitPoint() {
return splitPoint;
}
/**
* Processors that are upstream from the split point.
*
* @return Upstream processors
*/
public Set<Processor> getUpStream() {
return upStream;
}
/**
* Processors that are downstream from the split point.
*
* @return Downstream processors
*/
public Set<Processor> getDownStream() {
return downStream;
}
/**
* Processors that are unconnected to the split point.
* <p>
* These are processors in the dataflow that are neither upstream,
* downstream or the split point itself.
* <p>
* Note that this does not imply a total graph separation, for instance
* processors in {@link #getUpStream()} might have some of these
* unconnected processors downstream, but not along the path to the
* {@link #getSplitPoint()}, or they could be upstream from any
* processor in {@link #getDownStream()}.
*
* @return Processors unconnected from the split point
*/
public Set<Processor> getUnconnected() {
return unconnected;
}
/**
* Construct a new processor split result.
*
* @param splitPoint
* Processor used as split point
* @param upStream
* Processors that are upstream from split point
* @param downStream
* Processors that are downstream from split point
* @param unconnected
* The rest of the processors, that are by definition
* unconnected to split point
*/
public ProcessorSplit(Processor splitPoint, Set<Processor> upStream,
Set<Processor> downStream, Set<Processor> unconnected) {
this.splitPoint = splitPoint;
this.upStream = upStream;
this.downStream = downStream;
this.unconnected = unconnected;
}
}
/**
* Return nested workflow for processor as configured in given profile.
* <p>
* A nested workflow is an activity bound to the processor with the
* configurable type equal to {@value #NESTED_WORKFLOW}.
* <p>
* This method returns <code>null</code> if no such workflow was found,
* otherwise the configured workflow.
* <p
* Note that even if several bindings/configurations map to a different
* workflow, this method throws an IllegalStateException. Most workflows
* will only have a single workflow for a given profile, to handle more
* complex cases use instead
* {@link #nestedWorkflowsForProcessor(Processor, Profile)}.
*
* @throws NullPointerException
* if the given profile does not have a parent
* @throws IllegalStateException
* if a nested workflow configuration is invalid, or more than
* one possible workflow is found
*
* @param processor
* Processor which might have a nested workflow
* @param profile
* Profile to look for nested workflow activity/configuration.
* The profile must have a {@link WorkflowBundle} set as its
* {@link Profile#setParent(WorkflowBundle)}.
* @return The configured nested workflows for processor
*/
public Workflow nestedWorkflowForProcessor(Processor processor,
Profile profile) {
List<Workflow> wfs = nestedWorkflowsForProcessor(processor, profile);
if (wfs.isEmpty())
return null;
if (wfs.size() > 1)
throw new IllegalStateException(
"More than one possible workflow for processor "
+ processor);
return wfs.get(0);
}
/**
* Return list of nested workflows for processor as configured in given
* profile.
* <p>
* A nested workflow is an activity bound to the processor with the
* configurable type equal to {@value #NESTED_WORKFLOW}.
* <p>
* This method returns a list of 0 or more workflows, as every matching
* {@link ProcessorBinding} and every matching {@link Configuration} for the
* bound activity is considered. Normally there will only be a single nested
* workflow, in which case the
* {@link #nestedWorkflowForProcessor(Processor, Profile)} method should be
* used instead.
* <p>
* Note that even if several bindings/configurations map to the same
* workflow, each workflow is only included once in the list. Nested
* workflow configurations that are incomplete or which #workflow can't be
* found within the workflow bundle of the profile will be silently ignored.
*
* @throws NullPointerException
* if the given profile does not have a parent
* @throws IllegalStateException
* if a nested workflow configuration is invalid
*
* @param processor
* Processor which might have a nested workflow
* @param profile
* Profile to look for nested workflow activity/configuration.
* The profile must have a {@link WorkflowBundle} set as its
* {@link Profile#setParent(WorkflowBundle)}.
* @return List of configured nested workflows for processor
*/
public List<Workflow> nestedWorkflowsForProcessor(Processor processor,
Profile profile) {
WorkflowBundle bundle = profile.getParent();
if (bundle == null)
throw new NullPointerException("Parent must be set for " + profile);
ArrayList<Workflow> workflows = new ArrayList<>();
for (ProcessorBinding binding : processorBindingsForProcessor(
processor, profile)) {
if (!binding.getBoundActivity().getType().equals(NESTED_WORKFLOW))
continue;
for (Configuration c : configurationsFor(
binding.getBoundActivity(), profile)) {
JsonNode nested = c.getJson().get("nestedWorkflow");
Workflow wf = bundle.getWorkflows().getByName(nested.asText());
if (wf != null && !workflows.contains(wf))
workflows.add(wf);
}
}
return workflows;
}
/**
* Returns true if processor contains a nested workflow in any of its
* activities in any of its profiles.
*/
public boolean containsNestedWorkflow(Processor processor) {
for (Profile profile : processor.getParent().getParent().getProfiles())
if (containsNestedWorkflow(processor, profile))
return true;
return false;
}
/**
* Returns true if processor contains a nested workflow in the specified
* profile.
*/
public boolean containsNestedWorkflow(Processor processor, Profile profile) {
for (ProcessorBinding binding : processorBindingsForProcessor(
processor, profile))
if (binding.getBoundActivity().getType().equals(NESTED_WORKFLOW))
return true;
return false;
}
public void createActivityPortsFromProcessor(Activity activity,
Processor processor) {
for (InputProcessorPort processorPort : processor.getInputPorts())
new InputActivityPort(activity, processorPort.getName())
.setDepth(processorPort.getDepth());
for (OutputProcessorPort processorPort : processor.getOutputPorts()) {
OutputActivityPort activityPort = new OutputActivityPort(activity,
processorPort.getName());
activityPort.setDepth(processorPort.getDepth());
activityPort.setGranularDepth(processorPort.getGranularDepth());
}
}
public void createProcessorPortsFromActivity(Processor processor,
Activity activity) {
for (InputActivityPort activityPort : activity.getInputPorts())
new InputProcessorPort(processor, activityPort.getName())
.setDepth(activityPort.getDepth());
for (OutputActivityPort activityPort : activity.getOutputPorts()) {
OutputProcessorPort procPort = new OutputProcessorPort(processor,
activityPort.getName());
procPort.setDepth(activityPort.getDepth());
procPort.setGranularDepth(activityPort.getGranularDepth());
}
}
public ProcessorBinding bindActivityToProcessorByMatchingPorts(
Activity activity, Processor processor) {
ProcessorBinding binding = new ProcessorBinding();
binding.setParent(activity.getParent());
binding.setBoundActivity(activity);
binding.setBoundProcessor(processor);
bindActivityToProcessorByMatchingPorts(binding);
return binding;
}
public void bindActivityToProcessorByMatchingPorts(ProcessorBinding binding) {
Activity activity = binding.getBoundActivity();
Processor processor = binding.getBoundProcessor();
for (InputActivityPort activityPort : activity.getInputPorts()) {
InputProcessorPort processorPort = processor.getInputPorts()
.getByName(activityPort.getName());
if (processorPort != null
&& processorPortBindingInternalInBinding(processorPort,
binding) == null)
new ProcessorInputPortBinding(binding, processorPort,
activityPort);
}
for (OutputProcessorPort processorPort : processor.getOutputPorts()) {
OutputActivityPort activityPort = activity.getOutputPorts()
.getByName(processorPort.getName());
if (activityPort != null
&& processorPortBindingInternalInBinding(activityPort,
binding) == null)
new ProcessorOutputPortBinding(binding, activityPort,
processorPort);
}
}
public ProcessorBinding createProcessorAndBindingFromActivity(
Activity activity) {
Processor proc = new Processor();
proc.setName(activity.getName());
createProcessorPortsFromActivity(proc, activity);
return bindActivityToProcessorByMatchingPorts(activity, proc);
}
public Activity createActivityFromProcessor(Processor processor,
Profile profile) {
Activity activity = new Activity();
activity.setName(processor.getName());
activity.setParent(profile);
createActivityPortsFromProcessor(activity, processor);
bindActivityToProcessorByMatchingPorts(activity, processor);
return activity;
}
public Configuration setAsNestedWorkflow(Processor processor, Workflow childWorkflow, Profile profile) {
if(processor.getParent() == null) {
throw new IllegalStateException("Processor " + processor + " has no parent");
}
if(processor.getParent().getParent() != childWorkflow.getParent()) {
throw new IllegalStateException(
"Processor " + processor + " and workflow " + childWorkflow + " are not in the same Workflow bundle");
}
if(nestedWorkflowForProcessor(processor, profile) != null) {
throw new IllegalStateException("Processor " + processor + " already has a nested workflow");
}
List<ProcessorBinding> processorBindings = processorBindingsForProcessor(processor, profile);
if(processorBindings.size() != 0) {
throw new IllegalStateException("Processor " + processor + "already has a binding")
}
Activity activity = createActivityFromProcessor(processor, profile);
activity.setType(NESTED_WORKFLOW);
Configuration configuration = createConfigurationFor(activity, NESTED_WORKFLOW);
ObjectNode json = configuration.getJsonAsObjectNode();
json.put("nestedWorkflow", childWorkflow.getName());
childWorkflow.setParent(processor.getParent().getParent());
return configuration;
}
public void removePortsBindingForUnknownPorts(ProcessorBinding binding) {
// First, remove ports no longer owned by processor
Iterator<ProcessorInputPortBinding> inputBindings = binding
.getInputPortBindings().iterator();
Activity activity = binding.getBoundActivity();
Processor processor = binding.getBoundProcessor();
for (ProcessorInputPortBinding ip : iterable(inputBindings)) {
if (!activity.getInputPorts().contains(ip.getBoundActivityPort())) {
inputBindings.remove();
continue;
}
if (!processor.getInputPorts().contains(ip.getBoundProcessorPort())) {
inputBindings.remove();
continue;
}
}
Iterator<ProcessorOutputPortBinding> outputBindings = binding
.getOutputPortBindings().iterator();
for (ProcessorOutputPortBinding op : iterable(outputBindings)) {
if (!activity.getOutputPorts().contains(op.getBoundActivityPort())) {
outputBindings.remove();
continue;
}
if (!processor.getOutputPorts()
.contains(op.getBoundProcessorPort())) {
outputBindings.remove();
continue;
}
}
}
public void updateBindingByMatchingPorts(ProcessorBinding binding) {
removePortsBindingForUnknownPorts(binding);
bindActivityToProcessorByMatchingPorts(binding);
}
private <T> Iterable<T> iterable(final Iterator<T> it) {
return new Iterable<T>() {
@Override
public Iterator<T> iterator() {
return it;
}
};
}
public static URI CONSTANT = URI
.create("http://ns.taverna.org.uk/2010/activity/constant");
public static URI CONSTANT_CONFIG = CONSTANT.resolve("#Config");
public Processor createConstant(Workflow workflow, Profile profile,
String name) {
Processor processor = new Processor(null, name);
workflow.getProcessors().addWithUniqueName(processor);
processor.setParent(workflow);
OutputProcessorPort valuePort = new OutputProcessorPort(processor,
CONSTANT_VALUE_PORT);
valuePort.setDepth(0);
valuePort.setGranularDepth(0);
Activity activity = createActivityFromProcessor(processor, profile);
activity.setType(CONSTANT);
createConfigurationFor(activity, CONSTANT_CONFIG);
return processor;
}
public Configuration createConfigurationFor(Activity activity,
URI configType) {
Profile profile = activity.getParent();
Configuration config = new Configuration(activity.getName());
profile.getConfigurations().addWithUniqueName(config);
config.setParent(profile);
config.setConfigures(activity);
config.setType(configType);
return config;
}
public Configuration createConfigurationFor(Processor processor,
Profile profile) {
Configuration config = new Configuration(processor.getName() + "-proc");
profile.getConfigurations().addWithUniqueName(config);
config.setParent(profile);
config.setConfigures(processor);
config.setType(Processor.CONFIG_TYPE);
return config;
}
public void setConstantStringValue(Processor constant, String value,
Profile profile) {
Configuration config = configurationForActivityBoundToProcessor(
constant, profile);
config.getJsonAsObjectNode().put(CONSTANT_STRING, value);
}
public String getConstantStringValue(Processor constant, Profile profile) {
Configuration config = configurationForActivityBoundToProcessor(
constant, profile);
return config.getJson().get(CONSTANT_STRING).asText();
}
public Set<Processor> getConstants(Workflow workflow, Profile profile) {
Set<Processor> procs = new LinkedHashSet<>();
for (Configuration config : profile.getConfigurations()) {
Configurable configurable = config.getConfigures();
if (!CONSTANT.equals(configurable.getType())
|| !(configurable instanceof Activity))
continue;
for (ProcessorBinding bind : processorBindingsToActivity((Activity) configurable))
procs.add(bind.getBoundProcessor());
}
return procs;
}
public void createDefaultIterationStrategyStack(Processor p) {
p.setIterationStrategyStack(new IterationStrategyStack());
CrossProduct crossProduct = new CrossProduct();
for (InputProcessorPort in : p.getInputPorts()) {
// As this is a NamedSet the above will always be in
// the same alphabetical order
// FIXME: What about different Locales?
crossProduct.add(new PortNode(crossProduct, in));
}
p.getIterationStrategyStack().add(crossProduct);
}
}