blob: 72913fad284e6967aab9ef4ca4425c50690e540f [file] [log] [blame]
/**
*
*/
package net.sf.taverna.t2.provenance.lineageservice;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;
import net.sf.taverna.t2.provenance.item.ProvenanceItem;
import net.sf.taverna.t2.provenance.item.WorkflowDataProvenanceItem;
import net.sf.taverna.t2.provenance.lineageservice.utils.ProvenanceUtils;
import net.sf.taverna.t2.provenance.lineageservice.utils.VarBinding;
/**
 * Manages the outputs of a workflow as they arrive through WorkflowData provenance events.
 * <p>
 * Each event becomes a {@link WorkflowDataNode} that is appended to the "tree" of its output
 * port. A tree is a flat list whose nodes carry parent pointers. Once the workflow has
 * completed, {@link #processTrees(String, String)} walks these trees and writes collection and
 * VarBinding records through the configured {@link ProvenanceWriter}.
 *
 * @author paolo
 */
public class WorkflowDataProcessor {

    private static final Logger logger = Logger.getLogger(WorkflowDataProcessor.class);

    // One tree (a flat list of nodes with parent pointers) per output port name.
    // The port name alone is not a unique key: the same output name may occur in several
    // nested workflows. Trees are still keyed by port name only, so nodes are linked only to
    // parents carrying the same workflow ID, and processTrees() filters nodes by workflow ID.
    Map<String, List<WorkflowDataNode>> workflowDataTrees = new HashMap<String, List<WorkflowDataNode>>();

    ProvenanceQuery pq = null;
    ProvenanceWriter pw = null;

    /**
     * Adds a WorkflowData provenance event to the tree of the output port named in the event.
     * <p>
     * Repeated invocations incrementally reconstruct one tree per workflow output. Parent
     * pointers are fixed up in both directions, so an element may arrive before or after the
     * collection that contains it. Nodes are linked only when they share a workflow ID.
     *
     * @param provenanceItem the event to add. It is cast to {@link WorkflowDataProvenanceItem},
     *        so any other item type causes a {@link ClassCastException}.
     */
    public void addWorkflowDataItem(ProvenanceItem provenanceItem) {
        WorkflowDataProvenanceItem item = (WorkflowDataProvenanceItem) provenanceItem;

        WorkflowDataNode wdn = new WorkflowDataNode();
        wdn.setVarName(item.getPortName());
        wdn.setValue(item.getData().toString());
        wdn.setIndex(ProvenanceUtils.iterationToString(item.getIndex()));
        wdn.setWorkflowID(item.getWorkflowId());
        // HACK: a value is treated as a collection when its string form contains "list".
        // NOTE(review): this presumably relies on how list references render via toString().
        // An atomic value whose string form happens to contain "list" would be misclassified.
        wdn.setList(wdn.getValue().contains("list"));

        List<WorkflowDataNode> aTree = workflowDataTrees.get(wdn.getVarName());
        if (aTree == null) { // first item for this port
            aTree = new ArrayList<WorkflowDataNode>();
            workflowDataTrees.put(wdn.getVarName(), aTree);
        }

        for (WorkflowDataNode aNode : aTree) {
            if (!sameWorkflow(wdn, aNode)) {
                // Same port name in a different (e.g. nested) workflow: never link these.
                continue;
            }
            if (isParent(wdn.getIndex(), aNode.getIndex())) {
                // The new node is the collection that contains an earlier node.
                linkToParent(aNode, wdn);
            } else if (isParent(aNode.getIndex(), wdn.getIndex())) {
                // The containing collection arrived before this node.
                linkToParent(wdn, aNode);
            }
        }
        aTree.add(wdn);
    }

