blob: 901120ced46e0e43bc988241740af59e9734684a [file] [log] [blame]
* Copyright (C) 2007 The University of Manchester
* Modifications to the initial code base are copyright of their
* respective authors, or their employers as appropriate.
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public License
* as published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* Lesser General Public License for more details.
* You should have received a copy of the GNU Lesser General Public
* License along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
package org.apache.taverna.provenance.lineageservice;
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
import static org.apache.taverna.provenance.connector.AbstractProvenanceConnector.DataflowInvocationTable.DataflowInvocation;
import static org.apache.taverna.provenance.connector.AbstractProvenanceConnector.DataflowInvocationTable.parentProcessorEnactmentId;
import static org.apache.taverna.provenance.lineageservice.utils.ProvenanceProcessor.DATAFLOW_ACTIVITY;
import java.sql.Blob;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import org.apache.taverna.provenance.connector.AbstractProvenanceConnector.CollectionTable;
import org.apache.taverna.provenance.connector.AbstractProvenanceConnector.DataBindingTable;
import org.apache.taverna.provenance.connector.AbstractProvenanceConnector.DataflowInvocationTable;
import org.apache.taverna.provenance.connector.AbstractProvenanceConnector.ProcessorEnactmentTable;
import org.apache.taverna.provenance.lineageservice.utils.Collection;
import org.apache.taverna.provenance.lineageservice.utils.DDRecord;
import org.apache.taverna.provenance.lineageservice.utils.DataLink;
import org.apache.taverna.provenance.lineageservice.utils.DataflowInvocation;
import org.apache.taverna.provenance.lineageservice.utils.NestedListNode;
import org.apache.taverna.provenance.lineageservice.utils.Port;
import org.apache.taverna.provenance.lineageservice.utils.PortBinding;
import org.apache.taverna.provenance.lineageservice.utils.ProcessorEnactment;
import org.apache.taverna.provenance.lineageservice.utils.ProvenanceProcessor;
import org.apache.taverna.provenance.lineageservice.utils.Workflow;
import org.apache.taverna.provenance.lineageservice.utils.WorkflowTree;
import org.apache.taverna.provenance.lineageservice.utils.WorkflowRun;
import org.apache.log4j.Logger;
import org.jdom.Document;
import org.jdom.Element;
import org.apache.taverna.configuration.database.DatabaseManager;
* Handles all the querying of provenance items in the database layer. Uses
* standard SQL so all specific instances of this class can extend this writer
* to handle all of the db queries
* @author Paolo Missier
* @author Ian Dunlop
* @author Stuart Owen
public abstract class ProvenanceQuery {
protected Logger logger = Logger.getLogger(ProvenanceQuery.class);
private final DatabaseManager databaseManager;
public ProvenanceQuery(DatabaseManager databaseManager) {
this.databaseManager = databaseManager;
public Connection getConnection() throws InstantiationException,
IllegalAccessException, ClassNotFoundException, SQLException {
return databaseManager.getConnection();
private Q query(String baseQuery) {
return new Q(baseQuery);
private class Q {
private String q;
private Map<String,String>where;
private List<String> order;
Q(String baseQuery) {
q = baseQuery;
public Q where(String key, String value) {
if (where == null)
where = new HashMap<>();
where.put(key, value);
return this;
public Q where(Map<String, String> clauses) {
if (where == null)
where = new HashMap<>();
return this;
public Q orderBy(String key) {
if (order == null)
order = new ArrayList<>();
return this;
public ResultSet exec(Statement statement) throws SQLException {
return statement.executeQuery(query());
public String query() {
return addOrderByToQuery(addWhereClauseToQuery(q, where, false), order, false);
* implements a set of query constraints of the form var = value into a
* WHERE clause
* @param q
* @param queryConstraints
* @return
protected String addWhereClauseToQuery(String q,
Map<String, String> queryConstraints, boolean terminate) {
// complete query according to constraints
StringBuilder buffer = new StringBuilder(q);
String sep = " WHERE ";
if (queryConstraints != null)
for (Entry<String, String> entry : queryConstraints.entrySet()) {
.append(" = \'").append(entry.getValue()).append("\' ");
sep = " AND ";
return buffer.toString();
protected String addOrderByToQuery(String q, List<String> orderAttr,
boolean terminate) {
// complete query according to constraints
StringBuilder buffer = new StringBuilder(q);
String sep = " ORDER BY ";
if (orderAttr != null)
for (String attr : orderAttr) {
sep = ",";
return buffer.toString();
* select Port records that satisfy constraints
public List<Port> getPorts(Map<String, String> queryConstraints)
throws SQLException {
List<Port> result = new ArrayList<>();
try (Connection connection = getConnection();
Statement stmt = connection.createStatement();
ResultSet rs = query(
+ "JOIN WorkflowRun W ON W.workflowId = V.workflowId")
.orderBy("V.iterationStrategyOrder").exec(stmt)) {
while ( {
Port aPort = new Port();
if (rs.getString("resolvedDepth") != null)
} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
logger.warn("Could not execute query", e);
return result;
* return the input variables for a given processor and a workflowRunId
* @param pname
* @param workflowRunId
* @return list of input variables
* @throws SQLException
public List<Port> getInputPorts(String pname, String wfID)
throws SQLException {
// get (var, proc) from Port to see if it's input/output
Map<String, String> varQueryConstraints = new HashMap<>();
varQueryConstraints.put("V.workflowId", wfID);
varQueryConstraints.put("V.processorName", pname);
varQueryConstraints.put("V.isInputPort", "1");
return getPorts(varQueryConstraints);
* return the output variables for a given processor and a workflowRunId
* @param pname
* @param workflowRunId
* @return list of output variables
* @throws SQLException
public List<Port> getOutputPorts(String pname, String wfID)
throws SQLException {
// get (var, proc) from Port to see if it's input/output
Map<String, String> varQueryConstraints = new HashMap<>();
varQueryConstraints.put("V.workflowId", wfID);
varQueryConstraints.put("V.processorName", pname);
varQueryConstraints.put("V.isInputPort", "0");
return getPorts(varQueryConstraints);
* selects all Datalinks
* @param queryConstraints
* @return
* @throws SQLException
public List<DataLink> getDataLinks(Map<String, String> queryConstraints)
throws SQLException {
List<DataLink> result = new ArrayList<>();
String q = addWhereClauseToQuery("SELECT A.* FROM Datalink A", queryConstraints, true);
try (Connection connection = getConnection();
Statement stmt = connection.createStatement()) {
ResultSet rs = stmt.executeQuery(q);
while ( {
DataLink aDataLink = new DataLink();
} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
logger.warn("Could not execute query", e);
return result;
public String getTopLevelWorkflowIdForRun(String runID) throws SQLException {
for (Workflow w : getWorkflowsForRun(runID))
if (w.getParentWorkflowId() == null)
return w.getWorkflowId();
return null;
* returns the names of all workflows (top level + nested) for a given runID
* @param runID
* @return
* @throws SQLException
public List<String> getWorkflowIdsForRun(String runID) throws SQLException {
List<String> workflowIds = new ArrayList<>();
for (Workflow w : getWorkflowsForRun(runID))
return workflowIds;
* returns the workflows associated to a single runID
* @param runID
* @return
* @throws SQLException
public List<Workflow> getWorkflowsForRun(String runID) throws SQLException {
List<Workflow> result = new ArrayList<>();
String q = "SELECT DISTINCT W.* FROM WorkflowRun I JOIN Workflow W ON I.workflowId = W.workflowId WHERE workflowRunId = ?";
try (Connection connection = getConnection();
PreparedStatement stmt = connection.prepareStatement(q)) {
stmt.setString(1, runID);
ResultSet rs = stmt.executeQuery();
while ( {
Workflow w = new Workflow();
} catch (InstantiationException | IllegalAccessException
| ClassNotFoundException e) {
logger.error("Error finding the workflow reference", e);
return result;
public String getLatestRunID() throws SQLException {
String q = "SELECT workflowRunId FROM WorkflowRun ORDER BY timestamp DESC";
try (Connection connection = getConnection();
PreparedStatement ps = connection.prepareStatement(q)) {
ResultSet rs = ps.executeQuery();
if (
return rs.getString("workflowRunId");
} catch (Exception e) {
logger.warn("Could not execute query", e);
return null;
* @param dataflowID
* @param conditions currently only understands "from" and "to" as timestamps for range queries
* @return
* @throws SQLException
public List<WorkflowRun> getRuns(String dataflowID,
Map<String, String> conditions) throws SQLException {
List<WorkflowRun> result = new ArrayList<>();
StringBuilder q = new StringBuilder(
"SELECT * FROM WorkflowRun I join Workflow W on I.workflowId = W.workflowId");
List<String> conds = new ArrayList<>();
if (dataflowID != null)
conds.add("I.workflowId = '" + dataflowID + "'");
if (conditions != null) {
if (conditions.get("from") != null)
conds.add("timestamp >= " + conditions.get("from"));
if (conditions.get("to") != null)
conds.add("timestamp <= " + conditions.get("to"));
String sep = " where ";
for (String cond : conds) {
sep = " and ";
q.append(" ORDER BY timestamp desc ");
try (Connection connection = getConnection();
PreparedStatement ps = connection.prepareStatement(q.toString())) {
ResultSet rs = ps.executeQuery();
while ( {
WorkflowRun i = new WorkflowRun();
Blob blob = rs.getBlob("dataflow");
long length = blob.length();
blob.getBytes(1, (int) length);
i.setDataflowBlob(blob.getBytes(1, (int) length));
} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
logger.warn("Could not execute query", e);
return result;
* @param constraints
* a Map columnName -> value that defines the query constraints.
* Note: columnName must be fully qualified. This is not done
* well at the moment, i.e., processorNameRef should be
* PortBinding.processorNameRef to avoid ambiguities
* @return
* @throws SQLException
public List<PortBinding> getPortBindings(Map<String, String> constraints)
throws SQLException {
List<PortBinding> result = new ArrayList<>();
String q = "SELECT * FROM PortBinding VB "
+ "JOIN Port V ON VB.portName = V.portName "
+ "AND VB.processorNameRef = V.processorName "
+ "AND VB.workflowId = V.workflowId ";
try (Connection connection = getConnection();
Statement stmt = connection.createStatement();) {
ResultSet rs = stmt.executeQuery(addWhereClauseToQuery(q,
constraints, true));
while ( {
PortBinding vb = new PortBinding();
if (rs.getString("collIdRef") == null || rs.getString("collIdRef").equals("null")) {
} else {
} catch (Exception e) {
logger.warn("Add VB failed", e);
return result;
public List<NestedListNode> getNestedListNodes(
Map<String, String> constraints) throws SQLException {
List<NestedListNode> result = new ArrayList<>();
try (Connection connection = getConnection();
Statement stmt = connection.createStatement();
ResultSet rs = query("SELECT * FROM Collection C ").where(
constraints).exec(stmt)) {
while ( {
NestedListNode nln = new NestedListNode();
} catch (InstantiationException | IllegalAccessException
| ClassNotFoundException e) {
logger.error("Error finding the nested list nodes", e);
return result;
public Map<String, Integer> getPredecessorsCount(String workflowRunId) {
Map<String, Integer> result = new HashMap<>();
// get all datalinks for the entire workflow structure for this particular instance
try (Connection connection = getConnection()) {
PreparedStatement ps = connection
.prepareStatement("SELECT A.sourceProcessorName as source , A.destinationProcessorName as sink, A.workflowId as workflowId1, W1.workflowId as workflowId2, W2.workflowId as workflowId3 "
+ "FROM Datalink A join WorkflowRun I on A.workflowId = I.workflowId "
+ "left outer join Workflow W1 on W1.externalName = A.sourceProcessorName "
+ "left outer join Workflow W2 on W2.externalName = A.destinationProcessorName "
+ "where I.workflowRunId = ?");
ps.setString(1, workflowRunId);
ResultSet rs = ps.executeQuery();
while ( {
String sink = rs.getString("sink");
String source = rs.getString("source");
if (result.get(sink) == null)
result.put(sink, 0);
String name1 = rs.getString("workflowId1");
String name2 = rs.getString("workflowId2");
String name3 = rs.getString("workflowId3");
if (isDataflow(source) && name1.equals(name2))
if (isDataflow(sink) && name1.equals(name3))
result.put(sink, result.get(sink) + 1);
} catch (InstantiationException | SQLException | IllegalAccessException | ClassNotFoundException e1) {
logger.warn("Could not execute query", e1);
return result;
* new impl of getProcessorsIncomingLinks whicih avoids complications due to nesting, and relies on the workflowRunId
* rather than the workflowId
* @param workflowRunId
* @return
public Map<String, Integer> getPredecessorsCountOld(String workflowRunId) {
Map<String, Integer> result = new HashMap<>();
// get all datalinks for the entire workflow structure for this particular instance
try (Connection connection = getConnection()) {
PreparedStatement ps = connection
.prepareStatement("SELECT destinationProcessorName, P1.firstActivityClass, count(*) as pred "
+ " FROM Datalink A join WorkflowRun I on A.workflowId = I.workflowId "
+ " join Processor P1 on P1.processorName = A.destinationProcessorName "
+ " join Processor P2 on P2.processorName = A.sourceProcessorName "
+ " where I.workflowRunId = ? "
+ " and P2.firstActivityClass <> '"
+ "' "
+ " and ((P1.firstActivityClass = '"
+ "' and P1.workflowId = A.workflowId) or "
+ " (P1.firstActivityClass <> '"
+ "' )) "
+ " group by A.destinationProcessorName, firstActivityClass");
ps.setString(1, workflowRunId);
ResultSet rs = ps.executeQuery();
while (
new Integer(rs.getInt("pred")));
} catch (InstantiationException | IllegalAccessException | ClassNotFoundException | SQLException e1) {
logger.warn("Could not execute query", e1);
return result;
* used in the toposort phase -- propagation of anl() values through the
* graph
* @param workflowId
* reference to static wf name
* @return a map <processor name> --> <incoming links count> for each
* processor, without counting the datalinks from the dataflow input to
* processors. So a processor is at the root of the graph if it has
* no incoming links, or all of its incoming links are from dataflow
* inputs.<br/>
* Note: this must be checked for processors that are roots of
* sub-flows... are these counted as top-level root nodes??
public Map<String, Integer> getProcessorsIncomingLinks(String workflowId)
throws SQLException {
Map<String, Integer> result = new HashMap<>();
String currentWorkflowProcessor = null;
String sql = "SELECT processorName, firstActivityClass FROM Processor "
+ "WHERE workflowId = ?";
try (Connection c = getConnection()) {
try (PreparedStatement ps = c.prepareStatement(sql)) {
ps.setString(1, workflowId);
ResultSet rs = ps.executeQuery();
while ( {
// PM CHECK 6/09
if (rs.getString("firstActivityClass").equals(
currentWorkflowProcessor = rs
.getString("processorName");"currentWorkflowProcessor = "
+ currentWorkflowProcessor);
result.put(rs.getString("processorName"), 0);
* fetch the name of the top-level dataflow. We use this to exclude
* datalinks outgoing from its inputs
// CHECK below -- gets confused on nested workflows
String parentWF = getParentOfWorkflow(workflowId);
if (parentWF == null)
parentWF = workflowId; // null parent means we are the top
logger.debug("parent WF: " + parentWF);
// get nested dataflows -- we want to avoid these in the toposort algorithm
List<ProvenanceProcessor> procs = getProcessorsShallow(c,
StringBuilder q = new StringBuilder("SELECT destinationProcessorName, count(*) AS cnt ");
q.append("FROM Datalink WHERE workflowId = \'").append(workflowId)
.append("\' AND destinationProcessorName NOT IN (");
String sep = "";
for (ProvenanceProcessor p : procs) {
sep = ",";
q.append(") GROUP BY destinationProcessorName");"executing \n" + q);
try (Statement stmt = c.createStatement();
ResultSet rs = stmt.executeQuery(q.toString())) {
while (
if (!rs.getString("destinationProcessorName").equals(
result.put(currentWorkflowProcessor, 0);
} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
logger.warn("Could not execute query", e);
return result;
public List<Port> getSuccPorts(String processorName, String portName,
String workflowId) throws SQLException {
List<Port> result = new ArrayList<>();
String sql = "SELECT v.* "
+ "FROM Datalink a JOIN Port v ON a.destinationProcessorName = v.processorName "
+ "AND a.destinationPortName = v.portName "
+ "AND a.workflowId = v.workflowId "
+ "WHERE sourcePortName=? AND sourceProcessorName=?";
if (workflowId != null)
sql += " AND a.workflowId=?";
try (Connection connection = getConnection();
PreparedStatement ps = connection.prepareStatement(sql)) {
ps.setString(1, portName);
ps.setString(2, processorName);
if (workflowId != null)
ps.setString(3, workflowId);
ResultSet rs = ps.executeQuery();
while ( {
Port aPort = new Port();
if (rs.getString("resolvedDepth") != null)
} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
logger.warn("Could not execute query", e);
return result;
public List<String> getSuccProcessors(String pName, String workflowId,
String workflowRunId) throws SQLException {
List<String> result = new ArrayList<>();
String sql = "SELECT distinct destinationProcessorName FROM Datalink A JOIN WorkflowRun I on A.workflowId = I.workflowId "
+ "WHERE A.workflowId = ? and I.workflowRunId = ? AND sourceProcessorName = ?";
try (Connection connection = getConnection();
PreparedStatement ps = connection.prepareStatement(sql)) {
ps.setString(1, workflowId);
ps.setString(2, workflowRunId);
ps.setString(3, pName);
ResultSet rs = ps.executeQuery();
while (
} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
logger.warn("Could not execute query", e);
return result;
* get all processors of a given type within a structure identified by
* workflowId (reference to dataflow). type constraint is ignored if value is null.<br>
* this only returns the processor for the input workflowId, without going into any neted workflows
* @param workflowId
* @param firstActivityClass
* @return a list, that contains at most one element
* @throws SQLException
public List<ProvenanceProcessor> getProcessorsShallow(
String firstActivityClass, String workflowId) throws SQLException {
Map<String, String> constraints = new HashMap<>();
constraints.put("P.workflowId", workflowId);
if (firstActivityClass != null)
constraints.put("P.firstActivityClass", firstActivityClass);
return getProcessors(constraints);
private List<ProvenanceProcessor> getProcessorsShallow(Connection c,
String firstActivityClass, String workflowId) throws SQLException {
Map<String, String> constraints = new HashMap<>();
constraints.put("P.workflowId", workflowId);
if (firstActivityClass != null)
constraints.put("P.firstActivityClass", firstActivityClass);
return getProcessors(c, constraints);
public ProvenanceProcessor getProvenanceProcessorByName(
String workflowId, String processorName) {
Map<String, String> constraints = new HashMap<>();
constraints.put("P.workflowId", workflowId);
constraints.put("P.processorName", processorName);
List<ProvenanceProcessor> processors;
try {
processors = getProcessors(constraints);
} catch (SQLException e1) {
logger.warn("Could not find processor for " + constraints, e1);
return null;
if (processors.size() != 1) {
logger.warn("Could not uniquely find processor for " + constraints + ", got: " + processors);
return null;
return processors.get(0);
public ProvenanceProcessor getProvenanceProcessorById(String processorId) {
Map<String, String> constraints = new HashMap<>();
constraints.put("P.processorId", processorId);
List<ProvenanceProcessor> processors;
try {
processors = getProcessors(constraints);
} catch (SQLException e1) {
logger.warn("Could not find processor for " + constraints, e1);
return null;
if (processors.size() != 1) {
logger.warn("Could not uniquely find processor for " + constraints
+ ", got: " + processors);
return null;
return processors.get(0);
* this is similar to {@link #getProcessorsShallow(String, String)} but it
* recursively fetches all processors within nested workflows. The result is
* collected in the form of a map: workflowId -> {ProvenanceProcessor}
* @param firstActivityClass
* @param workflowId
* @return a map: workflowId -> {ProvenanceProcessor} where workflowId is
* the name of a (possibly nested) workflow, and the values are the
* processors within that workflow
public Map<String, List<ProvenanceProcessor>> getProcessorsDeep(
String firstActivityClass, String workflowId) {
Map<String, List<ProvenanceProcessor>> result = new HashMap<>();
try {
List<ProvenanceProcessor> currentProcs = getProcessorsShallow(null,
List<ProvenanceProcessor> matchingProcessors = new ArrayList<>();
result.put(workflowId, matchingProcessors);
for (ProvenanceProcessor pp:currentProcs) {
if (firstActivityClass == null
|| pp.getFirstActivityClassName().equals(
if (pp.getFirstActivityClassName().equals(DATAFLOW_ACTIVITY)) {
// Can't recurse as there's no way to find ID of nested workflow
//result.putAll(getProcessorsDeep(firstActivityClass, NESTED_WORKFLOW_ID));
// Silly fallback - use the broken getChildrenOfWorkflow() assuming that no other workflows
// have used the same nested workflow
for (String childWf : getChildrenOfWorkflow(workflowId))
result.putAll(getProcessorsDeep(firstActivityClass, childWf));
} catch (SQLException e) {
logger.error("Problem getting nested workflow processors for: " + workflowId, e);
return result;
public String getDataValue(String valueRef) {
String q = "SELECT * FROM Data where dataReference = ?;";
try (Connection connection = getConnection();
PreparedStatement stmt = connection.prepareStatement(q)) {
stmt.setString(1, valueRef);
ResultSet rs = stmt.executeQuery(q);
if (
return rs.getString("data");
} catch (Exception e) {
logger.warn("Could not execute query", e);
return null;
* generic method to fetch processors subject to additional query constraints
* @param constraints
* @return
* @throws SQLException
public List<ProvenanceProcessor> getProcessors(
Map<String, String> constraints) throws SQLException {
try (Connection connection = getConnection()) {
return getProcessors(connection, constraints);
} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
logger.warn("Could not execute query", e);
return new ArrayList<ProvenanceProcessor>();
private List<ProvenanceProcessor> getProcessors(Connection c,
Map<String, String> constraints) throws SQLException {
List<ProvenanceProcessor> result = new ArrayList<>();
try (Statement stmt = c.createStatement();
ResultSet rs = query("SELECT P.* FROM Processor P").where(
constraints).exec(stmt)) {
while ( {
ProvenanceProcessor proc = new ProvenanceProcessor();
return result;
public List<ProvenanceProcessor> getProcessorsForWorkflow(String workflowID) {
List<ProvenanceProcessor> result = new ArrayList<>();
try (Connection connection = getConnection();
PreparedStatement ps = connection
.prepareStatement("SELECT * from Processor WHERE workflowId=?")) {
ps.setString(1, workflowID);
ResultSet rs = ps.executeQuery();
while ( {
ProvenanceProcessor proc = new ProvenanceProcessor();
} catch (SQLException | InstantiationException | IllegalAccessException
| ClassNotFoundException e) {
logger.error("Problem getting processor for workflow: "
+ workflowID, e);
return result;
* simplest possible pinpoint query. Uses iteration info straight away. Assumes result is in PortBinding not in Collection
* @param workflowRun
* @param pname
* @param vname
* @param iteration
* @return
public LineageSQLQuery simpleLineageQuery(String workflowRun, String workflowId, String pname,
String vname, String iteration) {
LineageSQLQuery lq = new LineageSQLQuery();
Q q = query("SELECT * FROM PortBinding VB "
+ "JOIN Port V ON (VB.portName = V.portName AND VB.processorNameRef = V.processorName AND VB.workflowId = V.workflowId) "
+ "JOIN WorkflowRun W ON VB.workflowRunId = W.workflowRunId AND VB.workflowId = W.workflowId ");
// constraints:
q.where("W.workflowRunId", workflowRun)
.where("VB.processorNameRef", pname)
.where("VB.workflowId", workflowId);
if (vname != null)
q.where("VB.portName", vname);
if (iteration != null)
q.where("VB.iteration", iteration);
// add order by clauses
logger.debug("Query is: " + q.query());
return lq;
* if var2Path is null this generates a trivial query for the current output
* var and current path
* @param workflowRunId
* @param proc
* @param var2Path
* @param outputVar
* @param path
* @param returnOutputs
* returns inputs *and* outputs if set to true
* @return
public List<LineageSQLQuery> lineageQueryGen(String workflowRunId, String proc,
Map<Port, String> var2Path, Port outputVar, String path,
boolean returnOutputs) {
// setup
List<LineageSQLQuery> newQueries = new ArrayList<>();
// use the calculated path for each input var
boolean isInput = true;
for (Port v : var2Path.keySet()) {
LineageSQLQuery q = generateSQL2(workflowRunId, proc, v.getPortName(), var2Path.get(v), isInput);
if (q != null)
// is returnOutputs is true, then use proc, path for the output var as well
if (returnOutputs) {
isInput = false;
LineageSQLQuery q = generateSQL2(workflowRunId, proc, outputVar.getPortName(), path, isInput); // && !var2Path.isEmpty());
if (q != null)
return newQueries;
protected LineageSQLQuery generateSQL2(String workflowRun, String proc,
String var, String path, boolean returnInput) {
LineageSQLQuery lq = new LineageSQLQuery();
Q q;
// base Collection query
q = query("SELECT C.*,W.workflowId,V.isInputPort FROM Collection C "
+ "JOIN WorkflowRun W ON C.workflowRunId = W.workflowRunId "
+ "JOIN Port V ON V.workflowId = W.workflowId "
+ "AND C.processorNameRef = V.processorName "
+ "AND C.portName = V.portName ");
if (path != null && path.length() > 0)
q.where("C.iteration", "[" + path + "]"); // PM 1/09 -- path
lq.setCollQuery(q.where("W.workflowRunId", workflowRun).where("C.processorNameRef",
proc).where("V.isInputPort", returnInput ? "1" : "0").query());
// base PortBinding query
q = query("SELECT VB.*,V.isInputPort FROM PortBinding VB "
+ "JOIN WorkflowRun W ON VB.workflowRunId = W.workflowRunId "
+ "JOIN Port V on V.workflowId = W.workflowId "
+ "AND VB.processorNameRef = V.processorName "
+ "AND VB.portName = V.portName ");
if (path != null && path.length() > 0)
q.where("VB.iteration", "[" + path + "]"); // PM 1/09 -- path
lq.setVbQuery(q.where("W.workflowRunId", workflowRun)
.where("VB.processorNameRef", proc).where("VB.portName", var)
.where("V.isInputPort", returnInput ? "1" : "0")
return lq;
* if effectivePath is not null: query varBinding using: workflowRunId =
* workflowRun, iteration = effectivePath, processorNameRef = proc if input vars is
* null, then use the output var this returns the bindings for the set of
* input vars at the correct iteration if effectivePath is null: fetch
* PortBindings for all input vars, without constraint on the iteration<br/>
* additionally, try querying the collection table first -- if the query succeeds, it means
* the path is pointing to an internal node in the collection, and we just got the right node.
* Otherwise, query PortBinding for the leaves
* @param workflowRun
* @param proc
* @param effectivePath
* @param returnOutputs
* returns both inputs and outputs if set to true
* @return
public LineageSQLQuery generateSQL(String workflowRun, String proc,
String effectivePath, boolean returnOutputs) {
LineageSQLQuery lq = new LineageSQLQuery();
Q q;
// base Collection query
q = query("SELECT * FROM Collection C "
+ "JOIN WorkflowRun W ON C.workflowRunId = W.workflowRunId "
+ "JOIN Port V ON V.workflowRunId = W.workflowId "
+ "AND C.processorNameRef = V.processorNameRef "
+ "AND C.portName = V.portName ");
if (effectivePath != null && effectivePath.length() > 0)
q.where("C.iteration", "[" + effectivePath.toString() + "]"); // PM 1/09 -- path
// limit to inputs?
if (returnOutputs)
q.where("V.isInputPort", "1");
lq.setCollQuery(q.where("W.workflowRunId", workflowRun).where("C.processorNameRef",
// base PortBinding query
q = query("SELECT * FROM PortBinding VB "
+ "JOIN WorkflowRun W ON VB.workflowRunId = W.workflowRunId "
+ "JOIN Port V on V.workflowRunId = W.workflowId "
+ "AND VB.processorNameRef = V.processorNameRef "
+ "AND VB.portName = V.portName ");
if (effectivePath != null && effectivePath.length() > 0)
q.where("VB.iteration", "[" + effectivePath.toString() + "]"); // PM 1/09 -- path
// limit to inputs?
if (!returnOutputs)
q.where("V.isInputPort", "1");
lq.setVbQuery(q.where("W.workflowRunId", workflowRun)
.where("VB.processorNameRef", proc).orderBy("portName")
return lq;
public Dependencies runCollectionQuery(LineageSQLQuery lq) throws SQLException {
String q = lq.getCollQuery();
Dependencies lqr = new Dependencies();
if (q == null)
return lqr;
logger.debug("running collection query: " + q);
try (Connection connection = getConnection();
Statement stmt = connection.createStatement()) {
ResultSet rs = stmt.executeQuery(q);
while ( {
String type = Dependencies.ATOM_TYPE; // temp -- FIXME
String workflowId = rs.getString("workflowId");
String workflowRun = rs.getString("workflowRunId");
String proc = rs.getString("processorNameRef");
String var = rs.getString("portName");
String it = rs.getString("iteration");
String coll = rs.getString("collID");
String parentColl = rs.getString("parentCollIDRef");
//boolean isInput = rs.getBoolean("isInputPort");
lqr.addLineageQueryResultRecord(workflowId, proc, var, workflowRun,
it, coll, parentColl, null, null, type, false, true); // true -> is a collection
} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
logger.warn("Could not execute query", e);
return lqr;
* @param lq
* @param includeDataValue IGNORED. always false
* @return
* @throws SQLException
public Dependencies runVBQuery(LineageSQLQuery lq, boolean includeDataValue)
throws SQLException {
String q = lq.getVbQuery();"running VB query: " + q);
try (Connection connection = getConnection();
Statement stmt = connection.createStatement()) {
ResultSet rs = stmt.executeQuery(q);
Dependencies lqr = new Dependencies();
while ( {
String type = Dependencies.ATOM_TYPE; // temp -- FIXME
String workflowId = rs.getString("workflowId");
String workflowRun = rs.getString("workflowRunId");
String proc = rs.getString("processorNameRef");
String var = rs.getString("portName");
String it = rs.getString("iteration");
String coll = rs.getString("collIDRef");
String value = rs.getString("value");
boolean isInput = rs.getBoolean("isInputPort");
// FIXME if the data is required then the query needs fixing
lqr.addLineageQueryResultRecord(workflowId, proc, var, workflowRun,
it, coll, null, value, null, type, isInput, false);
return lqr;
} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
logger.warn("Could not execute query", e);
return null;
* executes one of the lineage queries produced by the graph visit algorithm. This first executes the collection query, and then
* if no result is returned, the varBinding query
* @param lq
* a lineage query computed during the graph traversal
* @param includeDataValue
* if true, then the referenced value is included in the result.
* This may only be necessary for testing: the data reference in
* field value (which is a misleading field name, and actually
* refers to the data reference) should be sufficient
* @return
* @throws SQLException
public Dependencies runLineageQuery(LineageSQLQuery lq,
boolean includeDataValue) throws SQLException {
Dependencies result = runCollectionQuery(lq);
if (result.getRecords().isEmpty())
return runVBQuery(lq, includeDataValue);
return result;
public List<Dependencies> runLineageQueries(List<LineageSQLQuery> lqList,
boolean includeDataValue) throws SQLException {
List<Dependencies> allResults = new ArrayList<>();
if (lqList == null)
logger.warn("lineage queries list is NULL, nothing to evaluate");
for (LineageSQLQuery lq : lqList)
if (lq != null)
allResults.add(runLineageQuery(lq, includeDataValue));
return allResults;
* takes an ordered set of records for the same variable with iteration
* indexes and builds a collection out of it
* @param lqr
* @return a jdom Document with the collection
public Document recordsToCollection(Dependencies lqr) {
// process each var name in turn
// lqr ordered by var name and by iteration number
Document d = new Document(new Element("list"));
String currentVar = null;
for (ListIterator<LineageQueryResultRecord> it = lqr.iterator(); it.hasNext();) {
LineageQueryResultRecord record =;
if (currentVar != null && record.getPortName().equals(currentVar)) {
// multiple occurrences
addToCollection(record, d);
// adds record to d in the correct position given by the iteration vector
if (currentVar == null)
currentVar = record.getPortName();
return d;
private void addToCollection(LineageQueryResultRecord record, Document d) {
Element root = d.getRootElement();
String[] itVector = record.getIteration().split(",");
Element currentEl = root;
// each element gives us a corresponding child in the tree
for (int i = 0; i < itVector.length; i++) {
int index = Integer.parseInt(itVector[i]);
List<?> children = currentEl.getChildren();
if (index < children.size())
currentEl = (Element) children.get(index);
else if (i == itVector.length - 1)
currentEl.addContent(new Element(record.getValue()));
currentEl.addContent(new Element("list"));
* returns the set of all processors that are structurally contained within
* the wf corresponding to the input dataflow name
* @param workflowName the name of a processor of type DataFlowActivity
* @return
* @deprecated as workflow 'names' are not globally unique, this method should not be used!
public List<String> getContainedProcessors(String workflowName) {
List<String> result = new ArrayList<>();
// dataflow name -> wfRef
String containerDataflow = getWorkflowIdForExternalName(workflowName);
// get all processors within containerDataflow
try (Connection connection = getConnection();
PreparedStatement ps = connection
.prepareStatement("SELECT processorName FROM Processor P "
+ "WHERE workflowId = ?")) {
ps.setString(1, containerDataflow);
ResultSet rs = ps.executeQuery();
while (
} catch (InstantiationException | SQLException | IllegalAccessException | ClassNotFoundException e) {
logger.warn("Could not execute query", e);
return result;
public String getTopLevelDataflowName(String workflowRunId) {
String sql = "SELECT processorName FROM Processor P "
+ "JOIN WorkflowRun I on P.workflowId = I.workflowId "
+ "WHERE I.workflowRunId = ? AND isTopLevel = 1";
try (Connection connection = getConnection();
PreparedStatement ps = connection.prepareStatement(sql)) {
ps.setString(1, workflowRunId);
ResultSet rs = ps.executeQuery();
if (
return rs.getString("processorName");
} catch (InstantiationException | SQLException | IllegalAccessException
| ClassNotFoundException e) {
logger.warn("Could not execute query", e);
return null;
* retrieve a tree structure starting from the top parent
* @param workflowID
* @return
* @throws SQLException
public WorkflowTree getWorkflowNestingStructure(String workflowID) throws SQLException {
WorkflowTree tree = new WorkflowTree();
Workflow wf = getWorkflow(workflowID);
List<String> children = getChildrenOfWorkflow(workflowID);
for (String childWfName:children) {
WorkflowTree childStructure = getWorkflowNestingStructure(childWfName);
return tree;
* returns the internal ID of a dataflow given its external name
* @param externalName
* @param workflowRunId
* @return
* @deprecated as workflow 'names' are not globally unique, this method should not be used!
public String getWorkflowIdForExternalName(String externalName) {
//"SELECT workflowId FROM Workflow W join WorkflowRun I on W.workflowId = I.workflowId WHERE W.externalName = ? and I.workflowRunId = ?");
String sql = "SELECT workflowId FROM Workflow W WHERE W.externalName = ?";
try (Connection connection = getConnection();
PreparedStatement ps = connection.prepareStatement(sql)) {
ps.setString(1, externalName);
// ps.setString(2, workflowRunId);
ResultSet rs = ps.executeQuery();
if (
return rs.getString("workflowId");
} catch (InstantiationException | IllegalAccessException | ClassNotFoundException | SQLException e) {
logger.warn("Could not execute query", e);
return null;
* This method is deprecated as parent workflow ID is not correctly
* recorded. If two workflows both contain the same nested workflow, only
* one of them (the most recently added) will return that nested workflow
* from this method.
* @deprecated
* @param parentWorkflowId
* @return
* @throws SQLException
public List<String> getChildrenOfWorkflow(String parentWorkflowId)
throws SQLException {
List<String> result = new ArrayList<>();
try (Connection connection = getConnection();
PreparedStatement ps = connection
.prepareStatement("SELECT workflowId FROM Workflow WHERE parentWorkflowId = ? ")) {
ps.setString(1, parentWorkflowId);
ResultSet rs = ps.executeQuery();
while (
} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
logger.warn("Could not execute query", e);
return result;
* fetch children of parentWorkflowId from the Workflow table
* @return
* @param childworkflowId
* @throws SQLException
public String getParentOfWorkflow(String childworkflowId)
throws SQLException {
String result = null;
String q = "SELECT parentWorkflowId FROM Workflow WHERE workflowId = ?";
try (Connection connection = getConnection();
PreparedStatement ps = connection.prepareStatement(q)) {
ps.setString(1, childworkflowId);
logger.debug("getParentOfWorkflow - query: " + q
+ " with workflowId = " + childworkflowId);
ResultSet rs = ps.executeQuery();
while ( {
result = rs.getString("parentWorkflowId");
logger.debug("result: " + result);
} catch (InstantiationException | IllegalAccessException
| ClassNotFoundException e) {
logger.warn("Could not execute query", e);
return result;
public List<String> getAllworkflowIds() throws SQLException {
List<String> result = new ArrayList<>();
String q = "SELECT workflowId FROM Workflow";
try (Connection connection = getConnection();
Statement stmt = connection.createStatement()) {
ResultSet rs = stmt.executeQuery(q);
while (
} catch (InstantiationException | IllegalAccessException
| ClassNotFoundException e) {
logger.warn("Could not execute query", e);
return result;
* @deprecated This method is not workflowId aware and should not be used
* @param procName
* @return true if procName is the external name of a dataflow, false
* otherwise
* @throws SQLException
public boolean isDataflow(String procName) throws SQLException {
String sql = "SELECT firstActivityClass FROM Processor WHERE processorName = ?";
try (Connection c = getConnection();
PreparedStatement ps = c.prepareStatement(sql)) {
ps.setString(1, procName);
ResultSet rs = ps.executeQuery();
if (
return true;
} catch (InstantiationException | IllegalAccessException
| ClassNotFoundException e) {
logger.warn("Could not execute query", e);
return false;
public boolean isTopLevelDataflow(String workflowIdID) {
String sql = "SELECT * FROM Workflow W where W.workflowId = ?";
try (Connection c = getConnection();
PreparedStatement ps = c.prepareStatement(sql)) {
ps.setString(1, workflowIdID);
ResultSet rs = ps.executeQuery();
if (
return (rs.getString("parentWorkflowId") == null);
} catch (SQLException | InstantiationException | IllegalAccessException | ClassNotFoundException e) {
logger.warn("Could not execute query", e);
return false;
public boolean isTopLevelDataflow(String workflowId, String workflowRunId) {
String sql = "SELECT " + parentProcessorEnactmentId + " AS parent"
+ " FROM " + DataflowInvocation + " W " + " WHERE "
+ DataflowInvocationTable.workflowId + "=? AND "
+ DataflowInvocationTable.workflowRunId + "=?";
try (Connection c = getConnection();
PreparedStatement ps = c.prepareStatement(sql)) {
ps.setString(1, workflowId);
ps.setString(2, workflowRunId);
ResultSet rs = ps.executeQuery();
if (
return (rs.getString("parent") == null);
} catch (SQLException | InstantiationException | IllegalAccessException | ClassNotFoundException e) {
logger.warn("Could not execute query", e);
return false;
public String getTopDataflow(String workflowRunId) {
String sql = "SELECT processorName FROM "
+ "Processor P JOIN WorkflowRun I ON P.workflowId = I.workflowId "
+ " WHERE I.workflowRunId = ? AND isTopLevel = 1 ";
try (Connection c = getConnection();
PreparedStatement ps = c.prepareStatement(sql)) {
ps.setString(1, workflowRunId);
ResultSet rs = ps.executeQuery();
if (
return rs.getString("processorName");
} catch (SQLException | InstantiationException | IllegalAccessException
| ClassNotFoundException e) {
logger.warn("Could not execute query", e);
return null;
* @param p
* pTo processor
* @param var
* vTo
* @param value
* valTo
* @return a set of DDRecord
* @throws SQLException
public List<DDRecord> queryDD(String p, String var, String value,
String iteration, String workflowRun) throws SQLException {
Q q = query("SELECT * FROM DD ");
q.where("pTo", p);
q.where("vTo", var);
if (value != null)
q.where("valTo", value);
if (iteration != null)
q.where("iteration", iteration);
if (workflowRun != null)
q.where("workflowRun", workflowRun);
try (Connection connection = getConnection();
Statement stmt = connection.createStatement();
ResultSet rs = q.exec(stmt)) {
List<DDRecord> result = new ArrayList<>();
while ( {
DDRecord aDDrecord = new DDRecord();
return result;
} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
logger.warn("Could not execute query", e);
return null;
public Set<DDRecord> queryDataLinksForDD(String p, String v, String val,
String workflowRun) throws SQLException {
String sql = "SELECT DISTINCT A.sourceProcessorName AS p, A.sourcePortName AS var, VB.value AS val "
+ "FROM Datalink A "
+ "JOIN PortBinding VB ON VB.portName = A.destinationPortName AND VB.processorNameRef = A.destinationProcessorName "
+ "JOIN WorkflowRun WF ON WF.workflowId = A.workflowId AND WF.workflowRunId = VB.workflowRunId "
+ "WHERE WF.workflowRunId = ? AND A.destinationProcessorName = ? AND A.destinationPortName = ? AND VB.value = ?";
try (Connection connection = getConnection();
PreparedStatement ps = connection.prepareStatement(sql)) {
ps.setString(1, workflowRun);
ps.setString(2, p);
ps.setString(3, v);
ps.setString(4, val);
ResultSet rs = ps.executeQuery();
Set<DDRecord> result = new HashSet<>();
while ( {
DDRecord aDDrecord = new DDRecord();
return result;
} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
logger.warn("Could not execute query", e);
return null;
public Set<DDRecord> queryAllFromValues(String workflowRun)
throws SQLException {
String sql = "SELECT DISTINCT PFrom, vFrom, valFrom FROM DD where workflowRun = ?";
try (Connection connection = getConnection();
PreparedStatement ps = connection.prepareStatement(sql)) {
ps.setString(1, workflowRun);
ResultSet rs = ps.executeQuery();
Set<DDRecord> result = new HashSet<>();
while ( {
DDRecord aDDrecord = new DDRecord();
return result;
} catch (InstantiationException | IllegalAccessException | ClassNotFoundException e) {
logger.warn("Could not execute query", e);
return null;
public boolean isRootProcessorOfWorkflow(String procName, String workflowId,
String workflowRunId) {
String sql = "SELECT * FROM Datalink A JOIN WorkflowRun I ON A.workflowId = I.workflowId "
+ "JOIN Processor P on P.processorName = A.sourceProcessorName "
+ "WHERE sourceProcessorName = ? "
+ "AND P.workflowId <> A.workflowId "
+ "AND I.workflowRunId = ? "
+ "AND destinationProcessorName = ? ";
try (Connection connection = getConnection();
PreparedStatement ps = connection.prepareStatement(sql)) {
ps.setString(1, workflowId);
ps.setString(2, workflowRunId);
ps.setString(3, procName);
if (ps.executeQuery().next())
return true;
} catch (InstantiationException | IllegalAccessException | ClassNotFoundException | SQLException e) {
logger.warn("Could not execute query", e);
return false;
* returns a Workflow record from the DB given the workflow internal ID
* @param dataflowID
* @return
public Workflow getWorkflow(String dataflowID) {
String sql = "SELECT * FROM Workflow W WHERE workflowId = ? ";
try (Connection connection = getConnection();
PreparedStatement ps = connection.prepareStatement(sql)) {
ps.setString(1, dataflowID);
ResultSet rs = ps.executeQuery();
if ( {
Workflow wf = new Workflow();
return wf;
} else {
logger.warn("Could not find workflow " + dataflowID);
} catch (InstantiationException | IllegalAccessException | ClassNotFoundException | SQLException e) {
logger.warn("Could not execute query", e);
return null;
* @param record
* a record representing a single value -- possibly within a list
* hierarchy
* @return the URI for topmost containing collection when the input record
* is within a list hierarchy, or null otherwise
public String getContainingCollection(LineageQueryResultRecord record) {
if (record.getCollectionT2Reference() == null)
return null;
String sql = "SELECT * FROM Collection "
+ "WHERE collID = ? and workflowRunId = ? and processorNameRef = ? and portName = ?";
try (Connection connection = getConnection()) {
String parentCollIDRef = null;
try (PreparedStatement stmt = connection.prepareStatement(sql)) {
stmt.setString(1, record.getCollectionT2Reference());
stmt.setString(2, record.getWorkflowRunId());
stmt.setString(3, record.getProcessorName());
stmt.setString(4, record.getPortName());
ResultSet rs = stmt.executeQuery();
if (
parentCollIDRef = rs.getString("parentCollIDRef");
// INITIALLY not null -- would be TOP if the initial had no parent
while (parentCollIDRef != null) {
String oldParentCollIDRef = parentCollIDRef;
// query Collection again for parent collection
try (PreparedStatement stmt = connection.prepareStatement(sql)) {
stmt.setString(1, oldParentCollIDRef);
stmt.setString(2, record.getWorkflowRunId());
stmt.setString(3, record.getProcessorName());
stmt.setString(4, record.getPortName());
ResultSet rs = stmt.executeQuery();
if ( {
parentCollIDRef = rs.getString("parentCollIDRef");
if (parentCollIDRef.equals("TOP"))
return oldParentCollIDRef;
} catch (Exception e) {
logger.warn("Could not execute query", e);
} catch (InstantiationException | IllegalAccessException | ClassNotFoundException | SQLException e) {
logger.warn("Could not execute query", e);
return null;
public List<ProcessorEnactment> getProcessorEnactments(
String workflowRunId, String... processorPath) {
return getProcessorEnactments(workflowRunId,
(List<ProcessorEnactment>) null, Arrays.asList(processorPath));
private List<ProcessorEnactment> getProcessorEnactments(
String workflowRunId, List<ProcessorEnactment> parentProcessorEnactments,
List<String> processorPath) {
List<String> processorEnactmentIds = null;
if (parentProcessorEnactments != null) {
processorEnactmentIds = new ArrayList<>();
for (ProcessorEnactment processorEnactment : parentProcessorEnactments)
if (processorPath.size() > 1) {
return getProcessorEnactments(
processorEnactmentIds, processorPath.get(0)),
processorPath.subList(1, processorPath.size()));
} else if (processorPath.size() == 1) {
return getProcessorEnactmentsByProcessorName(workflowRunId,
processorEnactmentIds, processorPath.get(0));
} else {
return getProcessorEnactmentsByProcessorName(workflowRunId,
processorEnactmentIds, null);
public List<ProcessorEnactment> getProcessorEnactmentsByProcessorName(
String workflowRunId, List<String> parentProcessorEnactmentIds,
String processorName) {
StringBuilder query = new StringBuilder();
query.append("SELECT ")
.append(ProcessorEnactmentTable.enactmentStarted).append(", ")
.append(ProcessorEnactmentTable.enactmentEnded).append(", ")
.append(", ")
.append(", ")
.append(" AS procId, ")
.append(ProcessorEnactmentTable.processIdentifier).append(", ")
.append(", ")
.append(", ").append(ProcessorEnactmentTable.workflowRunId)
.append(", ").append(ProcessorEnactmentTable.iteration)
.append(", Processor.processorName FROM ")
.append(" INNER JOIN Processor ON ")
.append(" = Processor.processorId WHERE ")
.append(ProcessorEnactmentTable.workflowRunId).append(" = ? ");
if (processorName != null)
// Specific processor
query.append(" AND Processor.processorName = ? ");
if ((parentProcessorEnactmentIds == null || parentProcessorEnactmentIds.isEmpty()) && processorName != null) {
// null - ie. top level
query.append(" AND " + ProcessorEnactmentTable.parentProcessorEnactmentId + " IS NULL");
} else if (parentProcessorEnactmentIds != null) {
// not null, ie. inside nested workflow
query.append(" AND " + ProcessorEnactmentTable.parentProcessorEnactmentId + " IN (");
for (int i=0; i<parentProcessorEnactmentIds.size(); i++) {
if (i < (parentProcessorEnactmentIds.size()-1))
ArrayList<ProcessorEnactment> procEnacts = new ArrayList<>();
try (Connection connection = getConnection();
PreparedStatement statement = connection.prepareStatement(query
.toString())) {
int pos = 1;
statement.setString(pos++, workflowRunId);
if (processorName != null)
statement.setString(pos++, processorName);
if (parentProcessorEnactmentIds != null)
for (String parentId : parentProcessorEnactmentIds)
statement.setString(pos++, parentId);
ResultSet resultSet = statement.executeQuery();
while ( {
ProcessorEnactment procEnact = readProcessorEnactment(resultSet);
} catch (SQLException | InstantiationException | IllegalAccessException | ClassNotFoundException e) {
logger.warn("Could not execute query " + query, e);
return procEnacts;
private ProcessorEnactment readProcessorEnactment(ResultSet resultSet) throws SQLException {
Timestamp enactmentStarted = resultSet.getTimestamp(;
Timestamp enactmentEnded = resultSet.getTimestamp(;
//String pName = resultSet.getString("processorName");
String finalOutputsDataBindingId = resultSet.getString(;
String initialInputsDataBindingId = resultSet.getString(;
String iteration = resultSet.getString(;
String processorId = resultSet.getString("procId");
String processIdentifier = resultSet.getString(;
String processEnactmentId = resultSet.getString(;
String parentProcessEnactmentId = resultSet.getString(;
String workflowRunId = resultSet.getString(;
ProcessorEnactment procEnact = new ProcessorEnactment();
return procEnact;
public ProcessorEnactment getProcessorEnactment(String processorEnactmentId) {
String query =
"SELECT " + ProcessorEnactmentTable.enactmentStarted + ","
+ ProcessorEnactmentTable.enactmentEnded + ","
+ ProcessorEnactmentTable.finalOutputsDataBindingId + ","
+ ProcessorEnactmentTable.initialInputsDataBindingId + ","
+ ProcessorEnactmentTable.ProcessorEnactment + "."
+ ProcessorEnactmentTable.processorId + " AS procId,"
+ ProcessorEnactmentTable.processIdentifier + ","
+ ProcessorEnactmentTable.workflowRunId + ","
+ ProcessorEnactmentTable.processEnactmentId + ","
+ ProcessorEnactmentTable.parentProcessorEnactmentId + ","
+ ProcessorEnactmentTable.iteration
+ " FROM "
+ ProcessorEnactmentTable.ProcessorEnactment
+ " WHERE "
+ ProcessorEnactmentTable.processEnactmentId + "=?";
ProcessorEnactment procEnact = null;
try (Connection connection = getConnection();
PreparedStatement statement = connection
.prepareStatement(query)) {
statement.setString(1, processorEnactmentId);
ResultSet resultSet = statement.executeQuery();
if (! {
logger.warn("Could not find ProcessorEnactment processEnactmentId="
+ processorEnactmentId);
return null;
procEnact = readProcessorEnactment(resultSet);
if ( {
logger.error("Found more than one ProcessorEnactment processEnactmentId="
+ processorEnactmentId);
return null;
} catch (SQLException | InstantiationException | IllegalAccessException | ClassNotFoundException e) {
logger.warn("Could not execute query " + query, e);
return procEnact;
public ProcessorEnactment getProcessorEnactmentByProcessId(
String workflowRunId, String processIdentifier, String iteration) {
String query = "SELECT " + ProcessorEnactmentTable.enactmentStarted
+ "," + ProcessorEnactmentTable.enactmentEnded + ","
+ ProcessorEnactmentTable.finalOutputsDataBindingId + ","
+ ProcessorEnactmentTable.initialInputsDataBindingId + ","
+ ProcessorEnactmentTable.ProcessorEnactment + "."
+ ProcessorEnactmentTable.processorId + " AS procId,"
+ ProcessorEnactmentTable.processIdentifier + ","
+ ProcessorEnactmentTable.workflowRunId + ","
+ ProcessorEnactmentTable.processEnactmentId + ","
+ ProcessorEnactmentTable.parentProcessorEnactmentId + ","
+ ProcessorEnactmentTable.iteration + " FROM "
+ ProcessorEnactmentTable.ProcessorEnactment + " WHERE "
+ ProcessorEnactmentTable.workflowRunId + "=?" + " AND "
+ ProcessorEnactmentTable.processIdentifier + "=?" + " AND "
+ ProcessorEnactmentTable.iteration + "=?";
ProcessorEnactment procEnact = null;
try (Connection connection = getConnection();
PreparedStatement statement = connection
.prepareStatement(query)) {
statement.setString(1, workflowRunId);
statement.setString(2, processIdentifier);
statement.setString(3, iteration);
ResultSet resultSet = statement.executeQuery();
String debugString = "ProcessorEnactment runId=" + workflowRunId
+ " processIdentifier=" + processIdentifier + " iteration="
+ iteration;
if (! {
logger.warn("Could not find " + debugString);
return null;
procEnact = readProcessorEnactment(resultSet);
if ( {
logger.error("Found more than one " + debugString);
return null;
} catch (SQLException | InstantiationException | IllegalAccessException | ClassNotFoundException e) {
logger.warn("Could not execute query " + query, e);
return procEnact;
public Map<Port, String> getDataBindings(String dataBindingId) {
HashMap<Port, String> dataBindings = new HashMap<>();
String query = "SELECT " + DataBindingTable.t2Reference + ","
+ "Port.portId AS portId," + "Port.processorName,"
+ "Port.processorId," + "Port.isInputPort," + "Port.portName,"
+ "Port.depth," + "Port.resolvedDepth," + "Port.workflowId"
+ " FROM " + DataBindingTable.DataBinding + " INNER JOIN "
+ "Port" + " ON " + " Port.portId="
+ DataBindingTable.DataBinding + "." + DataBindingTable.portId
+ " WHERE " + DataBindingTable.dataBindingId + "=?";
try (Connection connection = getConnection();
PreparedStatement statement = connection
.prepareStatement(query)) {
statement.setString(1, dataBindingId);
ResultSet rs = statement.executeQuery();
while ( {
String t2Ref = rs.getString(;
Port port = new Port();
if (rs.getString("resolvedDepth") != null)
dataBindings.put(port, t2Ref);
} catch (SQLException | InstantiationException | IllegalAccessException | ClassNotFoundException e) {
logger.warn("Could not execute query " + query, e);
return dataBindings;
public List<Port> getAllPortsInDataflow(String workflowID) {
Map<String, String> queryConstraints = new HashMap<>();
queryConstraints.put("V.workflowId", workflowID);
try {
return getPorts(queryConstraints);
} catch (SQLException e) {
logger.error("Problem getting ports for dataflow: " + workflowID, e);
return null;
public List<Port> getPortsForDataflow(String workflowID) {
Workflow w = getWorkflow(workflowID);
Map<String, String> queryConstraints = new HashMap<>();
queryConstraints.put("V.workflowId", workflowID);
queryConstraints.put("processorName", w.getExternalName());
try {
return getPorts(queryConstraints);
} catch (SQLException e) {
logger.error("Problem getting ports for dataflow: " + workflowID, e);
return null;
public List<Port> getPortsForProcessor(String workflowID,
String processorName) {
Map<String, String> queryConstraints = new HashMap<>();
queryConstraints.put("V.workflowId", workflowID);
queryConstraints.put("processorName", processorName);
try {
return getPorts(queryConstraints);
} catch (SQLException e) {
logger.error("Problem getting ports for processor: "
+ processorName + " worflow: " + workflowID, e);
return null;
public DataflowInvocation getDataflowInvocation(String workflowRunId) {
String query = "SELECT " +
DataflowInvocationTable.dataflowInvocationId + ","
+ DataflowInvocationTable.inputsDataBinding + ","
+ DataflowInvocationTable.invocationEnded + ","
+ DataflowInvocationTable.invocationStarted + ","
+ DataflowInvocationTable.outputsDataBinding + ","
+ DataflowInvocationTable.parentProcessorEnactmentId + ","
+ DataflowInvocationTable.workflowId + ","
+ DataflowInvocationTable.workflowRunId + ","
+ DataflowInvocationTable.completed
+ " FROM "
+ DataflowInvocationTable.DataflowInvocation +
+ DataflowInvocationTable.parentProcessorEnactmentId + " IS NULL AND "
+ DataflowInvocationTable.workflowRunId + "=?";
DataflowInvocation dataflowInvocation = null;
try (Connection connection = getConnection();
PreparedStatement statement = connection
.prepareStatement(query)) {
statement.setString(1, workflowRunId);
ResultSet rs = statement.executeQuery();
if (! {
logger.warn("Could not find DataflowInvocation for workflowRunId=" + workflowRunId);
return null;
dataflowInvocation = new DataflowInvocation();
if ( {
logger.error("Found more than one DataflowInvocation for workflowRunId=" + workflowRunId);
return null;
} catch (SQLException | InstantiationException | IllegalAccessException | ClassNotFoundException e) {
logger.warn("Could not execute query " + query, e);
return dataflowInvocation;
public DataflowInvocation getDataflowInvocation(
ProcessorEnactment processorEnactment) {
String query = "SELECT " + DataflowInvocationTable.dataflowInvocationId
+ "," + DataflowInvocationTable.inputsDataBinding + ","
+ DataflowInvocationTable.invocationEnded + ","
+ DataflowInvocationTable.invocationStarted + ","
+ DataflowInvocationTable.outputsDataBinding + ","
+ DataflowInvocationTable.parentProcessorEnactmentId + ","
+ DataflowInvocationTable.workflowId + ","
+ DataflowInvocationTable.workflowRunId + ","
+ DataflowInvocationTable.completed + " FROM "
+ DataflowInvocationTable.DataflowInvocation + " WHERE "
+ DataflowInvocationTable.parentProcessorEnactmentId + "=?";
DataflowInvocation dataflowInvocation = null;
try (Connection connection = getConnection();
PreparedStatement statement = connection
.prepareStatement(query)) {
statement.setString(1, processorEnactment.getProcessEnactmentId());
ResultSet rs = statement.executeQuery();
if (! {
logger.warn("Could not find DataflowInvocation for processorEnactmentId="
+ processorEnactment.getProcessEnactmentId());
return null;
dataflowInvocation = new DataflowInvocation();
if ( {
logger.error("Found more than one DataflowInvocation for processorEnactmentId="
+ processorEnactment.getProcessEnactmentId());
return null;
} catch (SQLException | InstantiationException | IllegalAccessException | ClassNotFoundException e) {
logger.warn("Could not execute query " + query, e);
return dataflowInvocation;
public List<DataflowInvocation> getDataflowInvocations(String workflowRunId) {
String query = "SELECT " + DataflowInvocationTable.dataflowInvocationId
+ "," + DataflowInvocationTable.inputsDataBinding + ","
+ DataflowInvocationTable.invocationEnded + ","
+ DataflowInvocationTable.invocationStarted + ","
+ DataflowInvocationTable.outputsDataBinding + ","
+ DataflowInvocationTable.parentProcessorEnactmentId + ","
+ DataflowInvocationTable.workflowId + ","
+ DataflowInvocationTable.workflowRunId + ","
+ DataflowInvocationTable.completed + " FROM "
+ DataflowInvocationTable.DataflowInvocation + " WHERE "
+ DataflowInvocationTable.workflowRunId + "=?";
List<DataflowInvocation> invocations = new ArrayList<>();
try (Connection connection = getConnection();
PreparedStatement statement = connection
.prepareStatement(query)) {
statement.setString(1, workflowRunId);
ResultSet rs = statement.executeQuery();
if (! {
logger.warn("Could not find DataflowInvocation for workflowRunId=" + workflowRunId);
return null;
DataflowInvocation dataflowInvocation = new DataflowInvocation();
} catch (SQLException | InstantiationException | IllegalAccessException | ClassNotFoundException e) {
logger.warn("Could not execute query " + query, e);
return invocations;
public List<Collection> getCollectionsForRun(String wfInstanceID) {
ArrayList<Collection> result = new ArrayList<>();
String sql = "SELECT * FROM Collection C WHERE workflowRunId = ?";
try (Connection c = getConnection();
PreparedStatement ps = c.prepareStatement(sql)) {
ps.setString(1, wfInstanceID);
ResultSet rs = ps.executeQuery();
while ( {
Collection coll = new Collection();
} catch (Exception e) {
logger.warn("Could not execute query", e);
return result;