blob: 27fdbe4eac69078753ef5cb70c147a1973b8bc7c [file] [log] [blame]
/*******************************************************************************
* Copyright (C) 2007 The University of Manchester
*
* Modifications to the initial code base are copyright of their
* respective authors, or their employers as appropriate.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public License
* as published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
******************************************************************************/
package net.sf.taverna.t2.provenance.lineageservice;
import java.beans.ExceptionListener;
import java.beans.XMLEncoder;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.sql.Blob;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import javax.sql.rowset.serial.SerialBlob;
import net.sf.taverna.t2.provenance.item.InputDataProvenanceItem;
import net.sf.taverna.t2.provenance.item.IterationProvenanceItem;
import net.sf.taverna.t2.provenance.item.OutputDataProvenanceItem;
import net.sf.taverna.t2.provenance.item.ProvenanceItem;
import net.sf.taverna.t2.provenance.item.WorkflowProvenanceItem;
import net.sf.taverna.t2.provenance.lineageservice.utils.Arc;
import net.sf.taverna.t2.provenance.lineageservice.utils.NestedListNode;
import net.sf.taverna.t2.provenance.lineageservice.utils.ProcBinding;
import net.sf.taverna.t2.provenance.lineageservice.utils.ProvenanceUtils;
import net.sf.taverna.t2.provenance.lineageservice.utils.Var;
import net.sf.taverna.t2.provenance.lineageservice.utils.VarBinding;
import net.sf.taverna.t2.provenance.vocabulary.SharedVocabulary;
import net.sf.taverna.t2.workflowmodel.Dataflow;
import net.sf.taverna.t2.workflowmodel.DataflowInputPort;
import net.sf.taverna.t2.workflowmodel.DataflowOutputPort;
import net.sf.taverna.t2.workflowmodel.Datalink;
import net.sf.taverna.t2.workflowmodel.Processor;
import net.sf.taverna.t2.workflowmodel.ProcessorInputPort;
import net.sf.taverna.t2.workflowmodel.ProcessorOutputPort;
import net.sf.taverna.t2.workflowmodel.processor.activity.Activity;
import net.sf.taverna.t2.workflowmodel.serialization.xml.XMLSerializerRegistry;
import org.apache.commons.io.FileUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.jdom.Document;
import org.jdom.Element;
import org.jdom.JDOMException;
import org.jdom.Namespace;
import org.jdom.input.SAXBuilder;
import org.jdom.output.XMLOutputter;
/**
* @author Paolo Missier
*/
public class EventProcessor {
/**
* A map of UUIDs of the originating processor to the ProcBinding object
* that contains its parameters
*/
private Map<String, ProcBinding> procBindingMap = new ConcurrentHashMap<String, ProcBinding>();;
/** A map of child ids to their parents in the hierarchy of events:
* workflow -> process -> processor -> activity -> iteration
*/
private Map<String, String> parentChildMap= new ConcurrentHashMap<String, String>();
private static Logger logger = Logger.getLogger(EventProcessor.class);
private static final String OUTPUT_CONTAINER_PROCESSOR = "_OUTPUT_";
private static final String INPUT_CONTAINER_PROCESSOR = "_INPUT_";
private static final String TEST_EVENTS_FOLDER = "/tmp/TEST-EVENTS";
private static final String DATAFLOW_PROCESSOR_TYPE = "net.sf.taverna.t2.activities.dataflow.DataflowActivity";
private int eventCnt = 0; // for events logging
private volatile boolean workflowStructureDone = false; // used to inhibit processing of multiple workflow events -- we only need the first
private int dataflowDepth = 0; // incremented when we recurse on a subflow, decremented on exit
private volatile String wfInstanceID = null; // unique run ID. set when we see the first event of type "process"
String topLevelDataflowName = null;
String topLevelDataflowID = null;
Map<String, String> wfNestingMap = new ConcurrentHashMap<String, String>();
// all input bindings are accumulated here so they can be "backpatched" (see backpatching() )
List<VarBinding> allInputVarBindings = Collections.synchronizedList(new ArrayList<VarBinding>());
// dedicated class for processing WorkflowData events which carry workflow output info
private WorkflowDataProcessor wfdp;
private ProvenanceWriter pw = null;
private ProvenanceQuery pq = null;
public EventProcessor() { }
/**
* @param pw
* @throws SQLException
* @throws ClassNotFoundException
* @throws IllegalAccessException
* @throws InstantiationException
*
*/
public EventProcessor(ProvenanceWriter pw, ProvenanceQuery pq, WorkflowDataProcessor wfdp)
throws InstantiationException, IllegalAccessException,
ClassNotFoundException, SQLException {
this.pw = pw;
this.pq = pq;
this.wfdp = wfdp;
logger.setLevel((Level)Level.INFO);
}
/**
* this is the new version that makes use of the T2 deserializer
* populate static portion of the DB<br/>
* the static structure may already be in the DB -- this is detected as a duplicate top-level workflow ID.
* In this case, we skip this processing altogether
* @param content
* is a serialized dataflow (XML) -- this is parsed using the T2
* Deserializer
* @return the wfInstanceRef for this workflow structure
*/
public String processWorkflowStructure(ProvenanceItem provenanceItem) {
// this flag is set to prevent processing of separate workflowProvenanceItems that describe nested workflows.
// the processing of all nested workflows is done as part of the very first workflowProvenanceItem that we receive,
// which is self-consistent. so we ignore all others
if (workflowStructureDone) { return null; }
setWfInstanceID(((WorkflowProvenanceItem)provenanceItem).getIdentifier());
// logger.debug("Workflow instance is: " + getWfInstanceID());
Dataflow df = null;
df = ((WorkflowProvenanceItem)provenanceItem).getDataflow();
workflowStructureDone = true;
topLevelDataflowName = df.getLocalName();
topLevelDataflowID = df.getInternalIdentier();
// check whether we already have this WF in the DB
List<String> wfNames = null;
try {
wfNames = pq.getAllWFnames();
} catch (SQLException e) {
logger.warn("Problem processing workflow structure", e);
}
if (wfNames != null && wfNames.contains(topLevelDataflowID)) { // already in the DB
// logger.info("workflow structure with ID "+topLevelDataflowID+" is in the DB -- clearing static portion");
// clearing the portion of the static DB that pertains to this specific WF.
// it is going to be rewritten right away in the rest of this method
// this is simpler to implement than selectively avoiding duplicate writes to the DB
try {
pw.clearDBStatic(topLevelDataflowID);
} catch (SQLException e) {
logger.warn("Can't clear static database for " + topLevelDataflowID, e);
}
} else {
// logger.info("new workflow structure with ID "+topLevelDataflowID);
}
// record the top level dataflow as a processor in the DB
try {
pw.addProcessor(topLevelDataflowName, DATAFLOW_PROCESSOR_TYPE, topLevelDataflowID, true); // true -> is top level
} catch (SQLException e) {
logger.warn("Can't add processor " + topLevelDataflowID, e);
}
// logger.info("top level wf name: "+topLevelDataflowName);
return processDataflowStructure(df, topLevelDataflowID, df.getLocalName()); // null: no external name given to top level dataflow
}
/**
* note: this method can be called as part of a recursion on sub-workflows
* @param df
* @param dataflowID the UUID for the entire dataflow (may be a sub-dataflow)
* @param localName the external name of the dataflow. Null if this is top level, not null if a sub-dataflow
* @return the wfInstanceRef for this workflow structure
*/
@SuppressWarnings("unchecked")
public String processDataflowStructure(Dataflow df, String dataflowID, String externalName) {
String localWfInstanceID = getWfInstanceID();
dataflowDepth++;
try {
List<Var> vars = new ArrayList<Var>();
// check whether we already have this WF in the DB
List<String> wfNames = null;
try {
wfNames = pq.getAllWFnames();
} catch (SQLException e) {
logger.warn("Problem processing dataflow structure for " + dataflowID, e);
}
if (wfNames != null && wfNames.contains(dataflowID)) { // already in the DB
// logger.info("workflow structure with ID "+dataflowID+" is in the DB -- clearing static portion");
// clearing the portion of the static DB that pertains to this specific WF.
// it is going to be rewritten right away in the rest of this method
// this is simpler to implement than selectively avoiding duplicate writes to the DB
pw.clearDBStatic(dataflowID);
} else {
// logger.warn("new workflow structure with ID "+dataflowID);
}
// //////
// add workflow ID -- this is NOT THE SAME AS the wfInstanceID
// /////
// this could be a nested workflow -- in this case, override its wfInstanceID with that of its parent
String parentDataflow;
if ((parentDataflow = wfNestingMap.get(dataflowID)) == null) {
Element serializeDataflow = XMLSerializerRegistry.getInstance().getSerializer().serializeDataflow(df);
String dataflowString = null;
try {
XMLOutputter outputter = new XMLOutputter();
StringWriter stringWriter = new StringWriter();
outputter.output(serializeDataflow, stringWriter);
dataflowString = stringWriter.toString();
} catch (java.io.IOException e) {
logger.error("Could not serialise dataflow", e);
}
Blob blob = new SerialBlob(dataflowString.getBytes("UTF-8"));
// this is a top level dataflow description
pw.addWFId(dataflowID, null, externalName, blob); // set its dataflowID with no parent
} else {
Element serializeDataflow = XMLSerializerRegistry.getInstance().getSerializer().serializeDataflow(df);
String dataflowString = null;
try {
XMLOutputter outputter = new XMLOutputter();
StringWriter stringWriter = new StringWriter();
outputter.output(serializeDataflow, stringWriter);
dataflowString = stringWriter.toString();
} catch (java.io.IOException e) {
logger.error("Could not serialise dataflow", e);
}
Blob blob = new SerialBlob(dataflowString.getBytes("UTF-8"));
// we are processing a nested workflow structure
logger.debug("dataflow "+dataflowID+" with external name "+externalName+" is nested within "+parentDataflow);
pw.addWFId(dataflowID, parentDataflow, externalName, blob); // set its dataflowID along with its parent
// override wfInstanceID to point to top level -- UNCOMMENTED PM 9/09 CHECK
localWfInstanceID = pq.getRuns(parentDataflow, null).get(0).getInstanceID();
// logger.debug("overriding nested WFRef "+getWfInstanceID()+" with parent WFRef "+localWfInstanceID);
}
pw.addWFInstanceId(dataflowID, localWfInstanceID); // wfInstanceID stripped by stripWfInstanceHeader() above
// //////
// add processors along with their variables
// /////
List<? extends Processor> processors = df.getProcessors();
for (Processor p : processors) {
// logger.info("adding processor "+p.getLocalName());
String pName = p.getLocalName();
//CHECK get type of first activity and set this as the type of the processor itself
List<? extends Activity<?>> activities = p.getActivityList();
String pType = null;
if (activities != null && !activities.isEmpty()) {
pType = activities.get(0).getClass().getCanonicalName();
}
pw.addProcessor(pName, pType, dataflowID, false); // false: not a top level processor
// ///
// add all input ports for this processor as input variables
// ///
List<? extends ProcessorInputPort> inputs = p.getInputPorts();
for (ProcessorInputPort ip : inputs) {
Var inputVar = new Var();
inputVar.setPName(pName);
inputVar.setWfInstanceRef(dataflowID);
inputVar.setVName(ip.getName());
inputVar.setTypeNestingLevel(ip.getDepth());
inputVar.setInput(true);
// logger.info("processDataflowStructure: adding input var "+pName+":"+ip.getName());
vars.add(inputVar);
}
// ///
// add all output ports for this processor as output variables
// ///
List<? extends ProcessorOutputPort> outputs = p
.getOutputPorts();
for (ProcessorOutputPort op : outputs) {
Var outputVar = new Var();
outputVar.setPName(pName);
outputVar.setWfInstanceRef(dataflowID);
outputVar.setVName(op.getName());
outputVar.setTypeNestingLevel(op.getDepth());
outputVar.setInput(false);
vars.add(outputVar);
}
// check for nested structures: if the activity is DataflowActivity
// then this processor is a nested workflow
// make an entry into wfNesting map with its ID and recurse on the nested workflow
for (Activity a:activities) {
if (a.getClass().getCanonicalName().contains("DataflowActivity" )) {
Dataflow nested = (Dataflow) a.getConfiguration();
// logger.info("RECURSION ON nested workflow: "+p.getLocalName()+" with id: "+nested.getInternalIdentier());
wfNestingMap.put(nested.getInternalIdentier(), dataflowID); // child -> parent
//////////////
/// RECURSIVE CALL
//////////////
processDataflowStructure(nested, nested.getInternalIdentier(), p.getLocalName());
//List<? extends Processor> procs = nested.getProcessors();
// for (Processor nestedP:procs) {
// System.out.println("recursion on nested processor: "+nestedP.getLocalName());
// }
}
}
} // end for each processor
// ////
// add inputs to entire dataflow
// ////
String pName = INPUT_CONTAINER_PROCESSOR; // overridden -- see below
// check whether we are processing a nested workflow. in this case
// the input vars are not assigned to the INPUT processor but to the containing dataflow
if (externalName != null) { // override the default if we are nested or someone external name is provided
pName = externalName;
}
List<? extends DataflowInputPort> inputPorts = df.getInputPorts();
for (DataflowInputPort ip : inputPorts) {
Var inputVar = new Var();
inputVar.setPName(pName);
inputVar.setWfInstanceRef(dataflowID);
inputVar.setVName(ip.getName());
inputVar.setTypeNestingLevel(ip.getDepth());
inputVar.setInput(true); // CHECK PM modified 11/08 -- input vars are actually outputs of input processors...
vars.add(inputVar);
}
// ////
// add outputs of entire dataflow
// ////
pName = OUTPUT_CONTAINER_PROCESSOR; // overridden -- see below
// check whether we are processing a nested workflow. in this case
// the output vars are not assigned to the OUTPUT processor but to the containing dataflow
if (externalName != null) { // we are nested
pName = externalName;
}
List<? extends DataflowOutputPort> outputPorts = df
.getOutputPorts();
for (DataflowOutputPort op : outputPorts) {
Var outputVar = new Var();
outputVar.setPName(pName);
outputVar.setWfInstanceRef(dataflowID);
outputVar.setVName(op.getName());
outputVar.setTypeNestingLevel(op.getDepth());
outputVar.setInput(false); // CHECK PM modified 11/08 -- output vars are actually outputs of output processors...
vars.add(outputVar);
}
pw.addVariables(vars, dataflowID);
// ////
// add arc records using the dataflow links
// retrieving the processor names requires navigating from links to
// source/sink and from there to the processors
// ////
List<? extends Datalink> links = df.getLinks();
for (Datalink l : links) {
// TODO cover the case of arcs from an input and to an output to
// the entire dataflow
String sourcePname = null;
String sinkPname = null;
if (l.getSource() instanceof ProcessorOutputPort) {
sourcePname = ((ProcessorOutputPort) l.getSource())
.getProcessor().getLocalName();
} else {
// System.out.println("found link from dataflow input");
}
if (l.getSink() instanceof ProcessorInputPort) {
sinkPname = ((ProcessorInputPort) l.getSink())
.getProcessor().getLocalName();
} else {
// System.out.println("found link to dataflow output");
}
if (sourcePname != null && sinkPname != null) {
// System.out.println("adding regular internal arc");
pw.addArc(l.getSource().getName(), sourcePname, l.getSink()
.getName(), sinkPname, dataflowID);
} else if (sourcePname == null) {
// link is from dataflow input or subflow input
if (externalName != null) { // link from subflow input
sourcePname = externalName;
} else {
sourcePname = INPUT_CONTAINER_PROCESSOR;
}
//Ian added this logic since there were some null sinkPnameRefs with merge ports
if (sinkPname == null) {
// link is to dataflow output
if (externalName != null) { // link from subflow input
sinkPname = externalName;
} else {
sinkPname = OUTPUT_CONTAINER_PROCESSOR;
}
}
// System.out.println("adding arc from dataflow input");
pw.addArc(l.getSource().getName(),
sourcePname, l.getSink().getName(),
sinkPname, dataflowID);
} else if (sinkPname == null) {
// link is to dataflow output
if (externalName != null) { // link from subflow input
sinkPname = externalName;
} else {
sinkPname = OUTPUT_CONTAINER_PROCESSOR;
}
//Ian added this bit at the same time as the null sinkPnameRef logic above - hope it is correct
if (sourcePname == null) {
// link is from dataflow input or subflow input
if (externalName != null) { // link from subflow input
sourcePname = externalName;
} else {
sourcePname = INPUT_CONTAINER_PROCESSOR;
}
}
// System.out.println("adding arc to dataflow output");
pw.addArc(l.getSource().getName(), sourcePname, l.getSink()
.getName(), sinkPname,
dataflowID);
}
}
// logger.info("completed processing dataflow " + dataflowID);
} catch (Exception e) {
logger.error("Problem processing provenance for dataflow", e);
}
// logger.debug("wfInstanceID at the end of processDataflowStructure: "+getWfInstanceID());
return dataflowID;
}
private Element stripWfInstanceHeader(String content) {
SAXBuilder b = new SAXBuilder();
Document d;
try {
d = b.build (new StringReader(content));
// get identifier from <workflowItem> element
Element root = d.getRootElement();
setWfInstanceID(root.getAttributeValue("identifier"));
Namespace ns = Namespace.getNamespace("http://taverna.sf.net/2008/xml/t2flow");
Element workflowEl = root.getChild("workflow", ns);
return workflowEl;
} catch (JDOMException e) {
logger.warn("Problem stripping workflow instance header", e);
} catch (IOException e) {
logger.warn("Problem stripping workflow instance header", e);
}
return null;
}
/**
* processes an elementary process execution event from T2. Collects info
* from events as they happen and sends them to the writer for processing
* when the iteration event is received. Uses the map of procBindings to
* process event id and the map of child ids to parent ids to ensure that
* the correct proc binding is used
* @param currentWorkflowID
*
* @param d
* @param context
*/
public void processProcessEvent(ProvenanceItem provenanceItem, String currentWorkflowID) {
if (provenanceItem.getEventType().equals(SharedVocabulary.PROCESS_EVENT_TYPE)) {
String parentId = provenanceItem.getParentId(); // this is the workflowID
String identifier = provenanceItem.getIdentifier(); // use this as wfInstanceID if this is the top-level process
parentChildMap.put(identifier, parentId);
ProcBinding pb = new ProcBinding();
pb.setExecIDRef(getWfInstanceID());
pb.setWfNameRef(currentWorkflowID);
procBindingMap.put(identifier, pb);
} else if (provenanceItem.getEventType().equals(SharedVocabulary.PROCESSOR_EVENT_TYPE)) {
String identifier = provenanceItem.getIdentifier();
String parentId = provenanceItem.getParentId();
String processID = provenanceItem.getProcessId(); // this is the external process ID
// this has the weird form facade0:dataflowname:pname need to extract pname from here
String[] processName = processID.split(":");
procBindingMap.get(parentId).setPNameRef(processName[processName.length-1]); // 3rd component of composite name
parentChildMap.put(identifier, parentId);
} else if (provenanceItem.getEventType().equals(SharedVocabulary.ACTIVITY_EVENT_TYPE)) {
String identifier = provenanceItem.getIdentifier();
String parentId = provenanceItem.getParentId();
procBindingMap.get(parentChildMap.get(parentId)).setActName(identifier);
parentChildMap.put(identifier, parentId);
} else if (provenanceItem.getEventType().equals(SharedVocabulary.ITERATION_EVENT_TYPE)) {
// traverse up to root to retrieve ProcBinding that was created when we saw the process event
String iterationID = provenanceItem.getIdentifier();
String activityID = provenanceItem.getParentId();
String processorID = parentChildMap.get(activityID);
String processID = parentChildMap.get(processorID);
parentChildMap.put(iterationID, activityID);
parentChildMap.put(iterationID, activityID);
ProcBinding procBinding = procBindingMap.get(processID);
String itVector = extractIterationVector(ProvenanceUtils.iterationToString(((IterationProvenanceItem)provenanceItem).getIteration()));
procBinding.setIterationVector(itVector);
InputDataProvenanceItem inputDataEl = ((IterationProvenanceItem)provenanceItem).getInputDataItem();
OutputDataProvenanceItem outputDataEl = ((IterationProvenanceItem)provenanceItem).getOutputDataItem();
processInput(inputDataEl, procBinding, currentWorkflowID);
processOutput(outputDataEl, procBinding, currentWorkflowID);
try {
getPw().addProcessorBinding(procBinding);
} catch (SQLException e) {
logger.debug("provenance has duplicate processor binding -- skipping the insertion"); //, e);
}
} else if (provenanceItem.getEventType().equals(SharedVocabulary.END_WORKFLOW_EVENT_TYPE)) {
// use this event to do housekeeping on the input/output varbindings
dataflowDepth--;
if (dataflowDepth == 0) {
// process the outputs accumulated by WorkflowDataProcessor
getWfdp().processTrees(provenanceItem.getWorkflowId(), getWfInstanceID());
patchTopLevelnputs();
// PM changed 23/4/09
reconcileTopLevelOutputs(); // patchTopLevelOutputs
workflowStructureDone = false; // CHECK reset for next run...
}
} else if (provenanceItem.getEventType().equals(SharedVocabulary.WORKFLOW_DATA_EVENT_TYPE)) {
// give this event to a WorkflowDataProcessor object for pre-processing
// try {
// TODO may generate an exception when the data is an error CHECK
getWfdp().addWorkflowDataItem(provenanceItem);
// } catch (NumberFormatException e) {
// logger.error(e);
// }
// logger.info("Received workflow data - not processing");
//FIXME not sure - needs to be stored somehow
} else if (provenanceItem.getEventType().equals((SharedVocabulary.ERROR_EVENT_TYPE))) {
//TODO process the error
} else {
// TODO broken, should we throw something here?
return;
}
}
/**
* fills in the VBs for the global inputs -- this removes the need for explicit events
* that account for these value bindings...
*/
public void patchTopLevelnputs() {
// for each input I to topLevelDataflow:
// pick first outgoing arc with sink P:X
// copy value X to I -- this can be a collection, so copy everything
// get all global input vars
// logger.info("\n\n BACKPATCHING GLOBAL INPUTS with dataflowDepth = "+dataflowDepth+"*******\n");
List<Var> inputs=null;
try {
inputs = getPq().getInputVars(topLevelDataflowName, topLevelDataflowID, getWfInstanceID());
for (Var input:inputs) {
// logger.info("global input: "+input.getVName());
Map<String,String> queryConstraints = new HashMap<String,String>();
queryConstraints.put("sourceVarNameRef", input.getVName());
queryConstraints.put("sourcePNameRef", input.getPName());
List<Arc> outgoingArcs = getPq().getArcs(queryConstraints);
// any arc will do, use the first
String targetPname = outgoingArcs.get(0).getSinkPnameRef();
String targetVname = outgoingArcs.get(0).getSinkVarNameRef();
// logger.info("copying values from ["+targetPname+":"+targetVname+"] for instance ID: ["+wfInstanceID+"]");
queryConstraints.clear();
queryConstraints.put("varNameRef", targetVname);
queryConstraints.put("V.pNameRef", targetPname);
queryConstraints.put("VB.wfInstanceRef", getWfInstanceID());
queryConstraints.put("V.wfInstanceRef", topLevelDataflowID);
List<VarBinding> VBs = getPq().getVarBindings(queryConstraints);
// logger.info("found the following VBs:");
for (VarBinding vb:VBs) {
// logger.info(vb.getValue());
// insert VarBinding back into VB with the global input varname
vb.setPNameRef(input.getPName());
vb.setVarNameRef(input.getVName());
getPw().addVarBinding(vb);
// logger.info("added");
}
}
} catch (SQLException e) {
logger.warn("Patch top level inputs problem for provenance", e);
} catch (IndexOutOfBoundsException e) {
logger.error("Could not patch top level", e);
}
}
// PM added 23/4/09
/**
* reconcile the top level outputs with the results from its immediate precedessors in the graph.<br/>
* various cases have to be considered: predecessors may include records that are not in the output,
* while the output may include nested list structures that are not in the precedessors. This method accounts
* for a 2-way reconciliation that considers all possible cases.<br/>
* at the end, outputs and their predecessors contain the same data.<p/>
* NOTE: if we assume that data values (URIs) are <em>always</em> unique then this is greatly simplified by just
* comparing two sets of value records by their URIs and reconciling them. But this is not the way it is done here
*/
public void reconcileTopLevelOutputs() {
/*
for each output O
for each variable V in predecessors(O)
fetch all VB records for O into list OValues
fetch all VB records for V into list Yalues
compare OValues and VValues:
it SHOULD be the case that OValues is a subset of YValues. Under this assumption:
for each vb in YValues:
- if there is a matching o in OValues then (vb may be missing collection information)
copy o to vb
else
if vb has no collection info && there is a matching tree node tn in OTree (use iteration index for the match) then
set vb to be in collection tb
copy vb to o
finally copy all Collection records for O in OTree -- catch duplicate errors
*/
Map<String,String> queryConstraints = new HashMap<String,String>();
List<Var> outputs=null;
try {
outputs = pq.getOutputVars(topLevelDataflowName, topLevelDataflowID, null); // null InstanceID
// for each output O
for (Var output:outputs) {
// collect all VBs for O
// String oPName = output.getPName();
// String oVName = output.getVName();
// queryConstraints.put("varNameRef", oVName);
// queryConstraints.put("V.pNameRef", oPName);
// queryConstraints.put("VB.wfInstanceRef", wfInstanceID);
// queryConstraints.put("V.wfInstanceRef", topLevelDataflowID);
// List<VarBinding> OValues = pq.getVarBindings(queryConstraints);
// find all records for the immediate precedessor Y of O
queryConstraints.clear();
queryConstraints.put("sinkVarNameRef", output.getVName());
queryConstraints.put("sinkPNameRef", output.getPName());
List<Arc> incomingArcs = pq.getArcs(queryConstraints);
// there can be only one -- but check that there is one!
if (incomingArcs.size()==0) continue;
String sourcePname = incomingArcs.get(0).getSourcePnameRef();
String sourceVname = incomingArcs.get(0).getSourceVarNameRef();
queryConstraints.clear();
queryConstraints.put("varNameRef", sourceVname);
queryConstraints.put("V.pNameRef", sourcePname);
queryConstraints.put("VB.wfInstanceRef", getWfInstanceID());
queryConstraints.put("V.wfInstanceRef", topLevelDataflowID);
List<VarBinding> YValues = pq.getVarBindings(queryConstraints);
// for each YValue look for a match in OValues
// (assume the YValues values are a superset of OValues)!)
for (VarBinding yValue:YValues) {
// System.out.println("reconcileTopLevelOutputs:: processing "+
// yValue.getPNameRef()+"/"+yValue.getVarNameRef()+"/"+yValue.getValue()+
// " with collid "+yValue.getCollIDRef());
// look for a matching record in VarBinding for output O
queryConstraints.clear();
queryConstraints.put("varNameRef", output.getVName());
queryConstraints.put("V.pNameRef", output.getPName());
queryConstraints.put("VB.wfInstanceRef", getWfInstanceID());
queryConstraints.put("V.wfInstanceRef", topLevelDataflowID);
queryConstraints.put("VB.iteration", yValue.getIteration());
if (yValue.getCollIDRef()!= null) {
queryConstraints.put("VB.collIDRef", yValue.getCollIDRef());
queryConstraints.put("VB.positionInColl", Integer.toString(yValue.getPositionInColl()));
}
List<VarBinding> matchingOValues = pq.getVarBindings(queryConstraints);
// System.out.println("querying for matching oValues: ");
// result at most size 1
if (matchingOValues.size() > 0) {
VarBinding oValue = matchingOValues.get(0);
// System.out.println("found "+oValue.getPNameRef()+"/"+oValue.getVarNameRef()+"/"+oValue.getValue()+
// " with collid "+oValue.getCollIDRef());
// copy collection info from oValue to yValue
yValue.setCollIDRef(oValue.getCollIDRef());
yValue.setPositionInColl(oValue.getPositionInColl());
pw.updateVarBinding(yValue);
// System.out.println("oValue copied to yValue");
} else {
// System.out.println("no match found");
// copy the yValue to O
// insert VarBinding back into VB with the global output varname
yValue.setPNameRef(output.getPName());
yValue.setVarNameRef(output.getVName());
pw.addVarBinding(yValue);
}
} // for each yValue in YValues
// copy all Collection records for O to Y
// get all collections refs for O
queryConstraints.clear();
queryConstraints.put("wfInstanceRef", getWfInstanceID());
queryConstraints.put("PNameRef", output.getPName());
queryConstraints.put("varNameRef", output.getVName());
List<NestedListNode> oCollections = pq.getNestedListNodes(queryConstraints);
// insert back as collection refs for Y -- catch duplicates
for (NestedListNode nln:oCollections) {
// System.out.println("collection: "+nln.getCollId());
nln.setPNameRef(sourcePname);
nln.setPNameRef(sourceVname);
getPw().replaceCollectionRecord(nln, sourcePname, sourceVname);
}
} // for each output var
} catch (SQLException e) {
logger.warn("Problem reconciling top level outputs", e);
}
}
@SuppressWarnings("unchecked")
private void processOutput(OutputDataProvenanceItem provenanceItem, ProcBinding procBinding, String currentWorkflowID) {
Element dataItemAsXML = ProvenanceUtils.getDataItemAsXML(provenanceItem);
List<Element> outputPorts = dataItemAsXML.getChildren("port");
for (Element outputport : outputPorts) {
String portName = outputport.getAttributeValue("name");
// value type may vary
List<Element> valueElements = outputport.getChildren();
if (valueElements != null && valueElements.size() > 0) {
Element valueEl = valueElements.get(0); // only really 1 child
processVarBinding(valueEl, procBinding.getPNameRef(), portName, procBinding.getIterationVector(),
getWfInstanceID(), currentWorkflowID);
}
}
}
/**
* this method reconciles values in varBindings across an arc: Firstly, if vb's value is within a collection,
* _and_ it is copied from a value generated during a previous iteration,
* then this method propagates the list reference to that iteration value, which wouldn't have it.
* Conversely, if vb is going to be input to an iteration, then it's lost its containing list node, and we
* put it back in by looking at the corresponding predecessor
* @param vb
* @throws SQLException
*/
private void backpatchIterationResults(List<VarBinding> newBindings) throws SQLException {
logger.debug("backpatchIterationResults: start");
for (VarBinding vb:newBindings) {
logger.debug("backpatchIterationResults: processing vb "+vb.getPNameRef()+"/"+vb.getVarNameRef()+"="+vb.getValue());
if (vb.getCollIDRef()!= null) { // this is a member of a collection
logger.debug("...which is inside a collection ");
}
// look for its antecedent
Map<String,String> queryConstraints = new HashMap<String,String>();
queryConstraints.put("sinkVarNameRef", vb.getVarNameRef());
queryConstraints.put("sinkPNameRef", vb.getPNameRef());
queryConstraints.put("wfInstanceRef", pq.getWfNames(vb.getWfInstanceRef()).get(0)); // CHECK picking first element in list...
List<Arc> incomingArcs = pq.getArcs(queryConstraints);
// there can be only one -- but check that there is one!
if (incomingArcs.size()==0) return;
String sourcePname = incomingArcs.get(0).getSourcePnameRef();
String sourceVname = incomingArcs.get(0).getSourceVarNameRef();
logger.debug("antecedent: "+sourcePname+":"+sourceVname);
// get the varbindings for this port and select the one with the same iteration vector as its successor
queryConstraints.clear();
queryConstraints.put("varNameRef", sourceVname);
queryConstraints.put("V.pNameRef", sourcePname);
queryConstraints.put("VB.value", vb.getValue());
queryConstraints.put("VB.wfInstanceRef", vb.getWfInstanceRef());
List<VarBinding> VBs = pq.getVarBindings(queryConstraints);
if (VBs.size() == 0) { logger.debug("nothing to reconcile"); }
// reconcile
for (VarBinding b:VBs) {
logger.debug("backpatching "+sourceVname+" "+sourcePname);
if (vb.getCollIDRef() != null && b.getCollIDRef() == null) {
logger.debug("successor "+vb.getVarNameRef()+" is in collection "+vb.getCollIDRef()+" but pred "+b.getVarNameRef()+" is not");
logger.debug("putting "+b.getVarNameRef()+" in collection "+vb.getCollIDRef()+" at pos "+vb.getPositionInColl());
b.setCollIDRef(vb.getCollIDRef());
b.setPositionInColl(vb.getPositionInColl());
getPw().updateVarBinding(b);
} else if (vb.getCollIDRef() == null && b.getCollIDRef() != null) {
logger.debug("successor "+vb.getVarNameRef()+" is NOT in collection but pred "+b.getVarNameRef()+" IS");
logger.debug("putting "+vb.getVarNameRef()+" in collection "+b.getCollIDRef()+" at pos "+b.getPositionInColl());
vb.setCollIDRef(b.getCollIDRef());
vb.setPositionInColl(b.getPositionInColl());
getPw().updateVarBinding(vb);
}
}
}
}
/**
* create one new VarBinding record for each input port binding
* @param currentWorkflowID
*/
@SuppressWarnings("unchecked")
private void processInput(InputDataProvenanceItem provenanceItem, ProcBinding procBinding, String currentWorkflowID) {
Element dataItemAsXML = ProvenanceUtils.getDataItemAsXML(provenanceItem);
List<Element> inputPorts = dataItemAsXML.getChildren("port");
int order = 0;
for (Element inputport : inputPorts) {
String portName = inputport.getAttributeValue("name");
// logger.info("processInput: processing VarBinding for "+procBinding.getPNameRef()+" "+portName);
try {
// add process order sequence to Var for this portName
Map<String, String> queryConstraints = new HashMap<String, String>();
queryConstraints.put("wfInstanceRef", currentWorkflowID);
queryConstraints.put("pnameRef", procBinding.getPNameRef());
queryConstraints.put("varName", portName);
queryConstraints.put("inputOrOutput", "1");
List<Var> vars = getPq().getVars(queryConstraints);
try {
Var v = vars.get(0);
v.setPortNameOrder(order++);
getPw().updateVar(v);
}
catch (IndexOutOfBoundsException e) {
logger.error("Could not process input " + portName, e);
}
} catch (SQLException e1) {
logger.error("Could not process input " + portName, e1);
}
// value type may vary
List<Element> valueElements = inputport.getChildren(); // hopefully
// in the
// right
// order...
if (valueElements != null && valueElements.size() > 0) {
Element valueEl = valueElements.get(0); // expect only 1 child
// processVarBinding(valueEl, processor, portName, iterationVector,
// dataflow);
List<VarBinding> newBindings =
processVarBinding(valueEl, procBinding.getPNameRef(), portName, procBinding.getIterationVector(),
getWfInstanceID(), currentWorkflowID);
// this is a list whenever valueEl is of type list: in this case processVarBinding recursively
// processes all values within the collection, and generates one VarBinding record for each of them
allInputVarBindings.addAll(newBindings);
// logger.debug("newBindings now has "+newBindings.size()+" elements");
// // if the new binding involves list values, then check to see if they need to be propagated back to
// // results of iterations
try {
backpatchIterationResults(newBindings);
} catch (SQLException e) {
logger.warn("Problem with back patching iteration results", e);
}
} else {
if (valueElements != null) logger.debug("port name "+portName+" "+valueElements.size());
else logger.debug("valueElements is null for port name "+portName);
}
}
}
/**
* capture the default case where the value is not a list
*
* @param valueEl
* @param processorId
* @param portName
* @param iterationId
* @param wfInstanceRef
* @param currentWorkflowID
*/
private List<VarBinding> processVarBinding(Element valueEl, String processorId,
String portName, String iterationId, String wfInstanceRef, String currentWorkflowID) {
// uses the defaults:
// collIdRef = null
// parentcollectionRef = null
// positionInCollection = 1
return processVarBinding(valueEl, processorId, portName, null, 1, null,
iterationId, wfInstanceRef, null, currentWorkflowID);
}
/**
* general case where value can be a list
* @param valueEl
* @param processorId
* @param portName
* @param collIdRef
* @param positionInCollection
* @param parentCollectionRef
* @param iterationId
* @param wfInstanceRef
* @param currentWorkflowID
*/
@SuppressWarnings("unchecked")
private List<VarBinding> processVarBinding(Element valueEl, String processorId,
String portName, String collIdRef, int positionInCollection,
String parentCollectionRef, String iterationId, String wfInstanceRef, String itVector, String currentWorkflowID) {
List<VarBinding> newBindings = new ArrayList<VarBinding>();
String valueType = valueEl.getName();
// logger.info("value element for " + processorId + ": "
// + valueType);
String iterationVector = null;
if (itVector == null)
iterationVector = extractIterationVector(iterationId);
else iterationVector = itVector;
VarBinding vb = new VarBinding();
vb.setWfNameRef(currentWorkflowID);
vb.setWfInstanceRef(wfInstanceRef);
vb.setPNameRef(processorId);
vb.setValueType(valueType);
vb.setVarNameRef(portName);
vb.setCollIDRef(collIdRef);
vb.setPositionInColl(positionInCollection);
newBindings.add(vb);
if (valueType.equals("literal")) {
// logger.warn("input of type literal");
try {
vb.setIterationVector(iterationVector);
vb.setValue(valueEl.getAttributeValue("id"));
logger.debug("new input VB with wfNameRef="+currentWorkflowID+" processorId="+processorId+
" valueType="+valueType+" portName="+portName+" collIdRef="+collIdRef+
" position="+positionInCollection+" itvector="+iterationVector+
" value="+vb.getValue());
// logger.info("calling addVarBinding on "+vb.getPNameRef()+" : "+vb.getVarNameRef());
getPw().addVarBinding(vb);
} catch (SQLException e) {
logger.warn("Process Var Binding problem with provenance", e);
}
} else if (valueType.equals("referenceSet")) {
vb.setIterationVector(iterationVector);
vb.setValue(valueEl.getAttributeValue("id"));
vb.setRef(valueEl.getChildText("reference"));
logger.debug("new input VB with wfNameRef="+currentWorkflowID+" processorId="+processorId+
" valueType="+valueType+" portName="+portName+" collIdRef="+collIdRef+
" position="+positionInCollection+" itvector="+iterationVector+
" value="+vb.getValue());
try {
// logger.debug("calling addVarBinding on "+vb.getPNameRef()+" : "+vb.getVarNameRef()+" with it "+vb.getIteration());
getPw().addVarBinding(vb);
} catch (SQLException e) {
logger.debug("Problem processing var binding -- performing update instead of insert", e); //, e);
// try to update the existing record instead using the current collection info
getPw().updateVarBinding(vb);
// logger.warn("VarBinding update successful");
}
} else if (valueType.equals("list")) {
logger.debug("input of type list");
// add entries to the Collection and to the VarBinding tables
// list id --> Collection.collId
String collId = valueEl.getAttributeValue("id");
try {
parentCollectionRef = getPw().addCollection(processorId, collId,
parentCollectionRef, iterationVector, portName,
wfInstanceRef);
// iterate over each list element
List<Element> listElements = valueEl.getChildren();
positionInCollection = 1; // also use this as a suffix to extend the iteration vector
// extend iteration vector to account for additional levels within the list
String originalIterationVector = iterationVector;
// children can be any base type, including list itself -- so
// use recursion
for (Element el : listElements) {
if (originalIterationVector.length() >2) { // vector is not empty
iterationVector = originalIterationVector.substring(0,
originalIterationVector.length()-1) + ","+
Integer.toString(positionInCollection-1) + "]";
} else {
iterationVector = "["+ Integer.toString(positionInCollection-1) + "]";
}
List<VarBinding> bindings = processVarBinding(el, processorId, portName, collId,
positionInCollection, parentCollectionRef,
iterationId, wfInstanceRef, iterationVector, currentWorkflowID);
newBindings.addAll(bindings);
positionInCollection++;
}
} catch (SQLException e) {
logger.warn("Problem processing var binding", e);
}
} else if (valueType.equals("error")) {
try {
vb.setIterationVector(iterationVector);
vb.setValue(valueEl.getAttributeValue("id"));
getPw().addVarBinding(vb);
} catch (SQLException e) {
logger.warn("Process Var Binding problem with provenance", e);
}
} else {
logger.warn("unrecognized value type element for "
+ processorId + ": " + valueType);
}
return newBindings;
}
/**
* OBSOLETE: returns the iteration vector x,y,z,... from [x,y,z,...]
* <p/>
* now returns the vector itself -- this is still experimental
*
* @param iteration
* @return
*/
String extractIterationVector(String iteration) {
return iteration;
// return iteration.substring(1, iteration.length() - 1);
// iteration is of the form "[n]" so we extract n
// String iterationN = iteration.substring(1, iteration.length()-1);
// if (iterationN.length() == 0) return 0;
// return Integer.parseInt(iterationN);
}
/**
* log raw event to file system
*
* @param content
* @param eventType
* @throws IOException
*/
public void saveEvent(ProvenanceItem provenanceItem, SharedVocabulary eventType) throws IOException {
// HACK -- XMLEncoder fails on IterationEvents and there is no way to catch the exception...
// so avoid this case
if (eventType.equals(SharedVocabulary.ITERATION_EVENT_TYPE)) {
return;
}
// System.out.println("saveEvent: start");
File f1 = null;
f1 = new File(TEST_EVENTS_FOLDER);
FileUtils.forceMkdir(f1);
String fname = "event_" + eventCnt++ + "_" + eventType + ".xml";
File f = new File(f1, fname);
// FileWriter fw = new FileWriter(f);
XMLEncoder en = new XMLEncoder(new BufferedOutputStream(
new FileOutputStream(f)));
en.setExceptionListener(new ExceptionListener()
{
public void exceptionThrown(Exception e)
{
logger.warn("XML encoding ERROR", e);
return;
}
});
logger.debug("saving to " + f); // save event for later inspection
logger.debug(provenanceItem);
en.writeObject(provenanceItem);
logger.debug("writer ok");
en.close();
logger.debug("closed");
// fw.write(content);
// fw.flush();
// fw.close();
// FileWriter fw = new FileWriter(f);
// fw.write(content);
// fw.flush();
// fw.close();
// System.out.println("saved as file " + fname);
}
/**
* for each arc of the form (_INPUT_/I, P/V): propagate VarBinding for P/V
* to var _INPUT_/I <br/>
*
* @throws SQLException
*/
public void fillInputVarBindings(Object context) throws SQLException {
// System.out.println("*** fillInputVarBindings: ***");
// retrieve appropriate arcs
Map<String, String> constraints = new HashMap<String, String>();
constraints.put("sourcePnameRef", "_INPUT_");
constraints.put("W.instanceID", getWfInstanceID());
List<Arc> arcs = getPq().getArcs(constraints);
// backpropagate VarBinding from the target var of the arc to the source
for (Arc aArc : arcs) {
// logger.info("propagating VarBinding from ["
// + aArc.getSinkPnameRef() + "/" + aArc.getSinkVarNameRef()
// + "] to input [" + aArc.getSourcePnameRef() + "/"
// + aArc.getSourceVarNameRef() + "]");
// get the varBinding for the arc sinks
Map<String, String> vbConstraints = new HashMap<String, String>();
vbConstraints.put("VB.PNameRef", aArc.getSinkPnameRef());
vbConstraints.put("VB.varNameRef", aArc.getSinkVarNameRef());
vbConstraints.put("VB.wfInstanceRef", getWfInstanceID());
List<VarBinding> vbList = getPq().getVarBindings(vbConstraints); // DB
// QUERY
for (VarBinding vb : vbList) {
// add a new VarBinding for the input
vb.setPNameRef(aArc.getSourcePnameRef());
vb.setVarNameRef(aArc.getSourceVarNameRef());
// all other attributes are the same --> CHECK!!
getPw().addVarBinding(vb);
}
}
}
/**
* for each arc of the form (P/V, _OUTPUT_/O): propagate VarBinding for P/V
* to var _OUTPUT_/O <br/>
*
* @throws SQLException
*/
public void fillOutputVarBindings(Object context) throws SQLException {
//System.out.println("*** fillOutputVarBindings: ***");
// retrieve appropriate arcs
Map<String, String> constraints = new HashMap<String, String>();
constraints.put("sinkPnameRef", "_OUTPUT_");
constraints.put("wfInstanceRef", getWfInstanceID());
List<Arc> arcs = getPq().getArcs(constraints);
// fowd propagate VarBinding from the source var of the arc to the
// output
for (Arc aArc : arcs) {
// logger.info("fwd propagating VarBinding from ["
// + aArc.getSourcePnameRef() + "/"
// + aArc.getSourceVarNameRef() + "] to input ["
// + aArc.getSinkPnameRef() + "/" + aArc.getSinkVarNameRef()
// + "]");
// get the varBinding for the arc sinks
Map<String, String> vbConstraints = new HashMap<String, String>();
vbConstraints.put("VB.PNameRef", aArc.getSourcePnameRef());
vbConstraints.put("VB.varNameRef", aArc.getSourceVarNameRef());
vbConstraints.put("VB.wfInstanceRef", getWfInstanceID());
List<VarBinding> vbList = getPq().getVarBindings(vbConstraints); // DB
// QUERY
for (VarBinding vb : vbList) {
// add a new VarBinding for the input
getPw().addVarBinding(vb); // DB UPDATE
}
}
}
/**
* silly class to hold pairs of strings. any better way??
* @author paolo
*
*/
class Pair {
String v1, v2;
public Pair(String current, String wfNameRef) {
v1=current; v2=wfNameRef;
}
/**
* @return the v1
*/
public String getV1() {
return v1;
}
/**
* @param v1 the v1 to set
*/
public void setV1(String v1) {
this.v1 = v1;
}
/**
* @return the v2
*/
public String getV2() {
return v2;
}
/**
* @param v2 the v2 to set
*/
public void setV2(String v2) {
this.v2 = v2;
}
}
public List<Pair> toposort(String dataflowName, String wfInstanceId) throws SQLException {
// String wfNameRef = pq.getWfNameForDataflow(dataflowName, wfInstanceID);
String wfNameRef = pq.getWfNameForDataflow(dataflowName);
// fetch processors along with the count of their predecessors
Map<String, Integer> predecessorsCount = getPq().getPredecessorsCount(wfInstanceId);
Map<String, List<String>> successorsOf = new HashMap<String, List<String>>();
// List<String> procList = pq.getContainedProcessors(dataflowName, wfInstanceId);
List<String> procList = pq.getContainedProcessors(dataflowName);
// logger.debug("toposort on "+dataflowName);
// logger.debug("contained procs: ");
for (String s:procList) {
List<String> successors = getPq().getSuccProcessors(s, wfNameRef, wfInstanceId);
successorsOf.put(s, successors);
// logger.debug(s+" with "+predecessorsCount.get(s)+" predecessors and successors:");
// for (String s1:successors) { logger.debug(s1); }
}
List<Pair> sorted = tsort(procList, dataflowName, predecessorsCount, successorsOf, wfNameRef, wfInstanceId);
// logger.debug("tsort:");
// for (String p : sorted) { logger.debug(p); }
for (int i=0; i< sorted.size(); i++) {
String procName = sorted.get(i).getV1();
if (pq.isDataflow(procName) && !procName.equals(dataflowName)) { // handle weirdness: a dataflow is contained within itself..
// recurse on procName
// logger.debug("recursion on "+procName);
List<Pair> sortedSublist = toposort(procName, wfInstanceId);
// replace procName with sortedSublist in sorted
sorted.remove(i);
sorted.addAll(i, sortedSublist);
}
}
return sorted;
}
/**
* STUB
* @param procList
* @param predecessorsCount
* @param successorsOf
* @param wfInstanceId
* @return
* @throws SQLException
*/
public List<Pair> tsort(List<String> procList,
String dataflowName,
Map<String, Integer> predecessorsCount,
Map<String, List<String>> successorsOf,
String wfNameRef, String wfInstanceId) throws SQLException {
List<Pair> L = new ArrayList<Pair>(); // holds sorted elements
List<String> Q = new ArrayList<String>(); // temp queue
Set<String> visited = new HashSet<String>();
// logger.debug("queue init with procList");
// init queue with procList processors that have no predecessors
for (String proc:procList) {
// logger.debug("dataflowName: "+dataflowName+" proc: "+proc);
if (predecessorsCount.get(proc) == null || predecessorsCount.get(proc) == 0 &&
!proc.equals(dataflowName)) {
Q.add(proc);
}
// logger.debug(proc + " added to queue");
// } else
// logger.debug(proc+" not added to queue");
}
// logger.debug("queue has "+Q.size()+" elements");
while (!Q.isEmpty()) {
String current = Q.remove(0);
// logger.debug("extracted "+current+" and added to L");
L.add(new Pair(current,wfNameRef));
// for (String s:L) logger.debug(s);
List<String> successors = successorsOf.get(current);
// logger.debug("\n****successors of "+current);
if (successors == null) continue;
// reduce the number of predecessors to each of the successors by one
// NB we must traverse an additional arc through a nested workflow input if the successor is a dataflow!!
for (String succ : successors) {
// logger.debug(succ);
// decrease edge count for each successor processor
Integer cnt = predecessorsCount.get(succ);
predecessorsCount.put(succ, new Integer(cnt.intValue() - 1));
// logger.debug("now "+succ+" has "+predecessorsCount.get(succ)+" predecessors");
if (predecessorsCount.get(succ) == 0 && !succ.equals(dataflowName)) {
Q.add(succ);
// logger.debug("adding "+succ+" to queue");
}
}
} // end loop on Q
return L;
}
public void propagateANL(String wfInstanceId) throws SQLException {
String top = pq.getTopLevelDataflowName(wfInstanceId);
// //////////////////////
// PHASE I: toposort the processors in the whole graph
// //////////////////////
List<Pair> sorted = toposort(top, wfInstanceId);
logger.debug("final sorted list of processors");
for (Pair p:sorted) { logger.debug(p.getV1()+" in wfnameRef "+p.getV2()); }
// //////////////////////
// PHASE II: traverse and set anl on each port
// //////////////////////
// logger.debug("***** STARTING ANL *****");
// // sorted processor names in L at this point
// // process them in order
for (Pair pnameInContext : sorted) {
// logger.debug("setting ANL for "+pnameInContext.getV1()+" input vars");
// // process pname's inputs -- set ANL to be the DNL if not set in prior steps
String pname = pnameInContext.getV1();
String wfNameRef = pnameInContext.getV2();
// logger.debug("processor "+pname);
List<Var> inputs = getPq().getInputVars(pname, wfNameRef, wfInstanceId); // null -> do not use instance (??) CHECK
// logger.debug(inputs.size()+" inputs for "+pnameInContext.getV1());
int totalANL = 0;
for (Var iv : inputs) {
if (iv.isANLset() == false) {
iv.setActualNestingLevel(iv.getTypeNestingLevel());
iv.setANLset(true);
getPw().updateVar(iv);
// logger.debug("var: "+iv.getVName()+" set at nominal level "+iv.getActualNestingLevel());
}
int delta_nl = iv.getActualNestingLevel() - iv.getTypeNestingLevel();
// if delta_nl < 0 then Taverna wraps the value into a list --> use dnl(X) in this case
if (delta_nl < 0 ) delta_nl = 0;// CHECK iv.getTypeNestingLevel();
// logger.debug("delta for "+iv.getVName()+" "+delta_nl);
totalANL += delta_nl;
// this should take care of the special case of the top level dataflow with inputs that have successors in the graph
// propagate this through all the links from this var
// List<Var> successors = getPq().getSuccVars(pname, iv.getVName(), wfInstanceId);
// logger.debug(successors.size()+ " successors for var "+iv.getVName());
// for (Var v : successors) {
// v.setActualNestingLevel(iv.getActualNestingLevel());
// v.setANLset(true);
// getPw().updateVar(v);
// }
}
// logger.debug("total anl: "+totalANL);
// logger.debug("now setting ANL for "+pname+" output vars");
// process pname's outputs -- set ANL based on the sum formula (see
// paper)
List<Var> outputs = getPq().getOutputVars(pname, wfNameRef, wfInstanceId);
for (Var ov : outputs) {
ov.setActualNestingLevel(ov.getTypeNestingLevel() + totalANL);
logger.debug("anl for "+pname+":"+ov.getVName()+" = "+(ov.getTypeNestingLevel() + totalANL));
ov.setANLset(true);
getPw().updateVar(ov);
// propagate this through all the links from this var
List<Var> successors = getPq().getSuccVars(pname, ov.getVName(), wfNameRef);
// logger.debug(successors.size()+ " successors for var "+ov.getVName());
for (Var v : successors) {
List<Var> toBeProcessed = new ArrayList<Var>();
toBeProcessed.add(v);
if (pq.isDataflow(v.getPName()) && v.isInput()) { // this is the input to a nested workflow
// String tempWfNameRef = pq.getWfNameForDataflow(v.getPName(), wfInstanceId);
String tempWfNameRef = pq.getWfNameForDataflow(v.getPName());
List<Var> realSuccessors = getPq().getSuccVars(v.getPName(), v.getVName(), tempWfNameRef);
// logger.debug("realSuccessors size = "+realSuccessors.size());
toBeProcessed.remove(0);
toBeProcessed.addAll(realSuccessors);
} else if (pq.isDataflow(v.getPName()) && !v.isInput()) { // this is the output to a nested workflow
// String tempWfNameRef = pq.getWfNameForDataflow(v.getPName(), wfInstanceId);
String tempWfNameRef = pq.getWfNameForDataflow(v.getPName());
List<Var> realSuccessors = getPq().getSuccVars(v.getPName(), v.getVName(), null);
// logger.debug("realSuccessors size = "+realSuccessors.size());
toBeProcessed.remove(0);
toBeProcessed.addAll(realSuccessors);
}
for (Var v1:toBeProcessed) {
v1.setActualNestingLevel(ov.getActualNestingLevel());
logger.debug("anl for "+v1.getPName()+":"+v1.getVName()+" = "+ov.getActualNestingLevel());
v1.setANLset(true);
getPw().updateVar(v1);
}
}
}
}
}
public void setPw(ProvenanceWriter pw) {
this.pw = pw;
}
public ProvenanceWriter getPw() {
return pw;
}
public void setPq(ProvenanceQuery pq) {
this.pq = pq;
}
public ProvenanceQuery getPq() {
return pq;
}
public void setWfInstanceID(String wfInstanceID) {
this.wfInstanceID = wfInstanceID;
}
public String getWfInstanceID() {
return wfInstanceID;
}
public void setWfdp(WorkflowDataProcessor wfdp) {
this.wfdp = wfdp;
}
public WorkflowDataProcessor getWfdp() {
return wfdp;
}
}