    /**
     * Writes Collection or VarBinding records by traversing the trees.<br/>
     * Expected to be invoked after workflow completion.
     * <p>
     * Only nodes whose workflow ID equals {@code dataflowID} are written. List nodes become
     * collection records; all other nodes become VarBindings.
     * <p>
     * Each node is stored independently. If inserting a VarBinding raises an
     * {@link SQLException}, the binding is updated instead. Any other storage failure is
     * logged, and processing continues with the next node.
     *
     * @param dataflowID the workflow identifier. It is matched against each node's workflow ID
     *        and passed to {@code pq.getWorkflow(...)} to obtain the external name.
     *        NOTE(review): earlier docs called this "the external name of the dataflow (not
     *        the UUID)", which the external-name lookup contradicts — confirm.
     * @param wfInstanceRef the run ID
     */
    public void processTrees(String dataflowID, String wfInstanceRef) {
        logger.debug("processing output trees");
        // Looked up lazily, the first time a VarBinding needs it, instead of once per node.
        String workflowExternalName = null;

        for (Map.Entry<String, List<WorkflowDataNode>> entry : workflowDataTrees.entrySet()) {
            String varName = entry.getKey();
            List<WorkflowDataNode> tree = entry.getValue();
            logger.debug("storing tree for var " + varName + " in workflow with ID " + dataflowID
                    + " and instance " + wfInstanceRef);

            for (WorkflowDataNode node : tree) {
                if (node.getWorkflowID() == null || !node.getWorkflowID().equals(dataflowID)) {
                    continue;
                }

                // Assigned only once the binding is fully populated, so the update fallback
                // never sees a partial or stale binding.
                VarBinding vb = null;
                try {
                    if (node.isList()) {
                        logger.debug("creating collection entry for " + node.getValue()
                                + " with index " + node.getIndex());
                        String parentCollection = null;
                        if (node.getParent() != null) {
                            logger.debug(" and parent " + node.getParent().getIndex());
                            parentCollection = node.getParent().getValue();
                        }
                        // write a collection record to DB
                        getPw().addCollection(dataflowID,
                                node.getValue(),
                                parentCollection,
                                node.getIndex(),
                                varName,
                                wfInstanceRef);
                    } else {
                        logger.debug("creating VarBinding for " + node.getValue()
                                + " with index " + node.getIndex());
                        if (workflowExternalName == null) {
                            workflowExternalName = pq.getWorkflow(dataflowID).getExternalName();
                        }
                        VarBinding binding = new VarBinding();
                        binding.setWfNameRef(dataflowID);
                        binding.setWfInstanceRef(wfInstanceRef);
                        binding.setPNameRef(workflowExternalName);
                        // binding.setValueType(); // TODO not sure what to set this to
                        binding.setVarNameRef(varName);
                        binding.setIterationVector(node.getIndex());
                        binding.setValue(node.getValue());
                        if (node.getParent() != null) {
                            logger.debug(" in collection " + node.getParent().getValue()
                                    + " with index " + node.getParent().getIndex());
                            binding.setCollIDRef(node.getParent().getValue());
                            binding.setPositionInColl(node.getRelativePosition());
                        } else {
                            binding.setPositionInColl(1); // default
                        }
                        vb = binding;
                        getPw().addVarBinding(vb);
                    }
                } catch (SQLException e) {
                    if (vb != null) {
                        logger.debug("Problem processing trees for workflow: " + dataflowID
                                + " instance: " + wfInstanceRef + " : "
                                + " updating instead of inserting");
                        getPw().updateVarBinding(vb);
                    } else {
                        logger.warn("Problem processing trees for workflow: " + dataflowID
                                + " instance: " + wfInstanceRef
                                + " : could not store value " + node.getValue(), e);
                    }
                }
            }
        }
    }

    /**
     * Makes {@code parent} the parent of {@code child} and records the child's position in it.
     */
    private void linkToParent(WorkflowDataNode child, WorkflowDataNode parent) {
        child.setParent(parent);
        // The position in the collection is derived from the last component of the index.
        child.setRelativePosition(getPosition(child));
    }

    /**
     * Returns true when both nodes carry the same workflow ID (two nulls compare equal).
     */
    private static boolean sameWorkflow(WorkflowDataNode a, WorkflowDataNode b) {
        return a.getWorkflowID() == null
                ? b.getWorkflowID() == null
                : a.getWorkflowID().equals(b.getWorkflowID());
    }

    /**
     * Returns the position of a node in its parent collection.
     * <p>
     * The result is the last component of the node's index plus one (1-based, assuming
     * 0-based iteration indices). An empty index {@code "[]"} yields 1.
     *
     * @param node the node whose index is inspected
     * @return the last index component + 1, or 1 for an empty index
     */
    private int getPosition(WorkflowDataNode node) {
        String[] tokens = indexTokens(node.getIndex());
        if (tokens.length == 0) {
            return 1;
        }
        return Integer.parseInt(tokens[tokens.length - 1]) + 1;
    }

