| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.taverna.provenance.lineageservice; |
| |
| import static java.util.Collections.synchronizedList; |
| import static org.apache.taverna.provenance.lineageservice.utils.ProvenanceProcessor.DATAFLOW_ACTIVITY; |
| import static org.apache.taverna.provenance.lineageservice.utils.ProvenanceUtils.getDataItemAsXML; |
| import static org.apache.taverna.provenance.lineageservice.utils.ProvenanceUtils.iterationToString; |
| import static org.apache.taverna.provenance.lineageservice.utils.ProvenanceUtils.parentProcess; |
| import static net.sf.taverna.t2.provenance.vocabulary.SharedVocabulary.ACTIVITY_EVENT_TYPE; |
| import static net.sf.taverna.t2.provenance.vocabulary.SharedVocabulary.END_WORKFLOW_EVENT_TYPE; |
| import static net.sf.taverna.t2.provenance.vocabulary.SharedVocabulary.INVOCATION_STARTED_EVENT_TYPE; |
| import static net.sf.taverna.t2.provenance.vocabulary.SharedVocabulary.ITERATION_EVENT_TYPE; |
| import static net.sf.taverna.t2.provenance.vocabulary.SharedVocabulary.PROCESSOR_EVENT_TYPE; |
| import static net.sf.taverna.t2.provenance.vocabulary.SharedVocabulary.PROCESS_EVENT_TYPE; |
| import static net.sf.taverna.t2.provenance.vocabulary.SharedVocabulary.WORKFLOW_DATA_EVENT_TYPE; |
| |
| import java.io.IOException; |
| import java.io.StringWriter; |
| import java.nio.charset.StandardCharsets; |
| import java.sql.Blob; |
| import java.sql.SQLException; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.UUID; |
| import java.util.concurrent.ConcurrentHashMap; |
| |
| import javax.sql.rowset.serial.SerialBlob; |
| |
| import net.sf.taverna.t2.provenance.item.DataProvenanceItem; |
| import net.sf.taverna.t2.provenance.item.DataflowRunComplete; |
| import net.sf.taverna.t2.provenance.item.InputDataProvenanceItem; |
| import net.sf.taverna.t2.provenance.item.InvocationStartedProvenanceItem; |
| import net.sf.taverna.t2.provenance.item.IterationProvenanceItem; |
| import net.sf.taverna.t2.provenance.item.OutputDataProvenanceItem; |
| import net.sf.taverna.t2.provenance.item.ProvenanceItem; |
| import net.sf.taverna.t2.provenance.item.WorkflowProvenanceItem; |
| import org.apache.taverna.provenance.lineageservice.utils.DataBinding; |
| import org.apache.taverna.provenance.lineageservice.utils.DataLink; |
| import org.apache.taverna.provenance.lineageservice.utils.NestedListNode; |
| import org.apache.taverna.provenance.lineageservice.utils.Port; |
| import org.apache.taverna.provenance.lineageservice.utils.PortBinding; |
| import org.apache.taverna.provenance.lineageservice.utils.ProcessorBinding; |
| import org.apache.taverna.provenance.lineageservice.utils.ProcessorEnactment; |
| import org.apache.taverna.provenance.lineageservice.utils.ProvenanceProcessor; |
| import net.sf.taverna.t2.provenance.vocabulary.SharedVocabulary; |
| import org.apache.taverna.reference.T2Reference; |
| import org.apache.taverna.workflowmodel.Dataflow; |
| import org.apache.taverna.workflowmodel.DataflowInputPort; |
| import org.apache.taverna.workflowmodel.DataflowOutputPort; |
| import org.apache.taverna.workflowmodel.Datalink; |
| import org.apache.taverna.workflowmodel.InputPort; |
| import org.apache.taverna.workflowmodel.MergeInputPort; |
| import org.apache.taverna.workflowmodel.MergeOutputPort; |
| import org.apache.taverna.workflowmodel.OutputPort; |
| import org.apache.taverna.workflowmodel.Processor; |
| import org.apache.taverna.workflowmodel.ProcessorInputPort; |
| import org.apache.taverna.workflowmodel.ProcessorOutputPort; |
| import org.apache.taverna.workflowmodel.processor.activity.Activity; |
| import org.apache.taverna.workflowmodel.processor.activity.NestedDataflow; |
| |
| import org.apache.log4j.Logger; |
| import org.jdom.Element; |
| import org.jdom.output.XMLOutputter; |
| |
| import org.apache.taverna.scufl2.api.io.WorkflowBundleIO; |
| |
| /** |
| * @author Paolo Missier |
| */ |
| public class EventProcessor { |
| /** |
| * A map of UUIDs of the originating processor to the ProcBinding object |
| * that contains its parameters |
| */ |
| private Map<String, ProcessorBinding> procBindingMap = new ConcurrentHashMap<>(); |
| |
| /** A map of child ids to their parents in the hierarchy of events: |
| * workflow -> process -> processor -> activity -> iteration |
| */ |
| private Map<String, String> parentChildMap= new ConcurrentHashMap<>(); |
| |
| private static Logger logger = Logger.getLogger(EventProcessor.class); |
| |
| private static final String OUTPUT_CONTAINER_PROCESSOR = "_OUTPUT_"; |
| private static final String INPUT_CONTAINER_PROCESSOR = "_INPUT_"; |
| |
| private volatile boolean workflowStructureDone = false; // used to inhibit processing of multiple workflow events -- we only need the first |
| private volatile String workflowRunId = null; // unique run ID. set when we see the first event of type "process" |
| |
| String topLevelDataflowName = null; |
| String topLevelDataflowID = null; |
| |
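| /** Maps each nested workflow ID to the ID of its containing (parent) dataflow */ |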
| Map<String, String> wfNestingMap = new ConcurrentHashMap<>(); |
| |
| // all input bindings are accumulated here so they can be "backpatched" (see backpatchIterationResults()) |
| List<PortBinding> allInputVarBindings = synchronizedList(new ArrayList<PortBinding>()); |
| |
| // dedicated class for processing WorkflowData events which carry workflow output info |
| private WorkflowDataProcessor wfdp; |
| private ProvenanceWriter pw = null; |
| private ProvenanceQuery pq = null; |
| |
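| /** Lookup of Ports keyed by processorName + "/i:" (input) or "/o:" (output) + portName -- see makePortMapping() */ |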
| private HashMap<String, Port> mapping; |
| |
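| /** Maps iteration (process enactment) IDs to their ProcessorEnactment records */ |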
| private Map<String, ProcessorEnactment> processorEnactmentMap = new ConcurrentHashMap<>(); |
| |
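| /** Cache of ProvenanceProcessors, keyed by processor identifier */ |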
| private Map<String, ProvenanceProcessor> processorMapById = new ConcurrentHashMap<>(); |
| |
| private WorkflowBundleIO io; |
| |
| // Backpatching temporarily disabled |
| private static final boolean backpatching = false; |
| |
| public EventProcessor(WorkflowBundleIO io) { |
| this.io = io; |
| } |
| |
| /** |
| * @param pw |
| * @throws SQLException |
| * @throws ClassNotFoundException |
| * @throws IllegalAccessException |
| * @throws InstantiationException |
| * |
| */ |
| public EventProcessor(ProvenanceWriter pw, ProvenanceQuery pq, |
| WorkflowDataProcessor wfdp,WorkflowBundleIO io) throws InstantiationException, |
| IllegalAccessException, ClassNotFoundException, SQLException { |
| this.pw = pw; |
| this.pq = pq; |
| this.wfdp = wfdp; |
| this.io = io; |
| } |
| |
| /** |
| * Populates the static portion of the DB, using the T2 deserializer.<br/> |
| * The static structure may already be in the DB -- this is detected as a |
| * duplicate top-level workflow ID, in which case this processing is skipped |
| * altogether. |
| * @param provenanceItem |
| *            a WorkflowProvenanceItem carrying the dataflow (XML) to be parsed |
| * @return the ID of the top-level dataflow, or null if the workflow |
| *         structure has already been processed |
| */ |
| public String processWorkflowStructure(ProvenanceItem provenanceItem) { |
| /* |
| * this flag is set to prevent processing of separate |
| * workflowProvenanceItems that describe nested workflows. the |
| * processing of all nested workflows is done as part of the very first |
| * workflowProvenanceItem that we receive, which is self-consistent. so |
| * we ignore all others |
| */ |
| if (workflowStructureDone) |
| return null; |
| WorkflowProvenanceItem wpi = (WorkflowProvenanceItem) provenanceItem; |
| setWorkflowRunId(wpi.getIdentifier()); |
| workflowStructureDone = true; |
| return processWorkflowStructure(wpi.getDataflow()); |
| } |
| |
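| /** |
| * Records the static structure of the given dataflow and recurses into any |
| * nested workflows via processDataflowStructure(). |
| * @param df the deserialized dataflow |
| * @return the ID of the top-level dataflow |
| */ |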
| public String processWorkflowStructure(Dataflow df) { |
| topLevelDataflowName = df.getLocalName(); |
| topLevelDataflowID = df.getIdentifier(); |
| |
| // check whether we already have this WF in the DB |
| List<String> workflowIds = null; |
| try { |
| workflowIds = pq.getAllworkflowIds(); |
| } catch (SQLException e) { |
| logger.warn("Problem processing workflow structure", e); |
| } |
| |
| if (workflowIds == null || !workflowIds.contains(topLevelDataflowID)) { |
| // not already in the DB |
| logger.info("new workflow structure with ID " + topLevelDataflowID); |
| ProvenanceProcessor provProc = new ProvenanceProcessor(); |
| provProc.setIdentifier(UUID.randomUUID().toString()); |
| provProc.setProcessorName(topLevelDataflowName); |
| provProc.setFirstActivityClassName(DATAFLOW_ACTIVITY); |
| provProc.setWorkflowId(topLevelDataflowID); |
| provProc.setTopLevelProcessor(true); |
| // record the top level dataflow as a processor in the DB |
| try { |
| pw.addProcessor(provProc); |
| } catch (SQLException e) { |
| logger.warn("Can't add processor " + topLevelDataflowID, e); |
| } |
| } |
| |
| return processDataflowStructure(df, topLevelDataflowID, df.getLocalName()); // the top level dataflow's external name is its local name |
| } |
| |
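| /** |
| * Serializes the dataflow definition to XML and wraps it in a Blob so it |
| * can be stored alongside the workflow record. |
| */ |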
| private Blob serialize(Dataflow df) throws SQLException { |
| // FIXME: no dataflow XML serializer is currently wired in; this used |
| // to call xmlSerializer.serializeDataflow(df) |
| Element serializedDataflow = null; |
| if (serializedDataflow == null) |
| return null; |
| try { |
| XMLOutputter outputter = new XMLOutputter(); |
| StringWriter stringWriter = new StringWriter(); |
| outputter.output(serializedDataflow, stringWriter); |
| String dataflowString = stringWriter.toString(); |
| return new SerialBlob(dataflowString.getBytes(StandardCharsets.UTF_8)); |
| } catch (IOException e) { |
| logger.error("Could not serialise dataflow", e); |
| return null; |
| } |
| } |
| |
| /** |
| * Note: this method can be called as part of a recursion on sub-workflows. |
| * |
| * @param df |
| * @param dataflowID |
| *            the UUID for the entire dataflow (may be a sub-dataflow) |
| * @param externalName |
| *            the external name of the dataflow: the containing processor's |
| *            name for a sub-dataflow, the local name for the top level |
| * @return the dataflowID that was passed in |
| */ |
| private String processDataflowStructure(Dataflow df, String dataflowID, String externalName) { |
| String localWorkflowRunID = getWorkflowRunId(); |
| |
| try { |
| // check whether we already have this WF in the DB |
| boolean alreadyInDb; |
| try { |
| List<String> workflowIds = pq.getAllworkflowIds(); |
| alreadyInDb = workflowIds != null && workflowIds.contains(dataflowID); |
| } catch (SQLException e) { |
| logger.warn("Problem processing dataflow structure for " + dataflowID, e); |
| alreadyInDb = false; |
| } |
| |
| // add workflow ID -- this is NOT THE SAME AS the workflowRunId |
| |
| /* |
| * this could be a nested workflow -- in this case, override its |
| * workflowRunId with that of its parent |
| */ |
| if (!alreadyInDb) { |
| String parentDataflow = wfNestingMap.get(dataflowID); |
| Blob blob = serialize(df); |
| if (parentDataflow == null) { |
| // this is a top level dataflow description |
| pw.addWFId(dataflowID, null, externalName, blob); // set its dataflowID with no parent |
| |
| } else { |
| // we are processing a nested workflow structure |
| logger.debug("dataflow "+dataflowID+" with external name "+externalName+" is nested within "+parentDataflow); |
| |
| pw.addWFId(dataflowID, parentDataflow, externalName, blob); // set its dataflowID along with its parent |
| |
| // override workflowRunId to point to top level -- UNCOMMENTED PM 9/09 CHECK |
| localWorkflowRunID = pq.getRuns(parentDataflow, null).get(0).getWorkflowRunId(); |
| } |
| } |
| // Log the run itself |
| pw.addWorkflowRun(dataflowID, localWorkflowRunID); |
| |
| // add processors along with their variables |
| List<Port> vars = new ArrayList<>(); |
| for (Processor p : df.getProcessors()) { |
| String pName = p.getLocalName(); |
| |
| //CHECK get type of first activity and set this as the type of the processor itself |
| List<? extends Activity<?>> activities = p.getActivityList(); |
| |
| if (! alreadyInDb) { |
| ProvenanceProcessor provProc; |
| String pType = null; |
| if (activities != null && !activities.isEmpty()) |
| pType = activities.get(0).getClass().getCanonicalName(); |
| provProc = new ProvenanceProcessor(); |
| provProc.setIdentifier(UUID.randomUUID().toString()); |
| provProc.setProcessorName(pName); |
| provProc.setFirstActivityClassName(pType); |
| provProc.setWorkflowId(dataflowID); |
| provProc.setTopLevelProcessor(false); |
| |
| pw.addProcessor(provProc); |
| |
| /* |
| * add all input ports for this processor as input variables |
| */ |
| for (ProcessorInputPort ip : p.getInputPorts()) { |
| Port inputVar = new Port(); |
| inputVar.setIdentifier(UUID.randomUUID().toString()); |
| inputVar.setProcessorId(provProc.getIdentifier()); |
| inputVar.setProcessorName(pName); |
| inputVar.setWorkflowId(dataflowID); |
| inputVar.setPortName(ip.getName()); |
| inputVar.setDepth(ip.getDepth()); |
| inputVar.setInputPort(true); |
| vars.add(inputVar); |
| } |
| |
| /* |
| * add all output ports for this processor as output |
| * variables |
| */ |
| for (ProcessorOutputPort op : p.getOutputPorts()) { |
| Port outputVar = new Port(); |
| outputVar.setIdentifier(UUID.randomUUID().toString()); |
| outputVar.setProcessorName(pName); |
| outputVar.setProcessorId(provProc.getIdentifier()); |
| outputVar.setWorkflowId(dataflowID); |
| outputVar.setPortName(op.getName()); |
| outputVar.setDepth(op.getDepth()); |
| outputVar.setInputPort(false); |
| |
| vars.add(outputVar); |
| } |
| } |
| |
| /* |
| * check for nested structures: if the activity is |
| * DataflowActivity then this processor is a nested workflow; |
| * make an entry into wfNesting map with its ID and recurse on |
| * the nested workflow |
| */ |
| |
| if (activities != null) |
| for (Activity<?> a : activities) { |
| if (!(a instanceof NestedDataflow)) |
| continue; |
| |
| Dataflow nested = ((NestedDataflow) a) |
| .getNestedDataflow(); |
| wfNestingMap.put(nested.getIdentifier(), dataflowID); // child -> parent |
| |
| // RECURSIVE CALL |
| processDataflowStructure(nested, |
| nested.getIdentifier(), p.getLocalName()); |
| } |
| } // end for each processor |
| |
| // add inputs to entire dataflow |
| String pName = INPUT_CONTAINER_PROCESSOR; // overridden -- see below |
| |
| /* |
| * check whether we are processing a nested workflow. in this case |
| * the input vars are not assigned to the INPUT processor but to the |
| * containing dataflow |
| */ |
| if (! alreadyInDb) { |
| if (externalName != null) // override the default if we are nested or some external name is provided |
| pName = externalName; |
| |
| for (DataflowInputPort ip : df.getInputPorts()) { |
| Port inputVar = new Port(); |
| inputVar.setIdentifier(UUID.randomUUID().toString()); |
| inputVar.setProcessorId(null); // meaning workflow port |
| inputVar.setProcessorName(pName); |
| inputVar.setWorkflowId(dataflowID); |
| inputVar.setPortName(ip.getName()); |
| inputVar.setDepth(ip.getDepth()); |
| inputVar.setInputPort(true); // CHECK PM modified 11/08 -- input vars are actually outputs of input processors... |
| |
| vars.add(inputVar); |
| } |
| |
| // add outputs of entire dataflow |
| pName = OUTPUT_CONTAINER_PROCESSOR; // overridden -- see below |
| |
| /* |
| * check whether we are processing a nested workflow. in this |
| * case the output vars are not assigned to the OUTPUT processor |
| * but to the containing dataflow |
| */ |
| if (externalName != null) // we are nested |
| pName = externalName; |
| |
| for (DataflowOutputPort op : df.getOutputPorts()) { |
| Port outputVar = new Port(); |
| outputVar.setIdentifier(UUID.randomUUID().toString()); |
| outputVar.setProcessorId(null); // meaning workflow port |
| outputVar.setProcessorName(pName); |
| outputVar.setWorkflowId(dataflowID); |
| outputVar.setPortName(op.getName()); |
| outputVar.setDepth(op.getDepth()); |
| outputVar.setInputPort(false); // CHECK PM modified 11/08 -- output vars are actually outputs of output processors... |
| vars.add(outputVar); |
| } |
| |
| pw.addPorts(vars, dataflowID); |
| makePortMapping(vars); |
| |
| /* |
| * add datalink records using the dataflow links retrieving the |
| * processor names requires navigating from links to source/sink |
| * and from there to the processors |
| */ |
| for (Datalink l : df.getLinks()) { |
| // TODO cover the case of datalinks from an input and to an output to the entire dataflow |
| |
| Port sourcePort = null; |
| Port destinationPort = null; |
| |
| OutputPort source = l.getSource(); |
| if (source instanceof ProcessorOutputPort) { |
| String sourcePname = ((ProcessorOutputPort) source) |
| .getProcessor().getLocalName(); |
| sourcePort = lookupPort(sourcePname, source.getName(), false); |
| } else if (source instanceof MergeOutputPort) { |
| // TODO: Handle merge output ports |
| } else |
| // Assume it is internal port from DataflowInputPort |
| sourcePort = lookupPort(externalName, source.getName(), true); |
| |
| InputPort sink = l.getSink(); |
| if (sink instanceof ProcessorInputPort) { |
| String sinkPname = ((ProcessorInputPort) sink) |
| .getProcessor().getLocalName(); |
| destinationPort = lookupPort(sinkPname, sink.getName(), true); |
| } else if (sink instanceof MergeInputPort) { |
| // TODO: Handle merge input ports |
| } else |
| // Assume it is internal port from DataflowOutputPort |
| destinationPort = lookupPort(externalName, sink.getName(), false); |
| |
| if (sourcePort != null && destinationPort != null) |
| pw.addDataLink(sourcePort, destinationPort, dataflowID); |
| else |
| logger.info("Can't record datalink " + l); |
| } |
| } |
| } catch (Exception e) { |
| logger.error("Problem processing provenance for dataflow", e); |
| } |
| |
| return dataflowID; |
| } |
| |
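| /** |
| * Builds the port lookup table used by lookupPort(): each Port is keyed by |
| * its processor name plus "/i:" (input) or "/o:" (output) plus port name. |
| */ |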
| private void makePortMapping(List<Port> ports) { |
| mapping = new HashMap<>(); |
| for (Port port: ports) { |
| String key = port.getProcessorName() |
| + (port.isInputPort() ? "/i:" : "/o:") + port.getPortName(); |
| mapping.put(key, port); |
| } |
| } |
| |
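| /** |
| * @return the Port registered by makePortMapping() for this |
| *         processor/port/direction combination, or null if none is known |
| */ |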
| private Port lookupPort(String processorName, String portName, boolean isInputPort) { |
| String key = processorName + (isInputPort ? "/i:" : "/o:") + portName; |
| return mapping.get(key); |
| } |
| |
| /** |
| * processes an elementary process execution event from T2. Collects info |
| * from events as they happen and sends them to the writer for processing |
| * when the iteration event is received. Uses the map of procBindings to |
| * process event id and the map of child ids to parent ids to ensure that |
| * the correct proc binding is used |
| * @param currentWorkflowID |
| * |
| * @param d |
| * @param context |
| */ |
| public void processProcessEvent(ProvenanceItem provenanceItem, String currentWorkflowID) { |
| switch (provenanceItem.getEventType()) { |
| case PROCESS_EVENT_TYPE: { |
| String parentId = provenanceItem.getParentId(); // this is the workflowID |
| String identifier = provenanceItem.getIdentifier(); // use this as workflowRunId if this is the top-level process |
| |
| parentChildMap.put(identifier, parentId); |
| ProcessorBinding pb = new ProcessorBinding(); |
| pb.setWorkflowRunId(getWorkflowRunId()); |
| pb.setWorkflowId(currentWorkflowID); |
| procBindingMap.put(identifier, pb); |
| return; |
| } |
| case PROCESSOR_EVENT_TYPE: { |
| String identifier = provenanceItem.getIdentifier(); |
| String parentId = provenanceItem.getParentId(); |
| String processID = provenanceItem.getProcessId(); // this is the external process ID |
| |
| /* |
| * processID has the weird composite form facade0:dataflowName:pName; |
| * the processor name is its last component |
| */ |
| String[] processName = processID.split(":"); |
| procBindingMap.get(parentId).setProcessorName( |
| processName[processName.length - 1]); |
| |
| parentChildMap.put(identifier, parentId); |
| return; |
| } |
| case ACTIVITY_EVENT_TYPE: { |
| String identifier = provenanceItem.getIdentifier(); |
| String parentId = provenanceItem.getParentId(); |
| procBindingMap.get(parentChildMap.get(parentId)) |
| .setFirstActivityClassName(identifier); |
| parentChildMap.put(identifier, parentId); |
| return; |
| } |
| case ITERATION_EVENT_TYPE: { |
| IterationProvenanceItem iterationProvenanceItem = (IterationProvenanceItem)provenanceItem; |
| if (iterationProvenanceItem.getParentIterationItem() != null) |
| // Skipping pipelined outputs, we'll process the parent output later instead |
| return; |
| |
| // traverse up to root to retrieve ProcBinding that was created when we saw the process event |
| String activityID = provenanceItem.getParentId(); |
| String processorID = parentChildMap.get(activityID); |
| String processID = parentChildMap.get(processorID); |
| String iterationID = provenanceItem.getIdentifier(); |
| parentChildMap.put(iterationID, activityID); |
| |
| ProcessorEnactment processorEnactment = processorEnactmentMap |
| .get(iterationID); |
| if (processorEnactment == null) |
| processorEnactment = new ProcessorEnactment(); |
| |
| ProcessorBinding procBinding = procBindingMap.get(processID); |
| |
| String itVector = extractIterationVector(iterationToString(iterationProvenanceItem |
| .getIteration())); |
| procBinding.setIterationVector(itVector); |
| |
| processorEnactment.setEnactmentStarted(iterationProvenanceItem |
| .getEnactmentStarted()); |
| processorEnactment.setEnactmentEnded(iterationProvenanceItem |
| .getEnactmentEnded()); |
| processorEnactment.setWorkflowRunId(workflowRunId); |
| processorEnactment.setIteration(itVector); |
| |
| String processId = iterationProvenanceItem.getProcessId(); |
| String parentProcessId = parentProcess(processId, 3); |
| if (parentProcessId != null) { |
| ProcessorEnactment parentProcEnact = getWfdp().invocationProcessToProcessEnactment |
| .get(parentProcessId); |
| if (parentProcEnact != null) |
| processorEnactment |
| .setParentProcessorEnactmentId(parentProcEnact |
| .getProcessEnactmentId()); |
| } |
| processorEnactment.setProcessEnactmentId(iterationProvenanceItem |
| .getIdentifier()); |
| processorEnactment.setProcessIdentifier(processId); |
| |
| ProvenanceProcessor provenanceProcessor; |
| if (processorEnactment.getProcessorId() == null) { |
| provenanceProcessor = pq.getProvenanceProcessorByName( |
| currentWorkflowID, procBinding.getProcessorName()); |
| if (provenanceProcessor == null) |
| // already logged warning |
| return; |
| processorMapById.put(provenanceProcessor.getIdentifier(), |
| provenanceProcessor); |
| processorEnactment.setProcessorId(provenanceProcessor |
| .getIdentifier()); |
| } else { |
| provenanceProcessor = processorMapById.get(processorEnactment |
| .getProcessorId()); |
| if (provenanceProcessor == null) { |
| provenanceProcessor = pq |
| .getProvenanceProcessorById(processorEnactment |
| .getProcessorId()); |
| processorMapById.put(provenanceProcessor.getIdentifier(), |
| provenanceProcessor); |
| } |
| } |
| |
| InputDataProvenanceItem inputDataEl = iterationProvenanceItem.getInputDataItem(); |
| OutputDataProvenanceItem outputDataEl = iterationProvenanceItem.getOutputDataItem(); |
| |
| if (inputDataEl != null |
| && processorEnactment.getInitialInputsDataBindingId() == null) { |
| processorEnactment |
| .setInitialInputsDataBindingId(processDataBindings( |
| inputDataEl, provenanceProcessor)); |
| processInput(inputDataEl, procBinding, currentWorkflowID); |
| } |
| |
| if (outputDataEl != null |
| && processorEnactment.getFinalOutputsDataBindingId() == null) { |
| processorEnactment |
| .setFinalOutputsDataBindingId(processDataBindings( |
| outputDataEl, provenanceProcessor)); |
| processOutput(outputDataEl, procBinding, currentWorkflowID); |
| } |
| |
| try { |
| if (processorEnactmentMap.containsKey(iterationID)) { |
| getPw().updateProcessorEnactment(processorEnactment); |
| } else { |
| getPw().addProcessorEnactment(processorEnactment); |
| processorEnactmentMap.put(iterationID, processorEnactment); |
| } |
| } catch (SQLException e) { |
| logger.warn("Could not store processor enactment", e); |
| } |
| return; |
| } |
| case END_WORKFLOW_EVENT_TYPE: { |
| DataflowRunComplete completeEvent = (DataflowRunComplete) provenanceItem; |
| // use this event to do housekeeping on the input/output varbindings |
| |
| // process the input and output values accumulated by WorkflowDataProcessor |
| getWfdp().processTrees(completeEvent, getWorkflowRunId()); |
| |
| reconcileLocalOutputs(provenanceItem.getWorkflowId()); |
| |
| if (! provenanceItem.getProcessId().contains(":")) { |
| // Top-level workflow finished |
| // No longer needed, done by processTrees() |
| // patchTopLevelInputs(); |
| |
| workflowStructureDone = false; // CHECK reset for next run... |
| // reconcileTopLevelOutputs(); // Done by reconcileLocalOutputs |
| getPw().closeCurrentModel(); // only real impl is for RDF |
| } |
| return; |
| } |
| case WORKFLOW_DATA_EVENT_TYPE: { |
| // give this event to a WorkflowDataProcessor object for pre-processing |
| // TODO may generate an exception when the data is an error CHECK |
| getWfdp().addWorkflowDataItem(provenanceItem); |
| // FIXME not sure - needs to be stored somehow |
| return; |
| } |
| case INVOCATION_STARTED_EVENT_TYPE: { |
| InvocationStartedProvenanceItem startedItem = (InvocationStartedProvenanceItem) provenanceItem; |
| ProcessorEnactment processorEnactment = processorEnactmentMap |
| .get(startedItem.getParentId()); |
| if (processorEnactment == null) { |
| logger.error("Could not find ProcessorEnactment for invocation " |
| + startedItem); |
| return; |
| } |
| getWfdp().invocationProcessToProcessEnactment.put( |
| startedItem.getInvocationProcessId(), processorEnactment); |
| return; |
| } |
| case ERROR_EVENT_TYPE: |
| //TODO process the error |
| return; |
| default: |
| // TODO: unhandled event type -- should this raise an error? |
| return; |
| } |
| } |
| |
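| /** |
| * Registers one DataBinding record per port in the given data item, all |
| * sharing a freshly generated data binding ID. |
| * @return the generated data binding ID |
| */ |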
| private String processDataBindings( |
| DataProvenanceItem provenanceItem, ProvenanceProcessor provenanceProcessor) { |
| // TODO: Cache known provenanceItems and avoid registering again |
| String dataBindingId = UUID.randomUUID().toString(); |
| boolean isInput = provenanceItem instanceof InputDataProvenanceItem; |
| |
| for (Entry<String, T2Reference> entry : provenanceItem.getDataMap().entrySet()) { |
| DataBinding dataBinding = new DataBinding(); |
| dataBinding.setDataBindingId(dataBindingId); |
| Port port = findPort(provenanceProcessor, entry.getKey(), isInput); |
| if (port == null) { |
| logger.warn("Could not find port for " + entry.getKey()); |
| continue; |
| } |
| dataBinding.setPort(port); |
| dataBinding.setT2Reference(entry.getValue().toUri().toASCIIString()); |
| dataBinding.setWorkflowRunId(workflowRunId); |
| try { |
| getPw().addDataBinding(dataBinding); |
| } catch (SQLException e) { |
| logger.warn("Could not register data binding for " + port, e); |
| } |
| } |
| return dataBindingId; |
| } |
| |
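| /** |
| * Resolves a Port by workflow, processor name, port name and direction; |
| * logs a warning and returns null when there is no unique match. |
| */ |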
| private Port findPort(ProvenanceProcessor provenanceProcessor, |
| String portName, boolean isInput) { |
| // TODO: Query pr dataflow and cache |
| Map<String, String> queryConstraints = new HashMap<>(); |
| queryConstraints.put("V.workflowId", |
| provenanceProcessor.getWorkflowId()); |
| String processorName = provenanceProcessor.getProcessorName(); |
| queryConstraints.put("processorName", processorName); |
| queryConstraints.put("portName", portName); |
| queryConstraints.put("isInputPort", isInput ? "1" : "0"); |
| try { |
| List<Port> vars = pq.getPorts(queryConstraints); |
| if (vars.isEmpty()) { |
| logger.warn("Can't find port " + portName + " in " |
| + processorName); |
| } else if (vars.size() > 1) { |
| logger.warn("Multiple matches for port " + portName + " in " |
| + processorName + ", got:" + vars); |
| } else |
| return vars.get(0); |
| } catch (SQLException e) { |
| logger.error( |
| "Problem getting ports for processor: " + processorName |
| + " worflow: " |
| + provenanceProcessor.getWorkflowId(), e); |
| } |
| return null; |
| } |
| |
| |
| /** |
| * Fills in the PortBindings for the global inputs -- this removes the need |
| * for explicit events that account for these value bindings. |
| */ |
| public void patchTopLevelInputs() { |
| |
| // for each input I to topLevelDataflow: |
| // pick first outgoing datalink with sink P:X |
| // copy value X to I -- this can be a collection, so copy everything |
| |
| // get all global input vars |
| List<Port> inputs = null; |
| try { |
| inputs = getPq().getInputPorts(topLevelDataflowName, topLevelDataflowID); |
| |
| for (Port input : inputs) { |
| Map<String, String> queryConstraints = new HashMap<>(); |
| queryConstraints.put("sourcePortId", input.getIdentifier()); |
| queryConstraints.put("workflowId", input.getWorkflowId()); |
| List<DataLink> outgoingDataLinks = getPq().getDataLinks(queryConstraints); |
| |
| // any datalink will do, use the first |
| String targetPname = outgoingDataLinks.get(0).getDestinationProcessorName(); |
| String targetVname = outgoingDataLinks.get(0).getDestinationPortName(); |
| |
| queryConstraints.clear(); |
| queryConstraints.put("V.portName", targetVname); |
| queryConstraints.put("V.processorName", targetPname); |
| queryConstraints.put("VB.workflowRunId", getWorkflowRunId()); |
| queryConstraints.put("V.workflowId", topLevelDataflowID); |
| |
| for (PortBinding vb : getPq().getPortBindings(queryConstraints)) { |
| PortBinding inputPortBinding = new PortBinding(vb); |
| |
| // insert PortBinding back into VB with the global input portName |
| inputPortBinding.setProcessorName(input.getProcessorName()); |
| inputPortBinding.setPortName(input.getPortName()); |
| try { |
| getPw().addPortBinding(inputPortBinding); |
| } catch (SQLException ex) { |
| logger.info("Already logged port binding", ex); |
| } |
| } |
| } |
| } catch (SQLException e) { |
| logger.warn("Patch top level inputs problem for provenance", e); |
| } catch (IndexOutOfBoundsException e) { |
| logger.error("Could not patch top level", e); |
| } |
| } |
| |
| public void reconcileTopLevelOutputs() { |
| reconcileLocalOutputs(topLevelDataflowID); |
| } |
| |
| // PM added 23/4/09 |
| /** |
| * Reconciles the top level outputs with the results from their immediate |
| * predecessors in the graph.<br/> |
| * Various cases have to be considered: predecessors may include records |
| * that are not in the output, while the output may include nested list |
| * structures that are not in the predecessors. This method performs a |
| * 2-way reconciliation that considers all possible cases.<br/> |
| * At the end, outputs and their predecessors contain the same data.<p/> |
| * NOTE: if we assumed that data values (URIs) were <em>always</em> unique, |
| * this would be greatly simplified: just compare the two sets of value |
| * records by their URIs and reconcile them. That is not the approach taken |
| * here. Also note that this method currently reconciles against the |
| * top-level dataflow's outputs regardless of the dataflowID argument. |
| */ |
| public void reconcileLocalOutputs(String dataflowID) { |
| /* |
| for each output O |
| |
| for each variable V in predecessors(O) |
| |
| fetch all VB records for O into list OValues |
| fetch all VB records for V into list VValues |
| |
| compare OValues and VValues: |
| it SHOULD be the case that OValues is a subset of VValues. Under this assumption: |
| |
| for each vb in VValues: |
| - if there is a matching o in OValues then (vb may be missing collection information) |
| copy o to vb |
| else |
| if vb has no collection info && there is a matching tree node tn in OTree (use iteration index for the match) then |
| set vb to be in collection tn |
| copy vb to o |
| |
| finally copy all Collection records for O in OTree -- catch duplicate errors |
| */ |
| |
| Map<String, String> queryConstraints = new HashMap<>(); |
| |
| try { |
| // for each output O |
| for (Port output : pq.getOutputPorts(topLevelDataflowName, topLevelDataflowID)) { |
| // find all records for the immediate predecessor Y of O |
| queryConstraints.clear(); |
| queryConstraints.put("destinationPortId", output.getIdentifier()); |
| queryConstraints.put("workflowId", output.getWorkflowId()); |
| List<DataLink> incomingDataLinks = pq.getDataLinks(queryConstraints); |
| |
| // there can be only one -- but check that there is one! |
| if (incomingDataLinks.isEmpty()) |
| continue; |
| |
| String sourcePname = incomingDataLinks.get(0).getSourceProcessorName(); |
| String sourceVname = incomingDataLinks.get(0).getSourcePortName(); |
| |
| queryConstraints.clear(); |
| queryConstraints.put("V.portName", sourceVname); |
| queryConstraints.put("V.processorName", sourcePname); |
| queryConstraints.put("VB.workflowRunId", getWorkflowRunId()); |
| queryConstraints.put("V.workflowId", topLevelDataflowID); |
| |
| List<PortBinding> YValues = pq.getPortBindings(queryConstraints); |
| |
| // for each YValue look for a match in OValues |
| // (assume the YValues values are a superset of OValues)!) |
| |
| for (PortBinding yValue:YValues) { |
| // look for a matching record in PortBinding for output O |
| queryConstraints.clear(); |
| queryConstraints.put("V.portName", output.getPortName()); |
| queryConstraints.put("V.processorName", output.getProcessorName()); |
| queryConstraints.put("VB.workflowRunId", getWorkflowRunId()); |
| queryConstraints.put("V.workflowid", topLevelDataflowID); |
| queryConstraints.put("VB.iteration", yValue.getIteration()); |
| if (yValue.getCollIDRef()!= null) { |
| queryConstraints.put("VB.collIDRef", yValue.getCollIDRef()); |
| queryConstraints.put("VB.positionInColl", Integer.toString(yValue.getPositionInColl())); |
| } |
| List<PortBinding> matchingOValues = pq.getPortBindings(queryConstraints); |
| |
| // result at most size 1 |
| if (!matchingOValues.isEmpty()) { |
| PortBinding oValue = matchingOValues.get(0); |
| |
| // copy collection info from oValue to yValue |
| yValue.setCollIDRef(oValue.getCollIDRef()); |
| yValue.setPositionInColl(oValue.getPositionInColl()); |
| |
| pw.updatePortBinding(yValue); |
| } else { |
| // copy the yValue to O |
| // insert PortBinding back into VB with the global output portName |
| yValue.setProcessorName(output.getProcessorName()); |
| yValue.setPortName(output.getPortName()); |
| pw.addPortBinding(yValue); |
| } |
| |
| } // for each yValue in YValues |
| |
| // copy all Collection records for O to Y |
| |
| // get all collections refs for O |
| queryConstraints.clear(); |
| queryConstraints.put("workflowRunId", getWorkflowRunId()); |
| queryConstraints.put("processorNameRef", output.getProcessorName()); |
| queryConstraints.put("portName", output.getPortName()); |
| |
| List<NestedListNode> oCollections = pq.getNestedListNodes(queryConstraints); |
| |
| // insert back as collection refs for Y -- catch duplicates |
| for (NestedListNode nln:oCollections) { |
| nln.setProcessorName(sourcePname); |
| nln.setPortName(sourceVname); |
| |
| getPw().replaceCollectionRecord(nln, sourcePname, sourceVname); |
| } |
| |
| } // for each output var |
| |
| } catch (SQLException e) { |
| logger.warn("Problem reconciling top level outputs", e); |
| } |
| |
| } |
| |
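| /** |
| * create one new PortBinding record for each output port binding |
| * @param currentWorkflowID |
| */ |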
| @SuppressWarnings("unchecked") |
| private void processOutput(OutputDataProvenanceItem provenanceItem, |
| ProcessorBinding procBinding, String currentWorkflowID) { |
| Element dataItemAsXML = getDataItemAsXML(provenanceItem); |
| List<Element> outputPorts = dataItemAsXML.getChildren("port"); |
| for (Element outputport : outputPorts) { |
| String portName = outputport.getAttributeValue("name"); |
| |
| // value type may vary |
| List<Element> valueElements = outputport.getChildren(); |
| if (valueElements != null && !valueElements.isEmpty()) { |
| Element valueEl = valueElements.get(0); // only really 1 child |
| |
| processPortBinding(valueEl, procBinding.getProcessorName(), |
| portName, procBinding.getIterationVector(), |
| getWorkflowRunId(), currentWorkflowID); |
| } |
| } |
| } |
| |
| /** |
| * Reconciles values in PortBindings across a datalink: if vb's value is |
| * within a collection, <em>and</em> it is copied from a value generated |
| * during a previous iteration, this method propagates the list reference |
| * to that iteration value, which wouldn't have it. Conversely, if vb is |
| * going to be input to an iteration, it has lost its containing list node, |
| * and we put it back in by looking at the corresponding predecessor. |
| * |
| * @param newBindings |
| * @throws SQLException |
| */ |
| private void backpatchIterationResults(List<PortBinding> newBindings) throws SQLException { |
| logger.debug("backpatchIterationResults: start"); |
| for (PortBinding vb : newBindings) { |
| logger.debug("backpatchIterationResults: processing vb " |
| + vb.getProcessorName() + "/" + vb.getPortName() + "=" |
| + vb.getValue()); |
| |
| if (vb.getCollIDRef()!= null) // this is a member of a collection |
| logger.debug("...which is inside a collection "); |
| |
| // look for its antecedent |
| Map<String,String> queryConstraints = new HashMap<>(); |
| queryConstraints.put("destinationPortName", vb.getPortName()); |
| queryConstraints.put("destinationProcessorName", vb.getProcessorName()); |
| queryConstraints.put("workflowId", pq.getWorkflowIdsForRun(vb.getWorkflowRunId()).get(0)); // CHECK picking first element in list... |
| List<DataLink> incomingDataLinks = pq.getDataLinks(queryConstraints); |
| |
| // there can be only one -- but check that there is one! |
| if (incomingDataLinks.isEmpty()) |
| return; |
| |
| String sourcePname = incomingDataLinks.get(0).getSourceProcessorName(); |
| String sourceVname = incomingDataLinks.get(0).getSourcePortName(); |
| |
| logger.debug("antecedent: "+sourcePname+":"+sourceVname); |
| |
| // get the varbindings for this port and select the one with the same iteration vector as its successor |
| queryConstraints.clear(); |
| queryConstraints.put("VB.portName", sourceVname); |
| queryConstraints.put("V.processorName", sourcePname); |
| queryConstraints.put("VB.value", vb.getValue()); |
| queryConstraints.put("VB.workflowRunId", vb.getWorkflowRunId()); |
| |
| // reconcile |
| for (PortBinding b : pq.getPortBindings(queryConstraints)) { |
| logger.debug("backpatching " + sourceVname + " " + sourcePname); |
| |
| if (vb.getCollIDRef() != null && b.getCollIDRef() == null) { |
| logger.debug("successor " + vb.getPortName() |
| + " is in collection " + vb.getCollIDRef() |
| + " but pred " + b.getPortName() + " is not"); |
| logger.debug("putting " + b.getPortName() |
| + " in collection " + vb.getCollIDRef() |
| + " at pos " + vb.getPositionInColl()); |
| b.setCollIDRef(vb.getCollIDRef()); |
| b.setPositionInColl(vb.getPositionInColl()); |
| getPw().updatePortBinding(b); |
| |
| } else if (vb.getCollIDRef() == null && b.getCollIDRef() != null) { |
| logger.debug("successor " + vb.getPortName() |
| + " is NOT in collection but pred " |
| + b.getPortName() + " IS"); |
| logger.debug("putting " + vb.getPortName() |
| + " in collection " + b.getCollIDRef() + " at pos " |
| + b.getPositionInColl()); |
| vb.setCollIDRef(b.getCollIDRef()); |
| vb.setPositionInColl(b.getPositionInColl()); |
| getPw().updatePortBinding(vb); |
| } |
| } |
| } |
| } |
| |
| |
| /** |
| * create one new PortBinding record for each input port binding |
| * @param currentWorkflowID |
| */ |
| @SuppressWarnings("unchecked") |
| private void processInput(InputDataProvenanceItem provenanceItem, |
| ProcessorBinding procBinding, String currentWorkflowID) { |
| Element dataItemAsXML = getDataItemAsXML(provenanceItem); |
| int order = 0; |
| for (Element inputport : (List<Element>) dataItemAsXML.getChildren("port")) { |
| String portName = inputport.getAttributeValue("name"); |
| |
| try { |
| // add process order sequence to Port for this portName |
| Map<String, String> queryConstraints = new HashMap<>(); |
| queryConstraints.put("V.workflowId", currentWorkflowID); |
| queryConstraints.put("processorName", procBinding.getProcessorName()); |
| queryConstraints.put("portName", portName); |
| queryConstraints.put("isInputPort", "1"); |
| |
| Port v = getPq().getPorts(queryConstraints).get(0); |
| v.setIterationStrategyOrder(order++); |
| getPw().updatePort(v); |
| } catch (IndexOutOfBoundsException e) { |
| logger.error("Could not process input " + portName, e); |
| } catch (SQLException e1) { |
| logger.error("Could not process input " + portName, e1); |
| } |
| |
| // value type may vary |
| List<Element> valueElements = inputport.getChildren(); // hopefully in the right order... |
| if (valueElements != null && !valueElements.isEmpty()) { |
| Element valueEl = valueElements.get(0); // expect only 1 child |
| |
| List<PortBinding> newBindings = processPortBinding(valueEl, |
| procBinding.getProcessorName(), portName, |
| procBinding.getIterationVector(), getWorkflowRunId(), |
| currentWorkflowID); |
| // newBindings is a list whenever valueEl is of type list: in this case |
| // processPortBinding recursively processes all values within the |
| // collection and generates one PortBinding record for each of them |
| |
| allInputVarBindings.addAll(newBindings); |
| |
| /* |
| * If the new binding involves list values, they may need to be |
| * propagated back to the results of prior iterations. This backpatching |
| * is disabled as it is very inefficient and not needed for current |
| * Taverna usage. |
| */ |
| |
| try { |
| if (backpatching) |
| backpatchIterationResults(newBindings); |
| } catch (SQLException e) { |
| logger.warn("Problem with back patching iteration results", e); |
| } |
| } else { |
| if (valueElements != null) |
| logger.debug("port name " + portName + " " |
| + valueElements.size()); |
| else |
| logger.debug("valueElements is null for port name " |
| + portName); |
| } |
| } |
| } |
| |
| /** |
| * capture the default case where the value is not a list |
| * |
| * @param valueEl |
| * @param processorId |
| * @param portName |
| * @param iterationId |
| * @param workflowRunId |
| * @param currentWorkflowID |
| */ |
| private List<PortBinding> processPortBinding(Element valueEl, |
| String processorId, String portName, String iterationId, |
| String workflowRunId, String currentWorkflowID) { |
| // uses the defaults: |
| // collIdRef = null |
| // parentcollectionRef = null |
| // positionInCollection = 1 |
| return processPortBinding(valueEl, processorId, portName, null, 1, null, |
| iterationId, workflowRunId, null, currentWorkflowID); |
| } |
| |
| /** |
| * general case where value can be a list |
| * @param valueEl |
| * @param processorId |
| * @param portName |
| * @param collIdRef |
| * @param positionInCollection |
| * @param parentCollectionRef |
| * @param iterationId |
| * @param workflowRunId |
| * @param currentWorkflowID |
| */ |
| @SuppressWarnings("unchecked") |
| private List<PortBinding> processPortBinding(Element valueEl, |
| String processorId, String portName, String collIdRef, |
| int positionInCollection, String parentCollectionRef, |
| String iterationId, String workflowRunId, String itVector, |
| String currentWorkflowID) { |
| List<PortBinding> newBindings = new ArrayList<>(); |
| |
| String valueType = valueEl.getName(); |
| // logger.info("value element for " + processorId + ": " |
| // + valueType); |
| |
| String iterationVector = (itVector == null ? extractIterationVector(iterationId) |
| : itVector); |
| |
| PortBinding vb = new PortBinding(); |
| |
| vb.setWorkflowId(currentWorkflowID); |
| vb.setWorkflowRunId(workflowRunId); |
| vb.setProcessorName(processorId); |
| vb.setValueType(valueType); |
| vb.setPortName(portName); |
| vb.setCollIDRef(collIdRef); |
| vb.setPositionInColl(positionInCollection); |
| |
| newBindings.add(vb); |
| |
| if (valueType.equals("literal")) { |
| try { |
| vb.setIteration(iterationVector); |
| vb.setValue(valueEl.getAttributeValue("id")); |
| logger.debug("new input VB with workflowId="+currentWorkflowID+" processorId="+processorId+ |
| " valueType="+valueType+" portName="+portName+" collIdRef="+collIdRef+ |
| " position="+positionInCollection+" itvector="+iterationVector+ |
| " value="+vb.getValue()); |
| getPw().addPortBinding(vb); |
| } catch (SQLException e) { |
| logger.warn("Process Port Binding problem with provenance", e); |
| } |
| |
| } else if (valueType.equals("referenceSet")) { |
| vb.setIteration(iterationVector); |
| vb.setValue(valueEl.getAttributeValue("id")); |
| vb.setReference(valueEl.getChildText("reference")); |
| |
| logger.debug("new input VB with workflowId=" + currentWorkflowID |
| + " processorId=" + processorId + " valueType=" + valueType |
| + " portName=" + portName + " collIdRef=" + collIdRef |
| + " position=" + positionInCollection + " itvector=" |
| + iterationVector + " value=" + vb.getValue()); |
| |
| try { |
| getPw().addPortBinding(vb); |
| } catch (SQLException e) { |
| logger.debug("Problem processing var binding -- performing update instead of insert", e); //, e); |
| // try to update the existing record instead using the current collection info |
| |
| getPw().updatePortBinding(vb); |
| } |
| |
| } else if (valueType.equals("list")) { |
| logger.debug("input of type list"); |
| |
| // add entries to the Collection and to the PortBinding tables |
| // list id --> Collection.collId |
| |
| String collId = valueEl.getAttributeValue("id"); |
| try { |
| parentCollectionRef = getPw().addCollection(processorId, collId, |
| parentCollectionRef, iterationVector, portName, |
| workflowRunId); |
| |
| // iterate over each list element |
| List<Element> listElements = valueEl.getChildren(); |
| |
| positionInCollection = 1; // also use this as a suffix to extend the iteration vector |
| |
| // extend iteration vector to account for additional levels within the list |
| |
| String originalIterationVector = iterationVector; |
| |
| // children can be any base type, including list itself -- so |
| // use recursion |
| for (Element el : listElements) { |
| if (originalIterationVector.length() > 2) // vector is not empty |
| iterationVector = originalIterationVector.substring(0, |
| originalIterationVector.length()-1) + ","+ |
| Integer.toString(positionInCollection-1) + "]"; |
| else |
| iterationVector = "["+ (positionInCollection-1) + "]"; |
| |
| newBindings.addAll(processPortBinding(el, processorId, |
| portName, collId, positionInCollection, |
| parentCollectionRef, iterationId, workflowRunId, |
| iterationVector, currentWorkflowID)); |
| |
| positionInCollection++; |
| } |
| } catch (SQLException e) { |
| logger.warn("Problem processing var binding", e); |
| } |
| } else if (valueType.equals("error")) { |
| vb.setIteration(iterationVector); |
| vb.setValue(valueEl.getAttributeValue("id")); |
| vb.setReference(valueEl.getChildText("reference")); |
| try { |
| getPw().addPortBinding(vb); |
| } catch (SQLException e) { |
| logger.debug("Problem processing var binding -- performing update instead of insert", e); //, e); |
| // try to update the existing record instead using the current collection info |
| |
| getPw().updatePortBinding(vb); |
| } |
| } else { |
| logger.warn("unrecognized value type element for " |
| + processorId + ": " + valueType); |
| } |
| |
| return newBindings; |
| } |
| |
| |
| /** |
| * OBSOLETE: returns the iteration vector x,y,z,... from [x,y,z,...] |
| * <p/> |
| * now returns the vector itself -- this is still experimental |
| * |
| * @param iteration |
| * @return |
| */ |
| @Deprecated |
| String extractIterationVector(String iteration) { |
| return iteration; |
| } |
| |
| /** |
| * Simple holder for a pair of strings. |
| * @author paolo |
| */ |
| class Pair { |
| String v1, v2; |
| |
| public Pair(String current, String workflowId) { |
| v1=current; v2=workflowId; |
| } |
| |
| /** |
| * @return the v1 |
| */ |
| public String getV1() { |
| return v1; |
| } |
| |
| /** |
| * @param v1 the v1 to set |
| */ |
| public void setV1(String v1) { |
| this.v1 = v1; |
| } |
| |
| /** |
| * @return the v2 |
| */ |
| public String getV2() { |
| return v2; |
| } |
| |
| /** |
| * @param v2 the v2 to set |
| */ |
| public void setV2(String v2) { |
| this.v2 = v2; |
| } |
| } |
| |
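| /** |
| * Topologically sorts the processors of the named dataflow, recursing into |
| * nested dataflows so that their processors are spliced into the result in |
| * place of the nested-dataflow processor itself. |
| * @return the sorted list of (processor name, workflowId) pairs |
| * @throws SQLException |
| */ |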
| @SuppressWarnings("deprecation") |
| public List<Pair> toposort(String dataflowName, String workflowRunId) throws SQLException { |
| String workflowId = pq.getWorkflowIdForExternalName(dataflowName); |
| |
| // fetch processors along with the count of their predecessors |
| Map<String, Integer> predecessorsCount = getPq().getPredecessorsCount(workflowRunId); |
| Map<String, List<String>> successorsOf = new HashMap<>(); |
| List<String> procList = pq.getContainedProcessors(dataflowName); |
| |
| for (String s:procList) { |
| List<String> successors = getPq().getSuccProcessors(s, workflowId, workflowRunId); |
| successorsOf.put(s, successors); |
| } |
| |
| List<Pair> sorted = tsort(procList, dataflowName, predecessorsCount, successorsOf, workflowId, workflowRunId); |
| |
| for (int i=0; i< sorted.size(); i++) { |
| String procName = sorted.get(i).getV1(); |
| |
| if (pq.isDataflow(procName) && !procName.equals(dataflowName)) { // handle weirdness: a dataflow is contained within itself.. |
| // recurse on procName |
| List<Pair> sortedSublist = toposort(procName, workflowRunId); |
| |
| // replace procName with sortedSublist in sorted |
| sorted.remove(i); |
| sorted.addAll(i, sortedSublist); |
| } |
| } |
| return sorted; |
| } |
| |
| |
| |
| /** |
| * Topological sort (Kahn's algorithm): repeatedly dequeue a processor with |
| * no remaining predecessors and decrement the predecessor count of each of |
| * its successors. |
| * |
| * @param procList |
| * @param dataflowName |
| * @param predecessorsCount |
| * @param successorsOf |
| * @param workflowId |
| * @param workflowRunId |
| * @return the processors in topological order, paired with their workflowId |
| * @throws SQLException |
| */ |
| public List<Pair> tsort(List<String> procList, String dataflowName, |
| Map<String, Integer> predecessorsCount, |
| Map<String, List<String>> successorsOf, String workflowId, |
| String workflowRunId) throws SQLException { |
| List<Pair> l = new ArrayList<>(); // holds sorted elements |
| List<String> q = new ArrayList<>(); // temp queue |
| |
| // init queue with procList processors that have no predecessors |
| for (String proc : procList) |
| if ((predecessorsCount.get(proc) == null || predecessorsCount.get(proc) == 0) |
| && !proc.equals(dataflowName)) |
| q.add(proc); |
| |
| while (!q.isEmpty()) { |
| String current = q.remove(0); |
| l.add(new Pair(current, workflowId)); |
| |
| List<String> successors = successorsOf.get(current); |
| |
| if (successors == null) |
| continue; |
| |
| // reduce the number of predecessors to each of the successors by one |
| // NB we must traverse an additional datalink through a nested workflow input if the successor is a dataflow!! |
| for (String succ : successors) { |
| // decrease edge count for each successor processor |
| predecessorsCount.put(succ, predecessorsCount.get(succ) - 1); |
| |
| if (predecessorsCount.get(succ) == 0 && !succ.equals(dataflowName)) |
| q.add(succ); |
| } |
| } // end loop on q |
| return l; |
| } |
| |
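| /** |
| * Propagates the resolved depth (ANL, actual nesting level) of each port |
| * through the workflow graph: processors are visited in topological order, |
| * each output's resolvedDepth is set to its declared depth plus the sum of |
| * the input depth mismatches, and the result is pushed along outgoing |
| * datalinks, traversing nested-workflow boundaries. |
| * @param workflowRunId |
| * @throws SQLException |
| */ |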
| @SuppressWarnings("deprecation") |
| public void propagateANL(String workflowRunId) throws SQLException { |
| String top = pq.getTopLevelDataflowName(workflowRunId); |
| |
| // ////////////////////// |
| // PHASE I: toposort the processors in the whole graph |
| // ////////////////////// |
| List<Pair> sorted = toposort(top, workflowRunId); |
| |
| List<String> sortedProcessors = new ArrayList<>(); |
| |
| for (Pair p : sorted) |
| sortedProcessors.add(p.getV1()); |
| |
| logger.debug("final sorted list of processors"); |
| for (Pair p : sorted) |
| logger.debug(p.getV1() + " in workflowId " + p.getV2()); |
| |
| // ////////////////////// |
| // PHASE II: traverse and set anl on each port |
| // ////////////////////// |
| |
| // sorted processor names in L at this point; process them in order |
| for (Pair pnameInContext : sorted) { |
| // process pname's inputs -- set ANL to be the DNL if not set in prior steps |
| String pname = pnameInContext.getV1(); |
| String workflowId = pnameInContext.getV2(); |
| |
| List<Port> inputs = getPq().getInputPorts(pname, workflowId); // null -> do not use instance (??) CHECK |
| |
| int totalANL = 0; |
| for (Port iv : inputs) { |
| |
| if (! iv.isResolvedDepthSet()) { |
| iv.setResolvedDepth(iv.getDepth()); |
| getPw().updatePort(iv); |
| } |
| |
| int delta_nl = iv.getResolvedDepth() - iv.getDepth(); |
| |
| // if delta_nl < 0 then Taverna wraps the value into a list --> use dnl(X) in this case |
| if (delta_nl < 0) |
| delta_nl = 0; // CHECK iv.getTypedepth(); |
| |
| totalANL += delta_nl; |
| |
| } |
| |
| // process pname's outputs -- set ANL based on the sum formula (see |
| // paper) |
| for (Port ov : getPq().getOutputPorts(pname, workflowId)) { |
| |
| ov.setResolvedDepth(ov.getDepth() + totalANL); |
| |
| logger.debug("anl for "+pname+":"+ov.getPortName()+" = "+(ov.getDepth() + totalANL)); |
| getPw().updatePort(ov); |
| |
| // propagate this through all the links from this var |
| for (Port v : getPq().getSuccPorts(pname, ov.getPortName(), workflowId)) { |
| List<Port> toBeProcessed = new ArrayList<>(); |
| toBeProcessed.add(v); |
| |
| if (v.getProcessorId() == null && v.isInputPort()) { // this is the input to a nested workflow |
| String tempWorkflowId = pq |
| .getWorkflowIdForExternalName(v |
| .getProcessorName()); |
| List<Port> realSuccessors = getPq().getSuccPorts( |
| v.getProcessorName(), v.getPortName(), |
| tempWorkflowId); |
| |
| toBeProcessed.remove(0); |
| toBeProcessed.addAll(realSuccessors); |
| |
| } else if (v.getProcessorId() == null && !v.isInputPort()) { // this is the output of a nested workflow |
| List<Port> realSuccessors = getPq().getSuccPorts( |
| v.getProcessorName(), v.getPortName(), null); |
| |
| toBeProcessed.remove(0); |
| toBeProcessed.addAll(realSuccessors); |
| } |
| |
| for (Port v1 : toBeProcessed) { |
| v1.setResolvedDepth(ov.getResolvedDepth()); |
| logger.debug("anl for " + v1.getProcessorName() + ":" |
| + v1.getPortName() + " = " |
| + ov.getResolvedDepth()); |
| getPw().updatePort(v1); |
| } |
| } |
| } |
| } |
| } |
| |
| public void setPw(ProvenanceWriter pw) { |
| this.pw = pw; |
| } |
| |
| public ProvenanceWriter getPw() { |
| return pw; |
| } |
| |
| public void setPq(ProvenanceQuery pq) { |
| this.pq = pq; |
| } |
| |
| public ProvenanceQuery getPq() { |
| return pq; |
| } |
| |
| public void setWorkflowRunId(String workflowRunId) { |
| this.workflowRunId = workflowRunId; |
| } |
| |
| public String getWorkflowRunId() { |
| return workflowRunId; |
| } |
| |
| public void setWfdp(WorkflowDataProcessor wfdp) { |
| this.wfdp = wfdp; |
| } |
| |
| public WorkflowDataProcessor getWfdp() { |
| return wfdp; |
| } |
| } |