// blob: 3e0463f6e8432dc15753b8cab686b06f5fdb919c [file] [log] [blame]
/*******************************************************************************
* Copyright (C) 2007 The University of Manchester
*
* Modifications to the initial code base are copyright of their
* respective authors, or their employers as appropriate.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public License
* as published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
******************************************************************************/
package net.sf.taverna.t2.facade.impl;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import net.sf.taverna.t2.facade.FailureListener;
import net.sf.taverna.t2.facade.ResultListener;
import net.sf.taverna.t2.facade.WorkflowInstanceFacade;
import net.sf.taverna.t2.invocation.InvocationContext;
import net.sf.taverna.t2.invocation.TokenOrderException;
import net.sf.taverna.t2.invocation.WorkflowDataToken;
import net.sf.taverna.t2.monitor.MonitorManager;
import net.sf.taverna.t2.monitor.MonitorNode;
import net.sf.taverna.t2.monitor.MonitorableProperty;
import net.sf.taverna.t2.provenance.item.DataflowRunComplete;
import net.sf.taverna.t2.provenance.item.WorkflowDataProvenanceItem;
import net.sf.taverna.t2.provenance.item.WorkflowProvenanceItem;
import net.sf.taverna.t2.utility.TypedTreeModel;
import net.sf.taverna.t2.workflowmodel.Dataflow;
import net.sf.taverna.t2.workflowmodel.DataflowInputPort;
import net.sf.taverna.t2.workflowmodel.DataflowOutputPort;
import net.sf.taverna.t2.workflowmodel.DataflowValidationReport;
import net.sf.taverna.t2.workflowmodel.EditException;
import net.sf.taverna.t2.workflowmodel.Edits;
import net.sf.taverna.t2.workflowmodel.EditsRegistry;
import net.sf.taverna.t2.workflowmodel.InvalidDataflowException;
import net.sf.taverna.t2.workflowmodel.Processor;
import net.sf.taverna.t2.workflowmodel.processor.dispatch.DispatchLayer;
import net.sf.taverna.t2.workflowmodel.processor.dispatch.DispatchStack;
import net.sf.taverna.t2.workflowmodel.processor.dispatch.layers.ErrorBounce;
import net.sf.taverna.t2.workflowmodel.processor.dispatch.layers.IntermediateProvenance;
import org.apache.log4j.Logger;
/**
 * Implementation of {@link WorkflowInstanceFacade}.
 * <p>
 * A facade wraps a single validated {@link Dataflow} together with the
 * {@link InvocationContext} it runs in. It derives an owning process
 * identifier for the run, forwards input tokens to the dataflow's input
 * ports, relays result tokens from the output ports to registered
 * {@link ResultListener}s and, when the context supplies a provenance
 * reporter, emits provenance items for the run.
 *
 * @author Tom Oinn
 * @author Stian Soiland-Reyes
 * @author Ian Dunlop
 * @author Alex Nenadic
 *
 */
public class WorkflowInstanceFacadeImpl implements WorkflowInstanceFacade {

    private static final Logger logger = Logger
            .getLogger(WorkflowInstanceFacadeImpl.class);

    /** Counter used to build the unique "facadeN" local name of each facade. */
    protected static AtomicLong owningProcessId = new AtomicLong(0);

    private InvocationContext context;

    /**
     * Returns the invocation context supplied to the constructor.
     *
     * @return the invocation context of this facade
     */
    public InvocationContext getContext() {
        return context;
    }

    private Dataflow dataflow;

    /** Single internal listener attached to every dataflow output port. */
    private ResultListener facadeResultListener;

    /**
     * Either {@link #localName} (empty parent process) or
     * <code>parentProcess + ":" + localName</code>.
     */
    private String instanceOwningProcessId;

    /** "facade" followed by the value taken from {@link #owningProcessId}. */
    private String localName;

    private MonitorManager monitorManager = MonitorManager.getInstance();

    /** Set once {@link #pushData} has been called; {@link #fire()} then refuses to run. */
    private boolean pushDataCalled = false;