    /**
     * Returns true when {@code index1} denotes the direct parent of {@code index2}.
     * <p>
     * That holds when index2 has exactly one more component than index1, and index1's
     * components equal index2's leading components. Thus {@code "[]"} is the parent of every
     * single-component index, {@code "[1]"} is the parent of {@code "[1,2]"}, but
     * {@code "[1]"} is NOT the parent of {@code "[10,2]"}.
     */
    private boolean isParent(String index1, String index2) {
        String[] parentTokens = indexTokens(index1);
        String[] childTokens = indexTokens(index2);
        // A parent is exactly one level shallower than its child.
        if (parentTokens.length + 1 != childTokens.length) {
            return false;
        }
        // Compare whole components, not characters, so "1" never matches "10".
        for (int i = 0; i < parentTokens.length; i++) {
            if (!parentTokens[i].equals(childTokens[i])) {
                return false;
            }
        }
        return true;
    }

    /**
     * Splits an index string such as {@code "[0,2]"} into its trimmed components.
     * <p>
     * Surrounding brackets are stripped when present. {@code "[]"} yields an empty array
     * (unlike {@code "".split(",")}, which yields one empty token).
     */
    private static String[] indexTokens(String index) {
        String inner = index.trim();
        if (inner.startsWith("[")) {
            inner = inner.substring(1);
        }
        if (inner.endsWith("]")) {
            inner = inner.substring(0, inner.length() - 1);
        }
        inner = inner.trim();
        if (inner.length() == 0) {
            return new String[0];
        }
        String[] tokens = inner.split(",");
        for (int i = 0; i < tokens.length; i++) {
            tokens[i] = tokens[i].trim();
        }
        return tokens;
    }

    /**
     * One workflow output value (atomic value or collection) in a port's tree.
     */
    class WorkflowDataNode {
        /** Name of the output port this value was produced on. */
        String varName;
        /** String form of the event's data. */
        String value;
        /** Iteration index rendered by ProvenanceUtils.iterationToString, e.g. "[0,2]". */
        String index;
        /** Workflow ID reported by the event. */
        String workflowID;
        /** Position within the parent collection; set when the parent is linked. */
        int relativePosition;
        /** True when the value was classified as a collection. */
        boolean isList;
        /** Containing collection node, or null if none has been linked. */
        WorkflowDataNode parent;

        /**
         * @return the value
         */
        public String getValue() {
            return value;
        }

        /**
         * @param value the value to set
         */
        public void setValue(String value) {
            this.value = value;
        }

        /**
         * @return the index
         */
        public String getIndex() {
            return index;
        }

        /**
         * @param index the index to set
         */
        public void setIndex(String index) {
            this.index = index;
        }

        /**
         * @return the varName
         */
        public String getVarName() {
            return varName;
        }

        /**
         * @param varName the varName to set
         */
        public void setVarName(String varName) {
            this.varName = varName;
        }

        /**
         * @return whether this node is a collection
         */
        public boolean isList() {
            return isList;
        }

        /**
         * @param isList whether this node is a collection
         */
        public void setList(boolean isList) {
            this.isList = isList;
        }

        /**
         * @return the parent, or null if none has been linked
         */
        public WorkflowDataNode getParent() {
            return parent;
        }

        /**
         * @param parent the parent to set
         */
        public void setParent(WorkflowDataNode parent) {
            this.parent = parent;
        }

        /**
         * @return the relativePosition
         */
        public int getRelativePosition() {
            return relativePosition;
        }

        /**
         * @param relativePosition the relativePosition to set
         */
        public void setRelativePosition(int relativePosition) {
            this.relativePosition = relativePosition;
        }

        /**
         * @return the workflowID
         */
        public String getWorkflowID() {
            return workflowID;
        }

        /**
         * @param workflowID the workflowID to set
         */
        public void setWorkflowID(String workflowID) {
            this.workflowID = workflowID;
        }
    }

    /**
     * @return the provenance query used to look up workflow metadata
     */
    public ProvenanceQuery getPq() {
        return pq;
    }

    /**
     * @param pq the provenance query to set
     */
    public void setPq(ProvenanceQuery pq) {
        this.pq = pq;
    }

    /**
     * @return the provenance writer records are stored through
     */
    public ProvenanceWriter getPw() {
        return pw;
    }

    /**
     * @param pw the provenance writer to set
     */
    public void setPw(ProvenanceWriter pw) {
        this.pw = pw;
    }
}