blob: 718079a138f5462fa2fe571e573b7724323a73aa [file] [log] [blame]
/*******************************************************************************
* Copyright (C) 2007 The University of Manchester
*
* Modifications to the initial code base are copyright of their
* respective authors, or their employers as appropriate.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public License
* as published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
******************************************************************************/
package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
import static java.lang.System.currentTimeMillis;
import java.beans.XMLDecoder;
import java.beans.XMLEncoder;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import net.sf.taverna.t2.invocation.Event;
import net.sf.taverna.t2.provenance.item.ActivityProvenanceItem;
import net.sf.taverna.t2.provenance.item.ErrorProvenanceItem;
import net.sf.taverna.t2.provenance.item.InputDataProvenanceItem;
import net.sf.taverna.t2.provenance.item.IterationProvenanceItem;
import net.sf.taverna.t2.provenance.item.OutputDataProvenanceItem;
import net.sf.taverna.t2.provenance.item.ProcessProvenanceItem;
import net.sf.taverna.t2.provenance.item.ProcessorProvenanceItem;
import net.sf.taverna.t2.provenance.item.ProvenanceItem;
import net.sf.taverna.t2.provenance.item.WorkflowProvenanceItem;
import net.sf.taverna.t2.provenance.reporter.ProvenanceReporter;
import net.sf.taverna.t2.reference.ReferenceService;
import net.sf.taverna.t2.workflowmodel.Dataflow;
import net.sf.taverna.t2.workflowmodel.processor.activity.Activity;
import net.sf.taverna.t2.workflowmodel.processor.activity.AsynchronousActivity;
import net.sf.taverna.t2.workflowmodel.processor.dispatch.AbstractDispatchLayer;
import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchCompletionEvent;
import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchErrorEvent;
import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobEvent;
import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobQueueEvent;
import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchResultEvent;
import org.apache.log4j.Logger;
/**
* Sits above the {@link Invoke} layer and collects information about the
* current workflow run to be stored by the {@link ProvenanceReporter}.
*
* @author Ian Dunlop
* @author Stian Soiland-Reyes
*
*/
public class IntermediateProvenance extends AbstractDispatchLayer<String> {
public static final String URI = "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/IntermediateProvenance";
private static final Logger logger = Logger.getLogger(IntermediateProvenance.class);
private ProvenanceReporter reporter;
private Map<String, Map<String, IterationProvenanceItem>> processToIndexes = new HashMap<>();
private Map<ActivityProvenanceItem, List<Object>> activityProvenanceItemMap = new HashMap<>();
private Map<InputDataProvenanceItem, List<Object>> inputDataProvenanceItemMap = new HashMap<>();
// private List<ActivityProvenanceItem> activityProvenanceItemList = new ArrayList<>();
// private List<InputDataProvenanceItem> inputDataProvenanceItemList = new ArrayList<>();
private WorkflowProvenanceItem workflowItem;
/**
* Accepts a configuration value and ignores it; the method body is empty.
*
* @param o
*            the configuration value, which is not read
*/
@Override
public void configure(String o) {
}
/**
 * Discards every {@link IterationProvenanceItem} recorded for an owning
 * process once the set of provenance events for that process has been
 * finished with.
 * <p>
 * The removal is done while holding the monitor of {@code processToIndexes},
 * the same monitor {@link #getIndexesByProcess(String)} holds while it reads
 * and populates that map, so the two never touch the underlying
 * {@link HashMap} at the same time.
 *
 * @param owningProcess
 *            identifier of the owning process whose iteration items are no
 *            longer needed
 */
@Override
public void finishedWith(String owningProcess) {
    synchronized (processToIndexes) {
        processToIndexes.remove(owningProcess);
    }
}
/**
* Returns the configuration of this layer, which is always {@code null}.
*
* @return {@code null}
*/
@Override
public String getConfiguration() {
return null;
}
/**
 * Returns the map from index string to {@link IterationProvenanceItem} kept
 * for the given owning process, creating and registering an empty one the
 * first time the process is seen. Lookup and registration happen under the
 * monitor of {@code processToIndexes}.
 *
 * @param owningProcess
 *            identifier of the owning process
 * @return the map registered for that process; never {@code null}
 */
protected Map<String, IterationProvenanceItem> getIndexesByProcess(
        String owningProcess) {
    synchronized (processToIndexes) {
        Map<String, IterationProvenanceItem> known = processToIndexes
                .get(owningProcess);
        if (known != null) {
            return known;
        }
        Map<String, IterationProvenanceItem> fresh = new HashMap<>();
        processToIndexes.put(owningProcess, fresh);
        return fresh;
    }
}
/**
 * Finds the {@link IterationProvenanceItem} recorded for the owning process
 * and index of the given event.
 * <p>
 * If an item is registered under the event's exact index it is returned as
 * is. Otherwise trailing index positions are stripped one at a time until an
 * item registered under a shorter index is found. A new item is then created
 * for the event's own index, with the found item as its parent iteration and
 * with the found item's parent id and input data item copied over. The new
 * item is registered under the event's index and returned; it is not passed
 * to the reporter by this method.
 *
 * @param event
 *            the event whose owning process and index identify the iteration
 * @return the item registered for the event's index, or a newly created and
 *         registered item derived from the closest registered shorter index
 * @throws IllegalStateException
 *             if no item is registered under the event's index or under any
 *             shorter prefix of it, the empty index included
 */
protected IterationProvenanceItem getIterationProvItem(Event<?> event) {
    String owningProcess = event.getOwningProcess();
    int[] eventIndex = event.getIndex();
    String eventIndexStr = indexStr(eventIndex);
    Map<String, IterationProvenanceItem> indexes = getIndexesByProcess(owningProcess);
    synchronized (indexes) {
        // find the iteration item for the int index eg [1]
        IterationProvenanceItem exactMatch = indexes.get(eventIndexStr);
        if (exactMatch != null)
            return exactMatch;

        // not registered: strip the index one position at a time and look
        // again until a 'parent' iteration turns up
        IterationProvenanceItem parentItem = null;
        int[] index = eventIndex;
        while (parentItem == null) {
            try {
                index = removeLastIndex(index);
            } catch (IllegalStateException e) {
                // even the empty index [] had no item, nothing left to strip
                logger.warn("Cannot find a parent iteration with index [] for owning process: "
                        + owningProcess
                        + ". Workflow invocation is in an illegal state");
                throw e;
            }
            parentItem = indexes.get(indexStr(index));
        }

        /*
         * We have a 'parent' iteration, so create a new iteration for the
         * index from the event and give it the parent's activity (parent id)
         * and input data.
         *
         * FIXME should this be linked to the parent iteration instead?
         */
        IterationProvenanceItem childItem = new IterationProvenanceItem();
        childItem.setIteration(eventIndex);
        childItem.setProcessId(owningProcess);
        childItem.setIdentifier(uuid());
        childItem.setWorkflowId(workflowItem.getParentId());
        childItem.setParentIterationItem(parentItem);
        childItem.setParentId(parentItem.getParentId());
        childItem.setInputDataItem(parentItem.getInputDataItem());

        // register under the event's index in the map whose lock is held,
        // so later events with the same index get this item back
        indexes.put(eventIndexStr, childItem);
        return childItem;
    }
}
/**
 * Renders an index array as a string in which every position is preceded by
 * a colon, for example {@code {1, 2, 3}} becomes {@code ":1:2:3"} and the
 * empty array becomes the empty string.
 *
 * @param index
 *            the index to render
 * @return the colon-prefixed rendering of the index
 */
private String indexStr(int[] index) {
    StringBuilder rendered = new StringBuilder();
    for (int position = 0; position < index.length; position++) {
        rendered.append(':').append(index[position]);
    }
    return rendered.toString();
}
/**
 * Splits the string form of an index, as produced by
 * {@code indexStr(int[])}, on the colon separator.
 * <p>
 * Despite its name this method does not drop the last position. Because the
 * string form starts with a colon (":1:2:3"), the first element of the
 * result for a non-empty index is an empty string.
 *
 * @param index
 *            the index to render and split
 * @return the pieces of the rendered index between colons
 */
@SuppressWarnings("unused")
private String[] stripLastIndex(int[] index) {
    // will be in form :1:2:3
    String rendered = indexStr(index);
    return rendered.split(":");
}
/**
 * Returns a copy of the given index with its last value removed. The array
 * passed in is left unmodified.
 *
 * @param index
 *            the index to shorten
 * @return a new array holding all but the last value of {@code index}
 * @throws IllegalStateException
 *             if {@code index} is empty, i.e. there is nothing left to
 *             remove
 */
private int[] removeLastIndex(int[] index) {
    if (index.length == 0)
        throw new IllegalStateException(
                "There is no parent iteration of index [] for this result");
    return Arrays.copyOf(index, index.length - 1);
}
/**
 * Generates a fresh identifier for a provenance item.
 *
 * @return the string form of a newly generated random {@link UUID}
 */
private static String uuid() {
    UUID generated = UUID.randomUUID();
    return generated.toString();
}
/**
 * Creates an {@link ErrorProvenanceItem} describing the error event, with
 * the iteration item of the failed iteration as its parent, hands it to the
 * {@link ProvenanceReporter} and then forwards the event to the superclass.
 * <p>
 * As in {@link #receiveJob(DispatchJobEvent)} and
 * {@link #receiveResult(DispatchResultEvent)}, a failure while recording
 * provenance is logged and otherwise ignored, so that the error event
 * itself is always passed on.
 *
 * @param errorEvent
 *            the error event to record and forward
 */
@Override
public void receiveError(DispatchErrorEvent errorEvent) {
    try {
        IterationProvenanceItem iterationProvItem = getIterationProvItem(errorEvent);
        ErrorProvenanceItem errorItem = new ErrorProvenanceItem();
        errorItem.setCause(errorEvent.getCause());
        errorItem.setErrorType(errorEvent.getFailureType().toString());
        errorItem.setMessage(errorEvent.getMessage());
        errorItem.setProcessId(errorEvent.getOwningProcess());
        errorItem.setIdentifier(uuid());
        errorItem.setParentId(iterationProvItem.getIdentifier());
        // FIXME don't need to add to the processor item earlier
        getReporter().addProvenanceItem(errorItem);
    } catch (RuntimeException ex) {
        logger.error("Could not store provenance for "
                + errorEvent.getOwningProcess() + " "
                + Arrays.toString(errorEvent.getIndex()), ex);
        // But don't break super.receiveError() !!
    }
    super.receiveError(errorEvent);
}
/**
 * Creates the {@link ProvenanceItem}s for a job and sends them to the
 * {@link ProvenanceReporter}, then forwards the job to the superclass.
 * <p>
 * In order, a process item, a processor item, an activity item (for the
 * first {@link AsynchronousActivity} of the job, if any) and an iteration
 * item carrying the job's input data are built; each gets a fresh identifier
 * and the identifier of its parent. The iteration item is registered under
 * the job's owning process and index so that
 * {@link #getIterationProvItem(Event)} can find it again, stamped with the
 * enactment start time and reported.
 * <p>
 * Any {@link RuntimeException} raised while doing so is logged and otherwise
 * ignored, so the job is always passed on.
 *
 * @param jobEvent
 *            the job to record and forward
 */
@Override
public void receiveJob(DispatchJobEvent jobEvent) {
    try {
        String owningProcess = jobEvent.getOwningProcess();
        String parentDataflowId = workflowItem.getParentId();

        // FIXME do we need this ProcessProvenanceItem?
        // the first two ':'-separated parts of the owning process are
        // recorded as facade id and dataflow id
        String[] split = owningProcess.split(":");
        ProcessProvenanceItem provenanceItem = new ProcessProvenanceItem();
        provenanceItem.setWorkflowId(parentDataflowId);
        provenanceItem.setFacadeID(split[0]);
        provenanceItem.setDataflowID(split[1]);
        provenanceItem.setProcessId(owningProcess);
        provenanceItem.setIdentifier(uuid());
        provenanceItem.setParentId(workflowItem.getIdentifier());

        ProcessorProvenanceItem processorProvItem = new ProcessorProvenanceItem();
        processorProvItem.setWorkflowId(parentDataflowId);
        processorProvItem.setProcessId(owningProcess);
        processorProvItem.setIdentifier(uuid());
        processorProvItem.setParentId(provenanceItem.getIdentifier());

        getReporter().addProvenanceItem(provenanceItem);
        getReporter().addProvenanceItem(processorProvItem);

        IterationProvenanceItem iterationProvItem = new IterationProvenanceItem();
        iterationProvItem.setWorkflowId(parentDataflowId);
        iterationProvItem.setIteration(jobEvent.getIndex());
        iterationProvItem.setProcessId(owningProcess);
        iterationProvItem.setIdentifier(uuid());

        ReferenceService referenceService = jobEvent.getContext()
                .getReferenceService();
        InputDataProvenanceItem inputDataItem = new InputDataProvenanceItem();
        inputDataItem.setDataMap(jobEvent.getData());
        inputDataItem.setReferenceService(referenceService);
        inputDataItem.setIdentifier(uuid());
        inputDataItem.setParentId(iterationProvItem.getIdentifier());
        inputDataItem.setProcessId(owningProcess);
        iterationProvItem.setInputDataItem(inputDataItem);

        /*
         * The input data and activity items are deliberately not stored in
         * the index/owner bookkeeping maps of this class: nothing ever reads
         * or clears those maps, so each entry would only keep the item (and
         * the data map it holds) reachable for as long as this layer lives.
         */

        // only the first asynchronous activity is recorded; the iteration
        // item is parented to it
        for (Activity<?> activity : jobEvent.getActivities()) {
            if (activity instanceof AsynchronousActivity) {
                ActivityProvenanceItem activityProvItem = new ActivityProvenanceItem();
                activityProvItem.setWorkflowId(parentDataflowId);
                activityProvItem.setIdentifier(uuid());
                iterationProvItem.setParentId(activityProvItem.getIdentifier());
                activityProvItem.setParentId(processorProvItem.getIdentifier());
                activityProvItem.setProcessId(owningProcess);
                getReporter().addProvenanceItem(activityProvItem);
                break;
            }
        }

        // register under the same lock getIterationProvItem holds while it
        // reads and extends this map
        Map<String, IterationProvenanceItem> indexes = getIndexesByProcess(owningProcess);
        synchronized (indexes) {
            indexes.put(indexStr(jobEvent.getIndex()), iterationProvItem);
        }
        iterationProvItem.setEnactmentStarted(new Timestamp(currentTimeMillis()));
        getReporter().addProvenanceItem(iterationProvItem);
    } catch (RuntimeException ex) {
        logger.error("Could not store provenance for " + jobEvent, ex);
    }
    super.receiveJob(jobEvent);
}
/**
* Passes the job queue event straight to the superclass implementation; no
* provenance is recorded for it in this method.
*
* @param jobQueueEvent
*            the event to forward
*/
@Override
public void receiveJobQueue(DispatchJobQueueEvent jobQueueEvent) {
super.receiveJobQueue(jobQueueEvent);
}
/**
 * Records the outcome of an iteration and forwards the result event.
 * <p>
 * The {@link IterationProvenanceItem} matching the event is stamped with the
 * enactment end time and given a new {@link OutputDataProvenanceItem}
 * holding the result data, after which the iteration item is sent to the
 * {@link ProvenanceReporter}. Any exception raised on the way is logged and
 * otherwise ignored, so the result is always handed to the superclass.
 *
 * @param resultEvent
 *            the result to record and forward
 */
@Override
public void receiveResult(DispatchResultEvent resultEvent) {
    try {
        // FIXME use the connector from the result event context
        IterationProvenanceItem iteration = getIterationProvItem(resultEvent);
        Timestamp finishedAt = new Timestamp(currentTimeMillis());
        iteration.setEnactmentEnded(finishedAt);

        ReferenceService refService = resultEvent.getContext()
                .getReferenceService();
        OutputDataProvenanceItem outputs = new OutputDataProvenanceItem();
        outputs.setDataMap(resultEvent.getData());
        outputs.setReferenceService(refService);
        outputs.setIdentifier(uuid());
        outputs.setProcessId(resultEvent.getOwningProcess());
        outputs.setParentId(iteration.getIdentifier());

        iteration.setOutputDataItem(outputs);
        getReporter().addProvenanceItem(iteration);
    } catch (Exception ex) {
        String failedFor = resultEvent.getOwningProcess() + " "
                + Arrays.toString(resultEvent.getIndex());
        logger.error("Could not store provenance for " + failedFor, ex);
        // But don't break super.receiveResult() !!
    }
    super.receiveResult(resultEvent);
}
/**
* Passes the completion event straight to the superclass implementation; no
* provenance is recorded for it in this method.
*
* @param completionEvent
*            the event to forward
*/
@Override
public void receiveResultCompletion(DispatchCompletionEvent completionEvent) {
super.receiveResultCompletion(completionEvent);
}
/**
* Tell this layer what {@link ProvenanceReporter} implementation is being
* used to capture the {@link ProvenanceItem}s. NOTE: should probably use
* the connector from the result events context where possible
*
* @param connector
*            the reporter that {@link #getReporter()} returns from now on
*/
public void setReporter(ProvenanceReporter connector) {
this.reporter = connector;
}
/**
* Returns the reporter most recently given to
* {@link #setReporter(ProvenanceReporter)}.
*
* @return the current reporter, or {@code null} if none has been set
*/
public ProvenanceReporter getReporter() {
return reporter;
}
/**
* So that the {@link ProvenanceItem}s know which {@link Dataflow} has been
* enacted this layer has to know about the {@link WorkflowProvenanceItem}.
* Its identifier and parent id are read when provenance items are created
* for jobs and iterations, so it has to be set before events arrive.
*
* @param workflowItem
*            the workflow item of the run this layer records provenance for
*/
public void setWorkflow(WorkflowProvenanceItem workflowItem) {
this.workflowItem = workflowItem;
}
/**
 * Serializes a bean to its {@link XMLEncoder} XML form.
 * <p>
 * The encoder writes UTF-8 (and says so in the XML declaration), so the
 * bytes are decoded as UTF-8 rather than with the platform default charset,
 * which would corrupt non-ASCII values on platforms whose default differs.
 * <p>
 * TODO is this unused? Nothing in this class calls it.
 *
 * @param ParamValue
 *            the object to serialize
 * @return the XML document produced by the encoder
 */
public static String SerializeParam(Object ParamValue) {
    ByteArrayOutputStream BStream = new ByteArrayOutputStream();
    // closing the encoder writes the document's closing tag, so it must
    // happen before the buffer is read
    try (XMLEncoder encoder = new XMLEncoder(BStream)) {
        encoder.writeObject(ParamValue);
    }
    return new String(BStream.toByteArray(), StandardCharsets.UTF_8);
}
/**
 * Reads back the first object from an XML document in {@link XMLDecoder}
 * format, such as one produced by {@code SerializeParam(Object)}.
 * <p>
 * The string is turned into bytes as UTF-8, matching the encoding such
 * documents declare, rather than with the platform default charset.
 * <p>
 * SECURITY: {@link XMLDecoder} can instantiate classes and invoke methods
 * named in the document. Only pass documents from a trusted source.
 * <p>
 * TODO is this unused? Nothing in this class calls it.
 *
 * @param SerializedParam
 *            the XML document to decode
 * @return the first object described by the document
 */
public static Object DeserializeParam(String SerializedParam) {
    InputStream IStream = new ByteArrayInputStream(
            SerializedParam.getBytes(StandardCharsets.UTF_8));
    try (XMLDecoder decoder = new XMLDecoder(IStream)) {
        return decoder.readObject();
    }
}
}