    protected List<FailureListener> failureListeners = new ArrayList<FailureListener>();

    protected List<ResultListener> resultListeners = new ArrayList<ResultListener>();

    /**
     * Guards every mutation and copy of {@link #resultListeners}. It is only
     * ever held around plain list operations (never while calling listeners
     * or ports), so it cannot take part in a lock-ordering cycle.
     */
    private final Object resultListenersLock = new Object();

    /** True when the context had a non-null provenance reporter at construction. */
    private boolean provEnabled = false;

    /**
     * Creates a facade for the given dataflow.
     *
     * @param dataflow
     *            the dataflow to run; it is validated first
     * @param context
     *            the invocation context; if its provenance reporter is
     *            non-null, provenance collection is enabled for this run
     * @param parentProcess
     *            owning process of the parent, or the empty string for a
     *            top-level run. Only top-level facades are recorded in
     *            {@link WorkflowInstanceFacade#workflowRunFacades}.
     * @throws InvalidDataflowException
     *             if {@link Dataflow#checkValidity()} reports the dataflow
     *             as invalid
     */
    public WorkflowInstanceFacadeImpl(final Dataflow dataflow,
            InvocationContext context, String parentProcess)
            throws InvalidDataflowException {
        DataflowValidationReport report = dataflow.checkValidity();
        if (!report.isValid()) {
            throw new InvalidDataflowException(dataflow, report);
        }

        this.dataflow = dataflow;
        this.context = context;
        this.localName = "facade" + owningProcessId.getAndIncrement();

        if (parentProcess.equals("")) {
            this.instanceOwningProcessId = localName;
            // Add this WorkflowInstanceFacade to the map of all workflow run
            // IDs against the corresponding WorkflowInstanceFacades.
            // Note that we do not put the IDs for nested workflows, just for
            // the main ones!
            WorkflowInstanceFacade.workflowRunFacades.put(localName, this);
        } else {
            this.instanceOwningProcessId = parentProcess + ":" + localName;
        }

        WorkflowProvenanceItem workflowItem = null;
        if (context.getProvenanceReporter() != null) {
            provEnabled = true;
            workflowItem = new WorkflowProvenanceItem();
            workflowItem.setDataflow(dataflow);
            workflowItem.setProcessId(instanceOwningProcessId);
            workflowItem.setIdentifier(UUID.randomUUID().toString());
            workflowItem.setParentId(dataflow.getInternalIdentier());
            addProvenanceLayerToProcessors(dataflow, workflowItem);
            // The workflow item's identifier doubles as the reporter's
            // session ID for this run.
            context.getProvenanceReporter().setSessionID(
                    workflowItem.getIdentifier());
            context.getProvenanceReporter().addProvenanceItem(workflowItem);
        }
        facadeResultListener = new FacadeResultListener(dataflow, workflowItem);
    }

    /**
     * Ensures every processor of the given dataflow has an
     * {@link IntermediateProvenance} layer in its dispatch stack.
     * <p>
     * Processors whose stack already contains such a layer are skipped.
     * Otherwise a new layer is configured with the workflow item and the
     * context's provenance reporter, and an add-layer edit is applied using
     * the index of the first {@link ErrorBounce} layer found. If the edit
     * fails, a warning is logged and the search continues with the next
     * {@link ErrorBounce} layer, if any. Stacks without an
     * {@link ErrorBounce} layer are left untouched.
     *
     * @param targetDataflow
     *            dataflow whose processors are to be instrumented
     * @param workflowItem
     *            provenance item describing the workflow run
     */
    private void addProvenanceLayerToProcessors(Dataflow targetDataflow,
            WorkflowProvenanceItem workflowItem) {
        for (Processor processor : targetDataflow.getProcessors()) {
            DispatchStack dispatchStack = processor.getDispatchStack();
            List<DispatchLayer<?>> layers = dispatchStack.getLayers();
            if (containsProvenanceLayer(layers)) {
                continue;
            }
            for (int j = 0; j < layers.size(); j++) {
                if (!(layers.get(j) instanceof ErrorBounce)) {
                    continue;
                }
                IntermediateProvenance provenanceLayer = new IntermediateProvenance();
                provenanceLayer.setWorkflow(workflowItem);
                provenanceLayer.setReporter(context.getProvenanceReporter());
                Edits edits = EditsRegistry.getEdits();
                try {
                    edits.getAddDispatchLayerEdit(dispatchStack,
                            provenanceLayer, j).doEdit();
                    break;
                } catch (EditException e) {
                    // Pass the exception itself so the stack trace is kept.
                    logger.warn("adding provenance layer to dispatch stack failed "
                            + e.toString(), e);
                }
            }
        }
    }

