blob: 390e0fe0102be6dfdc21fb0544ce04abb601fb5b [file] [log] [blame]
/*******************************************************************************
* Copyright (C) 2007-2008 The University of Manchester
*
* Modifications to the initial code base are copyright of their
* respective authors, or their employers as appropriate.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public License
* as published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
******************************************************************************/
package net.sf.taverna.t2.workflowmodel.impl;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import net.sf.taverna.t2.invocation.InvocationContext;
import net.sf.taverna.t2.invocation.WorkflowDataToken;
import net.sf.taverna.t2.reference.IdentifiedList;
import net.sf.taverna.t2.reference.ListService;
import net.sf.taverna.t2.reference.T2Reference;
import net.sf.taverna.t2.workflowmodel.EventForwardingOutputPort;
import net.sf.taverna.t2.workflowmodel.InputPort;
import net.sf.taverna.t2.workflowmodel.Merge;
import net.sf.taverna.t2.workflowmodel.MergeInputPort;
import net.sf.taverna.t2.workflowmodel.WorkflowStructureException;
import net.sf.taverna.t2.workflowmodel.processor.iteration.IterationTypeMismatchException;
import org.apache.log4j.Logger;
/**
* Implementation of {@link Merge}
*
* @author Tom Oinn
* @author Stian Soiland-Reyes
*
*/
public class MergeImpl implements Merge {
@SuppressWarnings("unused")
private static Logger logger = Logger.getLogger(MergeImpl.class);
private List<MergeInputPortImpl> inputs = new ArrayList<MergeInputPortImpl>();
private String name;
private BasicEventForwardingOutputPort output;
private Map<String, List<T2Reference>> partialOutputsByProcess = new HashMap<String, List<T2Reference>>();
public MergeImpl(String mergeName) {
super();
this.name = mergeName;
this.output = new MergeOutputPortImpl(this, name+"_output", 0, 0);
}
public String getLocalName() {
return this.name;
}
/**
* Adds a new input port to the internal list of ports.
*
* @param inputPort
* the MergeInputPortImpl
*/
public void addInputPort(MergeInputPortImpl inputPort) {
inputs.add(inputPort);
}
/**
* Removes an input port from the internal list of ports.
*
* @param inputPort
*/
public void removeInputPort(MergeInputPortImpl inputPort) {
inputs.remove(inputPort);
}
public List<? extends MergeInputPort> getInputPorts() {
return inputs;
}
public EventForwardingOutputPort getOutputPort() {
return this.output;
}
/**
* Return the index of the port with the specified name, or -1 if the port
* can't be found (this is a bad thing!)
*
* @param portName
* @return
*/
private int inputPortNameToIndex(String portName) {
int i = 0;
for (InputPort ip : inputs) {
if (ip.getName().equals(portName)) {
return i;
}
i++;
}
return -1; // FIXME: as the javadoc states, this is a bad thing!
}
protected void receiveEvent(WorkflowDataToken token, String portName) {
List<T2Reference> outputList;
String owningProcess = token.getOwningProcess();
synchronized (partialOutputsByProcess) {
outputList = partialOutputsByProcess.get(owningProcess);
if (outputList == null) {
int numPorts = getInputPorts().size();
outputList = new ArrayList<T2Reference>(Collections.nCopies(numPorts, (T2Reference)null));
partialOutputsByProcess.put(owningProcess, outputList);
}
}
int portIndex = inputPortNameToIndex(portName);
if (portIndex == -1) {
throw new WorkflowStructureException(
"Received event on unknown port " + portName);
}
int[] currentIndex = token.getIndex();
int[] newIndex = new int[currentIndex.length + 1];
newIndex[0] = portIndex;
for (int i = 0; i < currentIndex.length; i++) {
newIndex[i + 1] = currentIndex[i];
}
InvocationContext context = token.getContext();
output.sendEvent(new WorkflowDataToken(owningProcess,
newIndex, token.getData(), context));
if (token.getIndex().length == 0) {
// Add to completion list
synchronized (outputList) {
if (outputList.size() <= portIndex) {
// Ports changed after initiating running as our list is
// smaller than portIndex
throw new WorkflowStructureException(
"Unexpected addition of output port " + portName
+ " at " + portIndex);
}
if (outputList.get(portIndex) != null) {
throw new WorkflowStructureException(
"Already received completion for port " + portName
+ " " + outputList.get(portIndex));
}
outputList.set(portIndex, token.getData());
if (!outputList.contains(null)) {
// We're finished, let's register and send out the list
ListService listService = context.getReferenceService()
.getListService();
IdentifiedList<T2Reference> registeredList = listService
.registerList(outputList);
WorkflowDataToken workflowDataToken = new WorkflowDataToken(
owningProcess, new int[0], registeredList.getId(),
context);
synchronized (partialOutputsByProcess) {
partialOutputsByProcess.remove(owningProcess);
}
output.sendEvent(workflowDataToken);
}
}
}
}
/**
* There is only ever a single output from a merge node but the token
* processing entity interface defines a list, in this case it always
* contains exactly one item.
*/
public List<? extends EventForwardingOutputPort> getOutputPorts() {
List<EventForwardingOutputPort> result = new ArrayList<EventForwardingOutputPort>();
result.add(output);
return result;
}
public boolean doTypeCheck() throws IterationTypeMismatchException {
if (inputs.size() == 0) {
// Arguable, but technically a merge with no inputs is valid, it may
// make more sense to throw an exception here though as it has no
// actual meaning.
return true;
}
// Return false if we have unbound input ports or bound ports where the
// resolved depth hasn't been calculated yet
for (MergeInputPort ip : inputs) {
if (ip.getIncomingLink() == null
|| ip.getIncomingLink().getResolvedDepth() == -1) {
return false;
}
}
// Got all input ports, now scan for input depths
int inputDepth = inputs.get(0).getIncomingLink().getResolvedDepth();
for (MergeInputPort ip : inputs) {
if (ip.getIncomingLink().getResolvedDepth() != inputDepth) {
throw new IterationTypeMismatchException();
}
}
// Got to here so all the input resolved depths match, push depth+1 to
// all outgoing links and return true
for (DatalinkImpl dli : output.outgoingLinks) {
dli.setResolvedDepth(inputDepth+1);
}
return true;
}
@SuppressWarnings("unchecked")
public void reorderInputPorts(
List<? extends MergeInputPort> reorderedInputPortList) {
// Just set the inputs to the already reordered list of ports
inputs = (List<MergeInputPortImpl>) reorderedInputPortList;
}
}