| /** |
| * |
| */ |
| package net.sf.taverna.t2.provenance.lineageservice.utils; |
| |
| import java.io.File; |
| import java.sql.SQLException; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.UUID; |
| |
| import javax.xml.bind.JAXBException; |
| |
| import org.apache.log4j.Logger; |
| import org.openprovenance.model.Account; |
| import org.openprovenance.model.AccountId; |
| import org.openprovenance.model.Accounts; |
| import org.openprovenance.model.Artifact; |
| import org.openprovenance.model.ArtifactId; |
| import org.openprovenance.model.Artifacts; |
| import org.openprovenance.model.CausalDependencies; |
| import org.openprovenance.model.OPMDeserialiser; |
| import org.openprovenance.model.OPMGraph; |
| import org.openprovenance.model.ProcessId; |
| import org.openprovenance.model.Role; |
| import org.openprovenance.model.Used; |
| import org.openprovenance.model.WasControlledBy; |
| import org.openprovenance.model.WasDerivedFrom; |
| import org.openprovenance.model.WasGeneratedBy; |
| import org.openprovenance.model.WasTriggeredBy; |
| import org.tupeloproject.rdf.Resource; |
| |
| import net.sf.taverna.t2.provenance.lineageservice.EventProcessor; |
| import net.sf.taverna.t2.provenance.lineageservice.ProvenanceWriter; |
| |
| /** |
| * @author paolo |
| * imports foreign XML-serialized OPM graphs into the native Taverna provenance DB, so they can be queried using |
| * |
| * {@link net.sf.taverna.t2.provenance.lineageservice.utils.ProvenanceAnalysis} |
| */ |
| public class OPMImporter { |
| |
| private static final String PROC_NAME = "P"; |
| private static final String OPM_DEF_ACCOUNT = "OPMDefaultAccount"; |
| ProvenanceWriter pw; |
| OPMGraph graph; |
| |
| private static Logger logger = Logger.getLogger(OPMImporter.class); |
| |
| // Maps Account names to Taverna workflows |
| Map<String, String> accountToWorkflow = new HashMap<String, String>(); |
| Map<String, String> workflowToInstance = new HashMap<String, String>(); |
| |
| // maps wfName --> (wfName --> List(Var)) |
| private Map<String, Map<String, List<Var>>> usedVarsByAccount = new HashMap<String, Map<String, List<Var>>>(); |
| private Map<String, Map<String, List<Var>>> wgbVarsByAccount = new HashMap<String, Map<String, List<Var>>>(); |
| |
| // maps accountname --> (artifact -> List(Process)) |
| private Map<String, Map<String,List<String>>> wgbArtifactsByAccount = new HashMap<String, Map<String,List<String>>>(); |
| |
| // maps accountname --> (artifact -> List(Process)) |
| private Map<String, Map<String,List<String>>> usedArtifactsByAccount = new HashMap<String, Map<String,List<String>>>(); |
| |
| int procNameCounter; |
| private String masterAccount = OPM_DEF_ACCOUNT; |
| |
| public OPMImporter(ProvenanceWriter pw) { |
| this.pw = pw; |
| } |
| |
| /** |
| * orphan artifacts are those that are in the graph but are never used neither generated. this |
| * indicates some problem with the graph structure |
| * @return |
| */ |
| public List<String> getOrphanArtifacts() { |
| |
| List<String> allwgb = new ArrayList<String>(); |
| List<String> allUsed = new ArrayList<String>(); |
| List<String> orphans = new ArrayList<String>(); |
| |
| Artifacts allArtifacts = graph.getArtifacts(); |
| |
| for ( Map.Entry<String, Map<String,List<String>>>entry: wgbArtifactsByAccount.entrySet()) { |
| allwgb.addAll(entry.getValue().keySet()); |
| } |
| |
| for ( Map.Entry<String, Map<String,List<String>>>entry: usedArtifactsByAccount.entrySet()) { |
| allUsed.addAll(entry.getValue().keySet()); |
| } |
| |
| List<Artifact> artifacts = allArtifacts.getArtifact(); |
| |
| for (Artifact a:artifacts) { |
| if (!allwgb.contains(a.getId()) && !allUsed.contains(a.getId())) { |
| orphans.add(a.getId()); |
| } |
| } |
| return orphans; |
| } |
| |
| |
| public void importGraph(String XMLOPMGraphFilename) throws JAXBException, SQLException { |
| |
| try { |
| logger.info("OPM XML filename: "+XMLOPMGraphFilename); |
| |
| // deserialize an XML OPM graph from file |
| OPMDeserialiser deser = new OPMDeserialiser(); |
| graph = deser.deserialiseOPMGraph(new File(XMLOPMGraphFilename)); |
| |
| } catch (Exception e) { |
| logger.fatal("exception while deserializing -- unable to continue"); |
| logger.fatal(e.getMessage()); |
| return; |
| } |
| |
| logger.debug("XML graph deserialized"); |
| |
| // |
| // generates one pair <wfName, wfInstance> for each account in the graph |
| // |
| List<Account> allAccounts = null; |
| try { |
| Accounts accounts = graph.getAccounts(); |
| |
| // use this global account alongside any other that may be defined in the graph |
| generateWFFromAccount(OPM_DEF_ACCOUNT); |
| |
| if (accounts == null) { |
| logger.warn("this graph contains no accounts -- using only the default"); |
| } else { |
| for (Account acc:accounts.getAccount()) { |
| // writes both workflow and instance into the DB, updates accountToWorkflow |
| generateWFFromAccount(acc.getId()); |
| } |
| } |
| } catch (Exception e) { |
| logger.warn("exception while getting accounts for this graph"); |
| } |
| |
| // |
| // associates processes and ports to workflows and varbindings to corresponding wfInstances |
| // |
| List<Object> allDeps; |
| |
| // what have we got? |
| // retrieve all OPM relations from the graph |
| CausalDependencies cd = graph.getCausalDependencies(); |
| allDeps = cd.getUsedOrWasGeneratedByOrWasTriggeredBy(); |
| |
| // make sure these are processed in the right order: used, wgby, THEN wdf because this latter is derived from the first 2! |
| // so collect them into sets and process them separately |
| |
| Set<WasGeneratedBy> wgbSet = new HashSet<WasGeneratedBy>(); |
| Set<Used> usedSet = new HashSet<Used>(); |
| Set<WasDerivedFrom> wdfSet = new HashSet<WasDerivedFrom>(); |
| Set<WasControlledBy> wcbSet = new HashSet<WasControlledBy>(); |
| Set<WasTriggeredBy> wtbSet = new HashSet<WasTriggeredBy>(); |
| |
| for (Object dep:allDeps) { |
| logger.info("dependency of type: "+dep.getClass().getName()); |
| |
| if (dep instanceof org.openprovenance.model.WasGeneratedBy) { |
| wgbSet.add((WasGeneratedBy) dep); |
| } else if (dep instanceof org.openprovenance.model.Used) { |
| usedSet.add((Used) dep); |
| } else if (dep instanceof org.openprovenance.model.WasDerivedFrom) { |
| wdfSet.add((WasDerivedFrom) dep); |
| } else if (dep instanceof org.openprovenance.model.WasControlledBy) { |
| wcbSet.add((WasControlledBy) dep); |
| } else if (dep instanceof org.openprovenance.model.WasTriggeredBy) { |
| wtbSet.add((WasTriggeredBy) dep); |
| } |
| } |
| |
| // process these in the correct order |
| for (WasGeneratedBy dep: wgbSet) processWGBy(dep); |
| |
| for (Used dep:usedSet) processUsed(dep); |
| |
| for (WasDerivedFrom dep: wdfSet) processWDF(dep); |
| |
| // we actually ignore the others... |
| |
| // ********* |
| // complete the induced graph by building arcs using the Artfact -> [Var] maps |
| // ********* |
| |
| List<String> accountNames = new ArrayList<String>(); |
| |
| accountNames.add(OPM_DEF_ACCOUNT); |
| |
| if (allAccounts != null) |
| for (Account acc:allAccounts) { accountNames.add(acc.getId()); } |
| |
| for (String acc:accountNames) { |
| |
| String wfName = accountToWorkflow.get(acc); |
| |
| Map<String, List<Var>> usedVars = usedVarsByAccount.get(wfName); |
| Map<String, List<Var>> wgbVars = wgbVarsByAccount.get(wfName); |
| |
| |
| // install an Arc from each wgb var to each used var when the artifact is the same |
| for (Map.Entry<String, List<Var>> entry:wgbVars.entrySet()) { |
| |
| // all Vars for this artifact get connected to all corresponding Vars in used |
| List<Var> sourceVars = entry.getValue(); |
| List<Var> targetVars = usedVars.get(entry.getKey()); |
| |
| if (sourceVars == null || targetVars == null) continue; |
| |
| // create an arc from each sourceVar to each targetVar |
| // note that we expect a single targetVar, but this is not guaranteed |
| for (Var sourceVar:sourceVars) { |
| for (Var targetVar:targetVars) { |
| pw.addArc(sourceVar, targetVar, wfName); |
| } |
| } |
| } |
| } |
| } |
| |
| |
| |
| private void generateWFFromAccount(String accName) throws SQLException { |
| |
| String wfName = accName+"-"+UUID.randomUUID().toString(); |
| String wfInstance = accName+"-"+UUID.randomUUID().toString(); |
| |
| pw.addWFId(wfName); |
| pw.addWFInstanceId(wfName, wfInstance); |
| accountToWorkflow.put(accName, wfName); |
| workflowToInstance.put(wfName, wfInstance); |
| |
| logger.info("generated wfName "+wfName+" and instance "+wfInstance+" for account "+accName); |
| } |
| |
| |
| private Var processProcessArtifactDep(String procName, String value, String varName, |
| String wfName, String wfInstance, boolean artifactIsInput) { |
| |
| // generate Process |
| try { |
| pw.addProcessor(procName, wfName); |
| logger.debug("added processor "+procName+" to workflow "+wfName); |
| } catch (SQLException e) { // no panic -- just catch duplicates |
| logger.warn(e.getMessage()); |
| } |
| |
| // generate Var |
| Var outputVar = new Var(); |
| |
| outputVar.setPName(procName); |
| outputVar.setWfInstanceRef(wfName); |
| outputVar.setVName(varName); |
| outputVar.setTypeNestingLevel(0); |
| outputVar.setInput(artifactIsInput); // wgby is an output var |
| |
| List<Var> vars = new ArrayList<Var>(); // only one Var in the list |
| vars.add(outputVar); |
| |
| try { |
| pw.addVariables(vars, wfName); |
| logger.debug("added var "+varName+" to workflow "+wfName); |
| } catch (SQLException e) { // no panic -- just catch duplicates |
| System.out.println(e.getMessage()); |
| e.printStackTrace(); |
| } |
| |
| // generate VarBindings (wfInstance, procName, varname, value) |
| VarBinding vb = new VarBinding(); |
| |
| vb.setWfInstanceRef(wfInstance); |
| vb.setPNameRef(procName); |
| vb.setVarNameRef(varName); |
| vb.setValue(value); |
| vb.setIterationVector("[]"); |
| |
| try { |
| pw.addVarBinding(vb); |
| logger.debug("added var binding with value "+value+" to workflow instance "+wfInstance); |
| } catch (SQLException e) { // no panic -- just catch duplicates |
| System.out.println(e.getMessage()); |
| } |
| |
| return outputVar; |
| } |
| |
| |
| /** |
| * generic processing of a process-artifact dependency |
| * @param procID |
| * @param artId |
| * @param role |
| * @param wfName |
| * @param wfInstance |
| * @param artifactIsInput |
| */ |
| private Var processProcessArtifactDep(ProcessId procID, ArtifactId artId, Role role, |
| String wfName, String wfInstance, boolean artifactIsInput) { |
| |
| String procName = ((org.openprovenance.model.Process) procID.getId()).getId(); |
| String varName = role.getValue(); |
| String value = ((Artifact) artId.getId()).getId(); |
| |
| return processProcessArtifactDep(procName, value, varName, wfName, wfInstance, artifactIsInput); |
| } |
| |
| |
| |
| /** |
| * used(A,R,P,acc): generates a process for P, a Var for (P,R) an <em>input</em> VarBinding for (P,R,A) |
| * <br/> this is very similar to {@link #processWGBy(WasGeneratedBy)} |
| * @param dep |
| */ |
| private void processUsed(Used dep) { |
| |
| // Acc determines the scope -- this dep may belong to > 1 account, deal with all of them |
| List<AccountId> accountIDs = dep.getAccount(); |
| ProcessId procID = dep.getEffect(); |
| ArtifactId artId = dep.getCause(); |
| Role role = dep.getRole(); |
| |
| List<String> accNames = new ArrayList<String>(); |
| |
| for (AccountId accId:accountIDs) { |
| accNames.add(((Account) accId.getId()).getId()); |
| } |
| |
| accNames.add(OPM_DEF_ACCOUNT); |
| |
| for (String accName: accNames) { |
| String wfName = accountToWorkflow.get(accName); |
| String wfInstance = workflowToInstance.get(wfName); |
| |
| Var v = processProcessArtifactDep(procID, artId, role, wfName, wfInstance, true); // true -> input var |
| |
| // save the mapping from artifact to var for this account |
| Map<String, List<Var>> usedVars = usedVarsByAccount.get(wfName); |
| if (usedVars == null) { |
| usedVars = new HashMap<String, List<Var>>(); |
| usedVarsByAccount.put(wfName, usedVars); |
| } |
| List<Var> vars = usedVars.get(((Artifact) artId.getId()).getId()); |
| |
| if (vars == null) { |
| vars = new ArrayList<Var>(); |
| usedVars.put(((Artifact) artId.getId()).getId(), vars); |
| } |
| vars.add(v); |
| |
| // record the fact that (procID used artId) within this account |
| Map<String, List<String>> usedArtifacts = usedArtifactsByAccount.get(accName); |
| if (usedArtifacts == null) { |
| usedArtifacts = new HashMap<String, List<String>>(); |
| usedArtifactsByAccount.put(accName, usedArtifacts); |
| } |
| |
| String artifactName = ((Artifact) artId.getId()).getId(); |
| List<String> processes = usedArtifacts.get(artifactName); |
| if (processes == null) { |
| processes = new ArrayList<String>(); |
| usedArtifacts.put(artifactName, processes); |
| } |
| processes.add(((org.openprovenance.model.Process) procID.getId()).getId()); |
| |
| } |
| } |
| |
| |
| |
| /** |
| * wgb(A,R,P,Acc): generates a Process for P, a Var for (P,R), an <em>output</em> VarBinding for (P,R,A) |
| * This is all relative to the workflow corresponding to account Acc. <br/> |
| * this method also records all |
| * @param dep |
| * @throws SQLException |
| */ |
| private void processWGBy(WasGeneratedBy dep) { |
| |
| // Acc determines the scope -- this dep may belong to > 1 account, deal with all of them |
| List<AccountId> accountIDs = dep.getAccount(); |
| ProcessId procID = dep.getCause(); |
| ArtifactId artId = dep.getEffect(); |
| Role role = dep.getRole(); |
| |
| List<String> accNames = new ArrayList<String>(); |
| |
| for (AccountId accId:accountIDs) { |
| accNames.add(((Account) accId.getId()).getId()); |
| } |
| |
| accNames.add(OPM_DEF_ACCOUNT); |
| |
| for (String accName:accNames) { |
| |
| String wfName = accountToWorkflow.get(accName); |
| String wfInstance = workflowToInstance.get(wfName); |
| |
| Var v = processProcessArtifactDep(procID, artId, role, wfName, wfInstance, false); // false -> output var |
| |
| Map<String, List<Var>> wgbVars = wgbVarsByAccount.get(wfName); |
| if (wgbVars == null) { |
| wgbVars = new HashMap<String, List<Var>>(); |
| wgbVarsByAccount.put(wfName, wgbVars); |
| } |
| |
| List<Var> vars = wgbVars.get(((Artifact) artId.getId()).getId()); |
| if (vars == null) { |
| vars = new ArrayList<Var>(); |
| wgbVars.put(((Artifact) artId.getId()).getId(), vars); |
| } |
| vars.add(v); |
| |
| // record the fact that (artId wgby procID) within this account |
| Map<String, List<String>> wgbArtifacts = wgbArtifactsByAccount.get(accName); |
| if (wgbArtifacts == null) { |
| wgbArtifacts = new HashMap<String, List<String>>(); |
| wgbArtifactsByAccount.put(accName, wgbArtifacts); |
| } |
| |
| String artifactName = ((Artifact) artId.getId()).getId(); |
| List<String> processes = wgbArtifacts.get(artifactName); |
| if (processes == null) { |
| processes = new ArrayList<String>(); |
| wgbArtifacts.put(artifactName, processes); |
| } |
| processes.add(((org.openprovenance.model.Process) procID.getId()).getId()); |
| } |
| } |
| |
| |
| /** |
| * this is a dep between two artifacts A1 and A2. |
| * In Taverna we need to postulate the existence of a Process to mediate this dependency. |
| * <br/> However, we only need to account for this dep if it cannot be inferred from a combination of used and wgby that |
| * involve A1 and A2: if there exists P s.t. A1 wgby P and P used A2, then this dep. is redundant in the DB and we can safely ignore it. |
| * <br/> note that this analysis is conducted regardless of the accounts in which the wgby and used properties appear, as one account could |
| * be used deliberately to |
| * This will unclutter the DB. |
| * @param dep |
| */ |
| private void processWDF(WasDerivedFrom dep) { |
| List<AccountId> accountIDs = dep.getAccount(); |
| ArtifactId fromArtId = dep.getCause(); |
| ArtifactId toArtId = dep.getEffect(); |
| |
| List<String> accNames = new ArrayList<String>(); |
| |
| for (AccountId accId:accountIDs) { |
| accNames.add(((Account) accId.getId()).getId()); |
| } |
| |
| accNames.add(OPM_DEF_ACCOUNT); |
| |
| for (String accName:accNames) { |
| |
| int varCounter = 0; |
| |
| String wfName = accountToWorkflow.get(accName); |
| String wfInstance = workflowToInstance.get(wfName); |
| |
| List<String> generatingProcesses=null, usingProcesses=null; |
| |
| // look for any triple fromArtId wasGeneratedBy P within this account |
| Map<String, List<String>> wgbArtifacts = wgbArtifactsByAccount.get(accName); |
| |
| if (wgbArtifacts != null) { |
| String toArtifactName = ((Artifact) toArtId.getId()).getId(); |
| generatingProcesses = wgbArtifacts.get(toArtifactName); |
| if (generatingProcesses != null) { |
| logger.debug("artifact "+toArtifactName+" wgby one or more processes..."); |
| } |
| } |
| |
| // look for any triple (P used toArtId) within this account |
| |
| // get map for this account |
| Map<String, List<String>> usedArtifacts = usedArtifactsByAccount.get(accName); |
| |
| if (usedArtifacts != null) { |
| String fromArtifactName = ((Artifact) fromArtId.getId()).getId(); |
| usingProcesses = usedArtifacts.get(fromArtifactName); |
| if (usingProcesses != null) { |
| logger.debug("artifact "+fromArtifactName+" was used by one or more processes..."); |
| } |
| } |
| |
| boolean found = false; |
| if (generatingProcesses != null && usingProcesses != null) { |
| for (String gp:generatingProcesses) { |
| if (usingProcesses.contains(gp)) { |
| logger.debug("intersection between process sets not empty, this WDF is redundant"); |
| found = true; |
| break; |
| } |
| } |
| } |
| |
| // only postulate a new process if the native one has not been found |
| if (found) return; |
| |
| String procName = PROC_NAME+"_"+procNameCounter++; |
| |
| try { |
| pw.addProcessor(procName, wfName); |
| logger.info("created non-native added processor "+procName+" to workflow "+wfName); |
| } catch (SQLException e) { // no panic -- just catch duplicates |
| logger.warn(e.getMessage()); |
| } |
| |
| // create a role for fromArtId from the procName |
| String inputVarName = procName+"_"+varCounter++; |
| String inputValue = ((Artifact) fromArtId.getId()).getId(); |
| |
| // add to DB |
| processProcessArtifactDep(procName, inputValue, inputVarName, wfName, wfInstance, true); |
| |
| // create a role for toArtId |
| String outputVarName = procName+"_"+varCounter++; |
| String outputValue = ((Artifact) toArtId.getId()).getId(); |
| |
| // add to DB |
| processProcessArtifactDep(procName, outputValue, outputVarName, wfName, wfInstance, false); |
| } |
| } |
| |
| |
| |
| /** |
| * there is no counterpart in Taverna provenance for this dependency. This is translated into |
| * a control link but this is not part of the provenance model |
| * @param dep |
| */ |
| private void processWTBy(WasTriggeredBy dep) { |
| |
| } |
| |
| |
| /** |
| * there is no counterpart in Taverna for this dependency, as it involves agents which we don't support |
| * @param dep |
| */ |
| private void processWCBy(WasControlledBy dep) { |
| // TODO Auto-generated method stub |
| } |
| |
| |
| |
| } |