    /** Tells whether any of the given layers is an {@link IntermediateProvenance}. */
    private static boolean containsProvenanceLayer(List<DispatchLayer<?>> layers) {
        for (DispatchLayer<?> layer : layers) {
            if (layer instanceof IntermediateProvenance) {
                return true;
            }
        }
        return false;
    }

    /**
     * Adds a failure listener to {@link #failureListeners}.
     *
     * @param listener
     *            the listener to add
     */
    public void addFailureListener(FailureListener listener) {
        failureListeners.add(listener);
    }

    /**
     * Registers a result listener. When the first listener is added, the
     * internal facade listener is attached to every output port of the
     * dataflow.
     *
     * @param listener
     *            the listener to add
     */
    public synchronized void addResultListener(ResultListener listener) {
        if (resultListeners.isEmpty()) {
            for (DataflowOutputPort port : dataflow.getOutputPorts()) {
                port.addResultListener(facadeResultListener);
            }
        }
        synchronized (resultListenersLock) {
            resultListeners.add(listener);
        }
    }

    /**
     * Registers this facade with the monitor manager and fires the dataflow
     * under this facade's owning process identifier.
     *
     * @throws IllegalStateException
     *             if {@link #pushData} has already been called
     */
    public void fire() throws IllegalStateException {
        if (pushDataCalled) {
            throw new IllegalStateException(
                    "Data has already been pushed, fire must be called first!");
        }
        monitorManager.registerNode(this, instanceOwningProcessId.split(":"),
                new HashSet<MonitorableProperty<?>>());
        dataflow.fire(instanceOwningProcessId, context);
    }

    /**
     * Returns the dataflow supplied to the constructor.
     *
     * @return the dataflow wrapped by this facade
     */
    public Dataflow getDataflow() {
        return dataflow;
    }

    /**
     * Not yet implemented.
     *
     * @return currently always <code>null</code>
     */
    public TypedTreeModel<MonitorNode> getStateModel() {
        // TODO WorkflowInstanceFacade.getStateModel not yet implemented
        return null;
    }

    /**
     * Delivers a token to every dataflow input port whose name equals
     * <code>portName</code>, after pushing this facade's local name onto the
     * token's owning process. If no port matches, nothing is delivered. In
     * all cases the facade is marked as having received data.
     *
     * @param token
     *            the token to deliver
     * @param portName
     *            name of the target input port
     * @throws TokenOrderException
     *             declared by the interface; not currently thrown here
     */
    public void pushData(WorkflowDataToken token, String portName)
            throws TokenOrderException {
        // TODO: throw TokenOrderException when the token stream violates
        // order constraints.
        for (DataflowInputPort port : dataflow.getInputPorts()) {
            if (portName.equals(port.getName())) {
                port.receiveEvent(token.pushOwningProcess(localName));
            }
        }
        pushDataCalled = true;
    }

    /**
     * Removes a failure listener from {@link #failureListeners}.
     *
     * @param listener
     *            the listener to remove
     */
    public void removeFailureListener(FailureListener listener) {
        failureListeners.remove(listener);
    }

    /**
     * Unregisters a result listener. If no listeners remain afterwards, the
     * internal facade listener is detached from every output port of the
     * dataflow.
     *
     * @param listener
     *            the listener to remove
     */
    public synchronized void removeResultListener(ResultListener listener) {
        synchronized (resultListenersLock) {
            resultListeners.remove(listener);
        }
        if (resultListeners.isEmpty()) {
            for (DataflowOutputPort port : dataflow.getOutputPorts()) {
                port.removeResultListener(facadeResultListener);
            }
        }
    }

    /**
     * Listener attached to the dataflow's output ports. It records
     * provenance for result tokens (when enabled), tracks how many output
     * ports still have to deliver their zero-length-index token, and relays
     * each token to the facade's registered result listeners.
     */
    protected class FacadeResultListener implements ResultListener {

        /** Output ports that have not yet produced a token with an empty index. */
        private int portsToComplete;

        /** Workflow provenance item; <code>null</code> when provenance is disabled. */
        private final WorkflowProvenanceItem workflowItem;

        /**
         * @param dataflow
         *            dataflow whose number of output ports initialises the
         *            completion counter
         * @param workflowItem
         *            provenance item of the run, or <code>null</code> when
         *            provenance is disabled
         */
        public FacadeResultListener(Dataflow dataflow,
                WorkflowProvenanceItem workflowItem) {
            this.workflowItem = workflowItem;
            portsToComplete = dataflow.getOutputPorts().size();
        }

        /**
         * Handles a token produced on an output port. Tokens whose owning
         * process differs from this facade's owning process are ignored.
         *
         * @param token
         *            the produced token
         * @param portName
         *            name of the output port that produced it
         */
        public void resultTokenProduced(WorkflowDataToken token, String portName) {
            if (!instanceOwningProcessId.equals(token.getOwningProcess())) {
                return;
            }

            if (provEnabled) {
                WorkflowDataProvenanceItem dataItem = new WorkflowDataProvenanceItem();
                dataItem.setPortName(portName);
                dataItem.setData(token.getData());
                dataItem.setReferenceService(context.getReferenceService());
                // NOTE(review): parentId is set twice; the second call below
                // is the one that survives. Confirm which parent is intended
                // before removing either call.
                dataItem.setParentId(workflowItem.getIdentifier());
                dataItem.setIdentifier(UUID.randomUUID().toString());
                dataItem.setParentId(instanceOwningProcessId);
                dataItem.setProcessId(instanceOwningProcessId);
                dataItem.setIndex(token.getIndex());
                dataItem.setFinal(token.isFinal());
                context.getProvenanceReporter().addProvenanceItem(dataItem);
            }

            synchronized (this) {
                if (token.getIndex().length == 0) {
                    portsToComplete--;
                    // Only the token that brings the counter to zero may
                    // trigger completion handling, so it runs at most once.
                    if (portsToComplete == 0) {
                        // Received complete events on all ports, can
                        // un-register this node from the monitor
                        MonitorManager.getInstance().deregisterNode(
                                instanceOwningProcessId.split(":"));
                        if (provEnabled) {
                            DataflowRunComplete runComplete = new DataflowRunComplete();
                            runComplete.setParentId(workflowItem.getIdentifier());
                            runComplete.setProcessId(instanceOwningProcessId);
                            runComplete.setIdentifier(UUID.randomUUID().toString());
                            context.getProvenanceReporter().addProvenanceItem(
                                    runComplete);
                        }
                    }
                }
            }

            // Snapshot the listeners under the list lock so that concurrent
            // add/remove calls cannot corrupt the copy; listeners themselves
            // are notified outside of any lock.
            List<ResultListener> copyOfListeners;
            synchronized (resultListenersLock) {
                copyOfListeners = new ArrayList<ResultListener>(resultListeners);
            }
            for (ResultListener resultListener : copyOfListeners) {
                try {
                    resultListener.resultTokenProduced(
                            token.popOwningProcess(), portName);
                } catch (RuntimeException ex) {
                    logger.warn("Could not notify result listener "
                            + resultListener, ex);
                }
            }
        }
    }
}