blob: eafe89e34f2551385748a6062a1ac38d6ff48e34 [file] [log] [blame]
/*******************************************************************************
* Copyright (C) 2007 The University of Manchester
*
* Modifications to the initial code base are copyright of their
* respective authors, or their employers as appropriate.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public License
* as published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
******************************************************************************/
package net.sf.taverna.t2.provenance.lineageservice;
import java.sql.Blob;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
import net.sf.taverna.t2.invocation.InvocationContext;
import net.sf.taverna.t2.provenance.connector.JDBCConnector;
import net.sf.taverna.t2.provenance.lineageservice.utils.Arc;
import net.sf.taverna.t2.provenance.lineageservice.utils.DDRecord;
import net.sf.taverna.t2.provenance.lineageservice.utils.NestedListNode;
import net.sf.taverna.t2.provenance.lineageservice.utils.ProcBinding;
import net.sf.taverna.t2.provenance.lineageservice.utils.ProvenanceProcessor;
import net.sf.taverna.t2.provenance.lineageservice.utils.Var;
import net.sf.taverna.t2.provenance.lineageservice.utils.VarBinding;
import net.sf.taverna.t2.provenance.lineageservice.utils.Workflow;
import net.sf.taverna.t2.provenance.lineageservice.utils.WorkflowInstance;
import net.sf.taverna.t2.reference.ReferenceService;
import net.sf.taverna.t2.reference.T2Reference;
import net.sf.taverna.t2.reference.impl.T2ReferenceImpl;
import org.apache.log4j.Logger;
import org.jdom.Document;
import org.jdom.Element;
/**
* Handles all the querying of provenance items in the database layer. Uses
* standard SQL so all specific instances of this class can extend this writer
* to handle all of the db queries
*
* @author Paolo Missier
* @author Ian Dunlop
* @author Stuart Owen
*
*/
public abstract class ProvenanceQuery {
/** Log4j logger, created for ProvenanceQuery, used to report query failures and diagnostics. */
protected Logger logger = Logger.getLogger(ProvenanceQuery.class);
/** Processor type string identifying a nested dataflow; getProcessorsDeep tests processor types against it. */
public static String DATAFLOW_TYPE = "net.sf.taverna.t2.activities.dataflow.DataflowActivity";
/**
 * Obtains a database connection by delegating to
 * {@link JDBCConnector#getConnection()}.
 *
 * @return the connection returned by the connector
 * @throws InstantiationException propagated from the connector
 * @throws IllegalAccessException propagated from the connector
 * @throws ClassNotFoundException propagated from the connector
 * @throws SQLException propagated from the connector
 */
public Connection getConnection() throws InstantiationException,
        IllegalAccessException, ClassNotFoundException, SQLException {
    final Connection fromConnector = JDBCConnector.getConnection();
    return fromConnector;
}
/**
 * Appends a WHERE clause to a query, built from equality constraints of the
 * form <code>column = 'value'</code> joined with <code>and</code>.
 * <p>
 * Single quotes embedded in a value are doubled so that the value stays
 * inside its SQL string literal. Column names (the map keys) are copied
 * verbatim and must therefore come from trusted code.
 *
 * @param q0
 *            the query text to extend
 * @param queryConstraints
 *            column name -> required value; when null or empty the query
 *            is returned unchanged. Constraints are emitted in the map's
 *            iteration order
 * @param terminate
 *            currently ignored
 * @return the query followed by the generated WHERE clause, if any
 */
protected String addWhereClauseToQuery(String q0,
        Map<String, String> queryConstraints, boolean terminate) {
    StringBuilder q = new StringBuilder(q0);
    if (queryConstraints != null && !queryConstraints.isEmpty()) {
        q.append(" where ");
        boolean first = true;
        for (Entry<String, String> entry : queryConstraints.entrySet()) {
            if (!first) {
                q.append(" and ");
            }
            // String.valueOf keeps the historical rendering of a null value
            // as 'null'; doubling quotes keeps the value inside its literal
            String value = String.valueOf(entry.getValue()).replace("'", "''");
            q.append(" ").append(entry.getKey()).append(" = '").append(value).append("' ");
            first = false;
        }
    }
    return q.toString();
}
/**
 * Appends an ORDER BY clause listing the given attributes, comma separated
 * and in list order, to a query.
 *
 * @param q0
 *            the query text to extend
 * @param orderAttr
 *            attributes to order by; when null or empty the query is
 *            returned unchanged
 * @param terminate
 *            currently ignored
 * @return the query followed by the generated ORDER BY clause, if any
 */
protected String addOrderByToQuery(String q0, List<String> orderAttr,
        boolean terminate) {
    StringBuffer sql = new StringBuffer(q0);
    if (orderAttr == null || orderAttr.size() == 0) {
        return sql.toString();
    }
    sql.append(" ORDER BY ");
    for (int idx = 0; idx < orderAttr.size(); idx++) {
        if (idx > 0) {
            sql.append(",");
        }
        sql.append(orderAttr.get(idx));
    }
    return sql.toString();
}
/**
 * Selects the Var records, joined with WfInstance, that satisfy the given
 * equality constraints, ordered by <code>V.order</code>.
 *
 * @param queryConstraints
 *            column name -> required value; null or empty selects all rows
 * @return the matching variables; empty when nothing matches or when
 *         getConnection() fails with one of the logged non-SQL exceptions
 * @throws SQLException
 *             if the query fails or a JDBC resource cannot be closed
 */
public List<Var> getVars(Map<String, String> queryConstraints)
        throws SQLException {
    List<Var> result = new ArrayList<Var>();
    String q0 = "SELECT * FROM Var V JOIN WfInstance W ON W.wfnameRef = V.wfInstanceRef";
    String q = addWhereClauseToQuery(q0, queryConstraints, true);
    List<String> orderAttr = new ArrayList<String>();
    orderAttr.add("V.order");
    String q1 = addOrderByToQuery(q, orderAttr, true);
    Statement stmt = null;
    Connection connection = null;
    try {
        connection = getConnection();
        stmt = connection.createStatement();
        if (stmt.execute(q1)) {
            ResultSet rs = stmt.getResultSet();
            while (rs.next()) {
                Var aVar = new Var();
                aVar.setWfInstanceRef(rs.getString("WfInstanceRef"));
                // inputOrOutput == 1 marks an input variable
                aVar.setInput(rs.getInt("inputOrOutput") == 1);
                aVar.setPName(rs.getString("pnameRef"));
                aVar.setVName(rs.getString("varName"));
                aVar.setType(rs.getString("type"));
                aVar.setTypeNestingLevel(rs.getInt("nestingLevel"));
                aVar.setActualNestingLevel(rs.getInt("actualNestingLevel"));
                aVar.setANLset(rs.getInt("anlSet") == 1);
                result.add(aVar);
            }
        }
    } catch (InstantiationException e) {
        logger.warn("Could not execute query: " + e);
    } catch (IllegalAccessException e) {
        logger.warn("Could not execute query: " + e);
    } catch (ClassNotFoundException e) {
        logger.warn("Could not execute query: " + e);
    } finally {
        // closing the statement also releases its result set; the nested
        // finally still closes the connection if that close fails
        try {
            if (stmt != null) {
                stmt.close();
            }
        } finally {
            if (connection != null) {
                connection.close();
            }
        }
    }
    return result;
}
/**
 * Selects the Var records that satisfy the given equality constraints,
 * without joining against WfInstance and without an explicit ordering.
 *
 * @param queryConstraints
 *            column name -> required value; null or empty selects all rows
 * @return the matching variables; empty when nothing matches or when
 *         getConnection() fails with one of the logged non-SQL exceptions
 * @throws SQLException
 *             if the query fails or a JDBC resource cannot be closed
 */
private List<Var> getVarsNoInstance(Map<String, String> queryConstraints)
        throws SQLException {
    List<Var> result = new ArrayList<Var>();
    String q0 = "SELECT * FROM Var V";
    String q = addWhereClauseToQuery(q0, queryConstraints, true);
    Statement stmt = null;
    Connection connection = null;
    try {
        connection = getConnection();
        stmt = connection.createStatement();
        if (stmt.execute(q)) {
            ResultSet rs = stmt.getResultSet();
            while (rs.next()) {
                Var aVar = new Var();
                aVar.setWfInstanceRef(rs.getString("WfInstanceRef"));
                // inputOrOutput == 1 marks an input variable
                aVar.setInput(rs.getInt("inputOrOutput") == 1);
                aVar.setPName(rs.getString("pnameRef"));
                aVar.setVName(rs.getString("varName"));
                aVar.setType(rs.getString("type"));
                aVar.setTypeNestingLevel(rs.getInt("nestingLevel"));
                aVar.setActualNestingLevel(rs.getInt("actualNestingLevel"));
                aVar.setANLset(rs.getInt("anlSet") == 1);
                result.add(aVar);
            }
        }
    } catch (InstantiationException e) {
        logger.warn("Could not execute query: " + e);
    } catch (IllegalAccessException e) {
        logger.warn("Could not execute query: " + e);
    } catch (ClassNotFoundException e) {
        logger.warn("Could not execute query: " + e);
    } finally {
        // closing the statement also releases its result set; the nested
        // finally still closes the connection if that close fails
        try {
            if (stmt != null) {
                stmt.close();
            }
        } finally {
            if (connection != null) {
                connection.close();
            }
        }
    }
    return result;
}
/**
 * Returns the <code>value</code> column of every VarBinding row matching
 * the given workflow instance, processor name and variable name.
 *
 * @param wfInstance
 *            value required for wfInstanceRef
 * @param pname
 *            value required for PNameRef
 * @param vname
 *            value required for varNameRef
 * @return the values found; empty when nothing matches or when
 *         getConnection() fails with one of the logged non-SQL exceptions
 * @throws SQLException
 *             if the query fails or a JDBC resource cannot be closed
 */
public List<String> getVarValues(String wfInstance, String pname,
        String vname) throws SQLException {
    List<String> result = new ArrayList<String>();
    String q0 = "SELECT value FROM VarBinding VB";
    Map<String, String> queryConstraints = new HashMap<String, String>();
    queryConstraints.put("wfInstanceRef", wfInstance);
    queryConstraints.put("PNameRef", pname);
    queryConstraints.put("varNameRef", vname);
    String q = addWhereClauseToQuery(q0, queryConstraints, true);
    Statement stmt = null;
    Connection connection = null;
    try {
        connection = getConnection();
        stmt = connection.createStatement();
        if (stmt.execute(q)) {
            ResultSet rs = stmt.getResultSet();
            while (rs.next()) {
                result.add(rs.getString("value"));
            }
        }
    } catch (InstantiationException e) {
        logger.warn("Could not execute query: " + e);
    } catch (IllegalAccessException e) {
        logger.warn("Could not execute query: " + e);
    } catch (ClassNotFoundException e) {
        logger.warn("Could not execute query: " + e);
    } finally {
        // closing the statement also releases its result set; the nested
        // finally still closes the connection if that close fails
        try {
            if (stmt != null) {
                stmt.close();
            }
        } finally {
            if (connection != null) {
                connection.close();
            }
        }
    }
    return result;
}
/**
 * Returns the input variables (inputOrOutput = 1) of a processor.
 *
 * @param pname
 *            processor name, matched against V.pnameRef
 * @param wfID
 *            workflow identifier, matched against V.wfInstanceRef
 * @param wfInstanceID
 *            if not null, additionally matched against W.instanceID through
 *            the WfInstance join; if null, Var is queried on its own
 * @return list of input variables
 * @throws SQLException
 */
public List<Var> getInputVars(String pname, String wfID, String wfInstanceID)
        throws SQLException {
    // (var, proc) rows in Var carry the input/output flag
    Map<String, String> constraints = new HashMap<String, String>();
    constraints.put("V.wfInstanceRef", wfID);
    constraints.put("V.pnameRef", pname);
    constraints.put("V.inputOrOutput", "1");
    if (wfInstanceID == null) {
        return getVarsNoInstance(constraints);
    }
    constraints.put("W.instanceID", wfInstanceID);
    return getVars(constraints);
}
/**
 * Returns the output variables (inputOrOutput = 0) of a processor. The
 * lookup always goes through getVars, i.e. with the WfInstance join.
 *
 * @param pname
 *            processor name, matched against V.pnameRef
 * @param wfID
 *            workflow identifier, matched against V.wfInstanceRef
 * @param wfInstanceID
 *            if not null, additionally matched against W.instanceID
 * @return list of output variables
 * @throws SQLException
 */
public List<Var> getOutputVars(String pname, String wfID, String wfInstanceID)
        throws SQLException {
    // (var, proc) rows in Var carry the input/output flag
    Map<String, String> constraints = new HashMap<String, String>();
    constraints.put("V.wfInstanceRef", wfID);
    constraints.put("V.pnameRef", pname);
    constraints.put("V.inputOrOutput", "0");
    boolean restrictToRun = (wfInstanceID != null);
    if (restrictToRun) {
        constraints.put("W.instanceID", wfInstanceID);
    }
    List<Var> outputs = getVars(constraints);
    return outputs;
}
/**
 * Selects the Arc records, joined with WfInstance, that satisfy the given
 * equality constraints.
 *
 * @param queryConstraints
 *            column name -> required value; null or empty selects all arcs
 * @return the matching arcs; empty when nothing matches or when
 *         getConnection() fails with one of the logged non-SQL exceptions
 * @throws SQLException
 *             if the query fails or a JDBC resource cannot be closed
 */
public List<Arc> getArcs(Map<String, String> queryConstraints)
        throws SQLException {
    List<Arc> result = new ArrayList<Arc>();
    String q0 = "SELECT * FROM Arc A JOIN WfInstance W ON W.wfnameRef = A.wfInstanceRef";
    String q = addWhereClauseToQuery(q0, queryConstraints, true);
    Statement stmt = null;
    Connection connection = null;
    try {
        connection = getConnection();
        stmt = connection.createStatement();
        if (stmt.execute(q)) {
            ResultSet rs = stmt.getResultSet();
            while (rs.next()) {
                Arc aArc = new Arc();
                aArc.setWfInstanceRef(rs.getString("WfInstanceRef"));
                aArc.setSourcePnameRef(rs.getString("sourcePNameRef"));
                aArc.setSourceVarNameRef(rs.getString("sourceVarNameRef"));
                aArc.setSinkPnameRef(rs.getString("sinkPNameRef"));
                aArc.setSinkVarNameRef(rs.getString("sinkVarNameRef"));
                result.add(aArc);
            }
        }
    } catch (InstantiationException e) {
        logger.warn("Could not execute query: " + e);
    } catch (IllegalAccessException e) {
        logger.warn("Could not execute query: " + e);
    } catch (ClassNotFoundException e) {
        logger.warn("Could not execute query: " + e);
    } finally {
        // closing the statement also releases its result set; the nested
        // finally still closes the connection if that close fails
        try {
            if (stmt != null) {
                stmt.close();
            }
        } finally {
            if (connection != null) {
                connection.close();
            }
        }
    }
    return result;
}
/**
 * Returns the name of the first workflow of the given run that has no
 * parent workflow.
 *
 * @param runID
 *            the run whose workflows are inspected
 * @return the name of the first workflow whose parent name is null, or null
 *         if every workflow of the run has a parent (or the run has none)
 * @throws SQLException
 */
public String getTopLevelWfName(String runID) throws SQLException {
    List<Workflow> candidates = getWorkflowForRun(runID);
    for (int idx = 0; idx < candidates.size(); idx++) {
        Workflow candidate = candidates.get(idx);
        if (candidate.getParentWFname() != null) {
            continue;
        }
        return candidate.getWfname();
    }
    return null;
}
/**
 * Returns the names of all workflows (top level + nested) recorded for a
 * given runID, in the order getWorkflowForRun returns them.
 *
 * @param runID
 *            the run whose workflows are listed
 * @return the workflow names; empty if the run has no workflows
 * @throws SQLException
 */
public List<String> getWfNames(String runID) throws SQLException {
    List<Workflow> found = getWorkflowForRun(runID);
    List<String> names = new ArrayList<String>();
    for (int idx = 0; idx < found.size(); idx++) {
        names.add(found.get(idx).getWfname());
    }
    return names;
}
/**
 * Returns the workflows associated to a single runID, read from WfInstance
 * joined with Workflow.
 *
 * @param runID
 *            value required for instanceID
 * @return one Workflow (name and parent name) per matching row; empty when
 *         nothing matches or when getConnection() fails with one of the
 *         logged non-SQL exceptions
 * @throws SQLException
 *             if the query fails or a JDBC resource cannot be closed
 */
public List<Workflow> getWorkflowForRun(String runID) throws SQLException {
    List<Workflow> result = new ArrayList<Workflow>();
    String q = "SELECT * FROM WfInstance I join Workflow W on I.wfnameRef = W.wfname where instanceID = ?";
    PreparedStatement stmt = null;
    Connection connection = null;
    try {
        connection = getConnection();
        stmt = connection.prepareStatement(q);
        stmt.setString(1, runID);
        if (stmt.execute()) {
            ResultSet rs = stmt.getResultSet();
            while (rs.next()) {
                Workflow w = new Workflow();
                w.setWfName(rs.getString("wfnameRef"));
                w.setParentWFname(rs.getString("parentWFName"));
                result.add(w);
            }
        }
    } catch (InstantiationException e) {
        logger.error("Error finding the workflow reference", e);
    } catch (IllegalAccessException e) {
        logger.error("Error finding the workflow reference", e);
    } catch (ClassNotFoundException e) {
        logger.error("Error finding the workflow reference", e);
    } finally {
        // closing the statement also releases its result set; the nested
        // finally still closes the connection if that close fails
        try {
            if (stmt != null) {
                stmt.close();
            }
        } finally {
            if (connection != null) {
                connection.close();
            }
        }
    }
    return result;
}
/**
 * Retrieves workflow runs (WfInstance rows joined with their Workflow),
 * optionally restricted to one dataflow and/or a timestamp range.
 *
 * @param dataflowID
 *            if not null, only runs whose wfnameRef equals this value are
 *            returned
 * @param conditions
 *            may be null; currently only understands "from" and "to" as
 *            timestamps for range queries (inclusive bounds)
 * @return the matching runs; empty when nothing matches or when
 *         getConnection() fails with one of the logged non-SQL exceptions
 * @throws SQLException
 *             if the query fails or a JDBC resource cannot be closed
 */
public List<WorkflowInstance> getRuns(String dataflowID, Map<String, String> conditions) throws SQLException {
    PreparedStatement ps = null;
    Connection connection = null;
    List<WorkflowInstance> result = new ArrayList<WorkflowInstance>();
    List<String> conds = new ArrayList<String>();
    if (dataflowID != null) {
        // bound as a statement parameter below rather than concatenated
        conds.add("wfnameRef = ?");
    }
    if (conditions != null) {
        // NOTE(review): the range bounds are still spliced in verbatim, as
        // before, because callers may already pass SQL-formatted (possibly
        // quoted) timestamp expressions; they must come from trusted code
        if (conditions.get("from") != null) {
            conds.add("timestamp >= " + conditions.get("from"));
        }
        if (conditions.get("to") != null) {
            conds.add("timestamp <= " + conditions.get("to"));
        }
    }
    StringBuffer sql = new StringBuffer("SELECT * FROM WfInstance I join Workflow W on I.wfnameRef = W.wfname");
    for (int idx = 0; idx < conds.size(); idx++) {
        // conditions are joined as predicates; wrapping them in quotes would
        // turn them into string literals
        sql.append(idx == 0 ? " where " : " and ").append(conds.get(idx));
    }
    String q = sql.toString();
    try {
        connection = getConnection();
        ps = connection.prepareStatement(q);
        if (dataflowID != null) {
            ps.setString(1, dataflowID);
        }
        logger.debug(q);
        if (ps.execute()) {
            ResultSet rs = ps.getResultSet();
            while (rs.next()) {
                WorkflowInstance i = new WorkflowInstance();
                i.setInstanceID(rs.getString("instanceID"));
                i.setTimestamp(rs.getString("timestamp"));
                i.setWorkflowIdentifier(rs.getString("wfnameRef"));
                i.setWorkflowExternalName(rs.getString("externalName"));
                Blob blob = rs.getBlob("dataflow");
                // getBlob returns null for SQL NULL; leave the blob unset then
                if (blob != null) {
                    i.setDataflowBlob(blob.getBytes(1, (int) blob.length()));
                }
                result.add(i);
            }
        }
    } catch (InstantiationException e) {
        logger.warn("Could not execute query: " + e);
    } catch (IllegalAccessException e) {
        logger.warn("Could not execute query: " + e);
    } catch (ClassNotFoundException e) {
        logger.warn("Could not execute query: " + e);
    } finally {
        // closing the statement also releases its result set; the nested
        // finally still closes the connection if that close fails
        try {
            if (ps != null) {
                ps.close();
            }
        } finally {
            if (connection != null) {
                connection.close();
            }
        }
    }
    return result;
}
/**
 * Lists the workflow name (wfnameRef) of every recorded run instance, most
 * recent first. The query also selects instanceID, but only the workflow
 * name is returned.
 *
 * @return the wfnameRef of each WfInstance row ordered by descending
 *         timestamp; empty when there are no rows or when getConnection()
 *         fails with one of the logged non-SQL exceptions
 * @throws SQLException
 *             if the query fails or a JDBC resource cannot be closed
 */
public List<String> getWFNamesByTime() throws SQLException {
    List<String> result = new ArrayList<String>();
    String q = "SELECT instanceID, wfnameRef FROM WfInstance order by timestamp desc";
    Statement stmt = null;
    Connection connection = null;
    try {
        connection = getConnection();
        stmt = connection.createStatement();
        if (stmt.execute(q)) {
            ResultSet rs = stmt.getResultSet();
            while (rs.next()) {
                result.add(rs.getString("wfnameRef"));
            }
        }
    } catch (InstantiationException e) {
        logger.warn("Could not execute query: " + e);
    } catch (IllegalAccessException e) {
        logger.warn("Could not execute query: " + e);
    } catch (ClassNotFoundException e) {
        logger.warn("Could not execute query: " + e);
    } finally {
        // closing the statement also releases its result set; the nested
        // finally still closes the connection if that close fails
        try {
            if (stmt != null) {
                stmt.close();
            }
        } finally {
            if (connection != null) {
                connection.close();
            }
        }
    }
    return result;
}
/**
 * Returns all ProcBinding records that satisfy the input constraints.
 *
 * @param constraints
 *            column name -> required value; null or empty selects all rows
 * @return the matching bindings; empty when nothing matches or when
 *         getConnection() fails with one of the logged non-SQL exceptions
 * @throws SQLException
 *             if the query fails or a JDBC resource cannot be closed
 */
public List<ProcBinding> getProcBindings(Map<String, String> constraints)
        throws SQLException {
    List<ProcBinding> result = new ArrayList<ProcBinding>();
    String q = "SELECT * FROM ProcBinding PB ";
    q = addWhereClauseToQuery(q, constraints, true);
    Statement stmt = null;
    Connection connection = null;
    try {
        connection = getConnection();
        stmt = connection.createStatement();
        if (stmt.execute(q)) {
            ResultSet rs = stmt.getResultSet();
            while (rs.next()) {
                ProcBinding pb = new ProcBinding();
                pb.setActName(rs.getString("actName"));
                pb.setExecIDRef(rs.getString("execIDRef"));
                pb.setIterationVector(rs.getString("iteration"));
                pb.setPNameRef(rs.getString("pnameRef"));
                pb.setWfNameRef(rs.getString("wfNameRef"));
                result.add(pb);
            }
        }
    } catch (InstantiationException e) {
        logger.warn("Could not execute query: " + e);
    } catch (IllegalAccessException e) {
        logger.warn("Could not execute query: " + e);
    } catch (ClassNotFoundException e) {
        logger.warn("Could not execute query: " + e);
    } finally {
        // closing the statement also releases its result set; the nested
        // finally still closes the connection if that close fails
        try {
            if (stmt != null) {
                stmt.close();
            }
        } finally {
            if (connection != null) {
                connection.close();
            }
        }
    }
    return result;
}
/**
 * Returns the VarBinding records, joined with Var, WfInstance and
 * (optionally) Data, that satisfy the given constraints.
 * <p>
 * TODO this currently returns the data value as a string, which is
 * incorrect as it is an untyped byte array
 * <p>
 * Any failure while running the query or reading rows is logged as a
 * warning and the bindings collected so far are returned.
 *
 * @param constraints
 *            a Map columnName -> value that defines the query constraints.
 *            Note: columnName must be fully qualified. This is not done
 *            well at the moment, i.e., PNameRef should be
 *            VarBinding.PNameRef to avoid ambiguities
 * @return the bindings read; possibly empty or partial after a failure
 * @throws SQLException
 *             if a JDBC resource cannot be closed
 */
public List<VarBinding> getVarBindings(Map<String, String> constraints)
        throws SQLException {
    List<VarBinding> result = new ArrayList<VarBinding>();
    String q = "SELECT * FROM VarBinding VB join Var V " +
            "on (VB.varNameRef = V.varName and VB.PNameRef = V.PNameRef and VB.wfNameRef = V.wfInstanceRef) " +
            "JOIN WfInstance W ON VB.wfInstanceRef = W.instanceID and VB.wfNameRef = W.wfnameRef " +
            "LEFT OUTER JOIN Data D ON D.wfInstanceID = VB.wfInstanceRef and D.dataReference = VB.value";
    q = addWhereClauseToQuery(q, constraints, true);
    Statement stmt = null;
    Connection connection = null;
    try {
        connection = getConnection();
        stmt = connection.createStatement();
        if (stmt.execute(q)) {
            ResultSet rs = stmt.getResultSet();
            while (rs.next()) {
                VarBinding vb = new VarBinding();
                vb.setWfNameRef(rs.getString("wfNameRef"));
                vb.setVarNameRef(rs.getString("varNameRef"));
                vb.setWfInstanceRef(rs.getString("wfInstanceRef"));
                vb.setValue(rs.getString("value"));
                // a stored literal "null" is treated like SQL NULL
                String collId = rs.getString("collIdRef");
                if (collId == null || collId.equals("null")) {
                    vb.setCollIDRef(null);
                } else {
                    vb.setCollIDRef(collId);
                }
                vb.setIterationVector(rs.getString("iteration"));
                vb.setPNameRef(rs.getString("PNameRef"));
                vb.setPositionInColl(rs.getInt("positionInColl"));
                try {
                    vb.setResolvedValue(rs.getString("D.data"));
                } catch (Exception ignored) {
                    // ignore this since D.data is experimental and will be
                    // removed at some point
                }
                result.add(vb);
            }
        }
    } catch (Exception e) {
        // deliberate best effort: keep what was read, but keep the stack trace
        logger.warn("Add VB failed:" + e.getMessage(), e);
    } finally {
        // closing the statement also releases its result set; the nested
        // finally still closes the connection if that close fails
        try {
            if (stmt != null) {
                stmt.close();
            }
        } finally {
            if (connection != null) {
                connection.close();
            }
        }
    }
    return result;
}
/**
 * Returns the Collection records that satisfy the given constraints, as
 * NestedListNode objects.
 *
 * @param constraints
 *            column name -> required value; null or empty selects all rows
 * @return the matching nodes; empty when nothing matches or when
 *         getConnection() fails with one of the logged non-SQL exceptions
 * @throws SQLException
 *             if the query fails or a JDBC resource cannot be closed
 */
public List<NestedListNode> getNestedListNodes(
        Map<String, String> constraints) throws SQLException {
    List<NestedListNode> result = new ArrayList<NestedListNode>();
    String q = "SELECT * FROM Collection C ";
    q = addWhereClauseToQuery(q, constraints, true);
    Statement stmt = null;
    Connection connection = null;
    // one try block covers connecting and querying: a logged connection
    // failure must not fall through to a null statement, and a failing
    // createStatement() must not leave the connection open
    try {
        connection = getConnection();
        stmt = connection.createStatement();
        if (stmt.execute(q)) {
            ResultSet rs = stmt.getResultSet();
            while (rs.next()) {
                NestedListNode nln = new NestedListNode();
                nln.setCollId(rs.getString("collId"));
                nln.setParentCollIdRef(rs.getString("parentCollIdRef"));
                nln.setWfInstanceRef(rs.getString("wfInstanceRef"));
                nln.setPNameRef(rs.getString("PNameRef"));
                nln.setVarNameRef(rs.getString("varNameRef"));
                nln.setIteration(rs.getString("iteration"));
                result.add(nln);
            }
        }
    } catch (InstantiationException e) {
        logger.error("Error finding the nested list nodes", e);
    } catch (IllegalAccessException e) {
        logger.error("Error finding the nested list nodes", e);
    } catch (ClassNotFoundException e) {
        logger.error("Error finding the nested list nodes", e);
    } finally {
        // closing the statement also releases its result set; the nested
        // finally still closes the connection if that close fails
        try {
            if (stmt != null) {
                stmt.close();
            }
        } finally {
            if (connection != null) {
                connection.close();
            }
        }
    }
    return result;
}
/**
 * Counts, for every processor that is the sink of an arc in the given run,
 * how many incoming arcs it has. Every sink appears in the result, with a
 * count of 0 if all of its arcs are skipped. An arc is skipped when its
 * source (or its sink) is reported as a dataflow by isDataflow and the
 * arc's workflow name equals the name of the Workflow whose externalName
 * matches that source (or sink).
 * <p>
 * All exceptions are logged rather than thrown; the counts gathered so far
 * are returned.
 *
 * @param wfInstanceID
 *            the run, matched against WfInstance.instanceID
 * @return sink processor name -> number of counted incoming arcs
 */
public Map<String, Integer> getPredecessorsCount(String wfInstanceID) {
    PreparedStatement ps = null;
    Map<String, Integer> result = new HashMap<String, Integer>();
    // get all arcs for the entire workflow structure for this particular instance
    Connection connection = null;
    try {
        connection = getConnection();
        ps = connection.prepareStatement(
                "SELECT A.sourcePNameRef as source , A.sinkPNameRef as sink, A.wfInstanceRef as wfName1, W1.wfName as wfName2, W2.wfName as wfName3 " +
                "FROM Arc A join WfInstance I on A.wfInstanceRef = I.wfnameRef " +
                "left outer join Workflow W1 on W1.externalName = A.sourcePNameRef " +
                "left outer join Workflow W2 on W2.externalName = A.sinkPNameRef " +
                "where I.instanceID = ? order by wfInstanceRef");
        ps.setString(1, wfInstanceID);
        if (ps.execute()) {
            ResultSet rs = ps.getResultSet();
            while (rs.next()) {
                String sink = rs.getString("sink");
                String source = rs.getString("source");
                // register the sink before any skip so it is always present
                if (result.get(sink) == null) {
                    result.put(sink, 0);
                }
                String name1 = rs.getString("wfName1");
                String name2 = rs.getString("wfName2");
                String name3 = rs.getString("wfName3");
                if (isDataflow(source) && name1.equals(name2)) {
                    continue;
                }
                if (isDataflow(sink) && name1.equals(name3)) {
                    continue;
                }
                result.put(sink, result.get(sink) + 1);
            }
        }
    } catch (InstantiationException e1) {
        logger.warn("Could not execute query: " + e1);
    } catch (IllegalAccessException e1) {
        logger.warn("Could not execute query: " + e1);
    } catch (ClassNotFoundException e1) {
        logger.warn("Could not execute query: " + e1);
    } catch (SQLException e) {
        logger.error("Error executing query", e);
    } finally {
        // release the statement (and its result set) before the connection
        if (ps != null) {
            try {
                ps.close();
            } catch (SQLException ex) {
                logger.error("There was an error closing the statement", ex);
            }
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException ex) {
                logger.error("There was an error closing the database connection", ex);
            }
        }
    }
    return result;
}
/**
 * Older variant of the predecessor count. It was written as a new
 * implementation of getProcessorsIncomingLinks which avoids complications
 * due to nesting, and relies on the wfInstanceID rather than the wfnameRef.
 * The counting is done in SQL: arcs whose source processor is a
 * DataflowActivity are excluded, and arcs into a DataflowActivity sink are
 * counted only when that processor belongs to the arc's own workflow.
 * <p>
 * All exceptions are logged rather than thrown.
 *
 * @param wfInstanceID
 *            the run, matched against WfInstance.instanceID
 * @return sink processor name -> number of incoming arcs counted by the query
 */
public Map<String, Integer> getPredecessorsCountOld(String wfInstanceID) {
    PreparedStatement ps = null;
    Map<String, Integer> result = new HashMap<String, Integer>();
    // get all arcs for the entire workflow structure for this particular instance
    Connection connection = null;
    try {
        connection = getConnection();
        ps = connection.prepareStatement(
                "SELECT sinkPnameRef, P1.type, count(*) as pred " +
                " FROM Arc A join WfInstance I on A.wfInstanceRef = I.wfnameRef " +
                " join Processor P1 on P1.pname = A.sinkPnameRef " +
                " join Processor P2 on P2.pname = A.sourcePnameRef " +
                " where I.instanceID = ? " +
                " and P2.type <> 'net.sf.taverna.t2.activities.dataflow.DataflowActivity' " +
                " and ((P1.type = 'net.sf.taverna.t2.activities.dataflow.DataflowActivity' and P1.wfInstanceRef = A.wfInstanceRef) or " +
                " (P1.type <> 'net.sf.taverna.t2.activities.dataflow.DataflowActivity')) " +
                " group by A.sinkPnameRef, type");
        ps.setString(1, wfInstanceID);
        if (ps.execute()) {
            ResultSet rs = ps.getResultSet();
            while (rs.next()) {
                int cnt = rs.getInt("pred");
                result.put(rs.getString("sinkPnameRef"), Integer.valueOf(cnt));
            }
        }
    } catch (InstantiationException e1) {
        logger.warn("Could not execute query: " + e1);
    } catch (IllegalAccessException e1) {
        logger.warn("Could not execute query: " + e1);
    } catch (ClassNotFoundException e1) {
        logger.warn("Could not execute query: " + e1);
    } catch (SQLException e) {
        logger.warn("Could not execute query: " + e);
    } finally {
        // release the statement (and its result set) before the connection
        if (ps != null) {
            try {
                ps.close();
            } catch (SQLException ex) {
                logger.error("There was an error closing the statement", ex);
            }
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException ex) {
                logger.error("There was an error closing the database connection", ex);
            }
        }
    }
    return result;
}
/**
 * used in the toposort phase -- propagation of anl() values through the
 * graph
 * <p>
 * First every processor of the workflow is registered with a count of 0,
 * then the counts are overwritten with the number of arcs arriving at each
 * sink, ignoring sinks that are nested dataflow processors of the parent
 * workflow (or of this workflow when it has no parent).
 *
 * @param wfnameRef
 *            reference to static wf name
 * @return a map <processor name> --> <incoming links count> for each
 *         processor, without counting the arcs from the dataflow input to
 *         processors. So a processor is at the root of the graph if it has
 *         no incoming links, or all of its incoming links are from dataflow
 *         inputs.<br/>
 *         Note: this must be checked for processors that are roots of
 *         sub-flows... are these counted as top-level root nodes??
 * @throws SQLException
 *             if one of the queries fails
 */
public Map<String, Integer> getProcessorsIncomingLinks(String wfnameRef)
        throws SQLException {
    Map<String, Integer> result = new HashMap<String, Integer>();
    boolean success;
    String currentWorkflowProcessor = null;
    PreparedStatement ps = null;
    Statement stmt = null;
    Connection connection = null;
    try {
        connection = getConnection();
        ps = connection.prepareStatement(
                "SELECT pName, type FROM Processor WHERE wfInstanceRef = ?");
        ps.setString(1, wfnameRef);
        success = ps.execute();
        if (success) {
            ResultSet rs = ps.getResultSet();
            while (rs.next()) {
                // PM CHECK 6/09
                if (rs.getString("type").equals("net.sf.taverna.t2.activities.dataflow.DataflowActivity")) {
                    currentWorkflowProcessor = rs.getString("pName");
                    logger.info("currentWorkflowProcessor = " + currentWorkflowProcessor);
                }
                result.put(rs.getString("pName"), new Integer(0));
            }
        }
    } catch (InstantiationException e1) {
        logger.warn("Could not execute query: " + e1);
    } catch (IllegalAccessException e1) {
        logger.warn("Could not execute query: " + e1);
    } catch (ClassNotFoundException e1) {
        logger.warn("Could not execute query: " + e1);
    } finally {
        // both may still be null if getConnection() failed
        if (ps != null) {
            try {
                ps.close();
            } catch (SQLException ex) {
                logger.error("An error occurred closing the statement", ex);
            }
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException ex) {
                logger.error("An error occurred closing the database connection", ex);
            }
        }
        connection = null;
    }
    // fetch the name of the top-level dataflow. We use this to exclude arcs
    // outgoing from its inputs
    ////////////////
    // CHECK below -- gets confused on nested workflows
    ////////////////
    String parentWF = getParentOfWorkflow(wfnameRef);
    if (parentWF == null) {
        parentWF = wfnameRef; // null parent means we are the top
    }
    logger.debug("parent WF: " + parentWF);
    // get nested dataflows -- we want to avoid these in the toposort algorithm
    List<ProvenanceProcessor> procs = getProcessorsShallow(
            "net.sf.taverna.t2.activities.dataflow.DataflowActivity",
            parentWF);
    StringBuffer pNames = new StringBuffer();
    pNames.append("(");
    boolean first = true;
    for (ProvenanceProcessor p : procs) {
        if (!first) {
            pNames.append(",");
        } else {
            first = false;
        }
        pNames.append(" '" + p.getPname() + "' ");
    }
    pNames.append(")");
    // exclude processors connected to inputs -- those have 0 predecessors
    // for our purposes
    // and we add them later
    // PM 6/09 not sure we need to exclude arcs going into sub-flows?? so commented out the condition
    StringBuffer query = new StringBuffer();
    query.append("SELECT sinkPNameRef, count(*) as cnt " + "FROM Arc " + "WHERE wfInstanceRef = \'" + wfnameRef + "\' ");
    if (!procs.isEmpty()) {
        // an empty list would yield "NOT IN ()", which is not valid SQL;
        // with nothing to exclude the condition is simply left out
        query.append("AND sinkPNameRef NOT IN " + pNames + " "); // + "AND sourcePNameRef NOT IN " + pNames
    }
    query.append(" GROUP BY sinkPNameRef");
    String q = query.toString();
    logger.info("executing \n" + q);
    try {
        connection = getConnection();
        stmt = connection.createStatement();
        success = stmt.execute(q);
        if (success) {
            ResultSet rs = stmt.getResultSet();
            while (rs.next()) {
                if (!rs.getString("sinkPNameRef").equals(currentWorkflowProcessor)) {
                    result.put(rs.getString("sinkPNameRef"), new Integer(rs.getInt("cnt")));
                }
            }
            // the dataflow processor found in the first query is forced to 0
            result.put(currentWorkflowProcessor, 0);
        }
    } catch (InstantiationException e) {
        logger.warn("Could not execute query: " + e);
    } catch (IllegalAccessException e) {
        logger.warn("Could not execute query: " + e);
    } catch (ClassNotFoundException e) {
        logger.warn("Could not execute query: " + e);
    } finally {
        // both may still be null if getConnection() failed
        if (stmt != null) {
            try {
                stmt.close();
            } catch (SQLException ex) {
                logger.error("An error occurred closing the statement", ex);
            }
        }
        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException ex) {
                logger.error("An error occurred closing the database connection", ex);
            }
        }
    }
    return result;
}
/**
 * Returns the variables reached by the arcs leaving a given source
 * variable, i.e. the Var rows at the sink end of each such arc.
 *
 * @param pName
 *            source processor name, matched against sourcePNameRef
 * @param vName
 *            source variable name, matched against sourceVarNameRef
 * @param wfInstanceRef
 *            if not null, rows whose v.wfInstanceRef differs are skipped
 * @return the successor variables; empty when there are none or when
 *         getConnection() fails with one of the logged non-SQL exceptions
 * @throws SQLException
 *             if the query fails or a JDBC resource cannot be closed
 */
public List<Var> getSuccVars(String pName, String vName,
        String wfInstanceRef) throws SQLException {
    List<Var> result = new ArrayList<Var>();
    PreparedStatement ps = null;
    Connection connection = null;
    try {
        connection = getConnection();
        ps = connection.prepareStatement(
                "SELECT v.* " + "FROM Arc a JOIN Var v ON a.sinkPNameRef = v.pnameRef " + "AND a.sinkVarNameRef = v.varName " + "AND a.wfInstanceRef = v.wfInstanceRef " + "WHERE sourceVarNameRef = ? AND sourcePNameRef = ?");
        ps.setString(1, vName);
        ps.setString(2, pName);
        if (ps.execute()) {
            ResultSet rs = ps.getResultSet();
            while (rs.next()) {
                if (wfInstanceRef != null && !rs.getString("v.wfInstanceRef").equals(wfInstanceRef)) {
                    continue;
                }
                Var aVar = new Var();
                aVar.setWfInstanceRef(rs.getString("WfInstanceRef"));
                // inputOrOutput == 1 marks an input variable
                aVar.setInput(rs.getInt("inputOrOutput") == 1);
                aVar.setPName(rs.getString("pnameRef"));
                aVar.setVName(rs.getString("varName"));
                aVar.setType(rs.getString("type"));
                aVar.setTypeNestingLevel(rs.getInt("nestingLevel"));
                aVar.setActualNestingLevel(rs.getInt("actualNestingLevel"));
                aVar.setANLset(rs.getInt("anlSet") == 1);
                result.add(aVar);
            }
        }
    } catch (InstantiationException e) {
        logger.warn("Could not execute query: " + e);
    } catch (IllegalAccessException e) {
        logger.warn("Could not execute query: " + e);
    } catch (ClassNotFoundException e) {
        logger.warn("Could not execute query: " + e);
    } finally {
        // closing the statement also releases its result set; the nested
        // finally still closes the connection if that close fails
        try {
            if (ps != null) {
                ps.close();
            }
        } finally {
            if (connection != null) {
                connection.close();
            }
        }
    }
    return result;
}
/**
 * Returns the distinct names of the processors that are sinks of arcs
 * leaving the given processor, within one workflow and one run.
 *
 * @param pName
 *            source processor name, matched against sourcePNameRef
 * @param wfNameRef
 *            workflow name, matched against A.wfInstanceRef
 * @param wfInstanceId
 *            run identifier, matched against I.instanceID
 * @return the successor processor names; empty when there are none or when
 *         getConnection() fails with one of the logged non-SQL exceptions
 * @throws SQLException
 *             if the query fails or a JDBC resource cannot be closed
 */
public List<String> getSuccProcessors(String pName, String wfNameRef, String wfInstanceId)
        throws SQLException {
    List<String> result = new ArrayList<String>();
    PreparedStatement ps = null;
    Connection connection = null;
    try {
        connection = getConnection();
        // the table is spelled WfInstance, as in the other queries of this
        // class: table names can be case sensitive depending on the database
        ps = connection.prepareStatement(
                "SELECT distinct sinkPNameRef FROM Arc A JOIN WfInstance I on A.wfInstanceRef = I.wfnameRef " + "WHERE A.wfInstanceRef = ? and I.instanceID = ? AND sourcePNameRef = ?");
        ps.setString(1, wfNameRef);
        ps.setString(2, wfInstanceId);
        ps.setString(3, pName);
        if (ps.execute()) {
            ResultSet rs = ps.getResultSet();
            while (rs.next()) {
                result.add(rs.getString("sinkPNameRef"));
            }
        }
    } catch (InstantiationException e) {
        logger.warn("Could not execute query: " + e);
    } catch (IllegalAccessException e) {
        logger.warn("Could not execute query: " + e);
    } catch (ClassNotFoundException e) {
        logger.warn("Could not execute query: " + e);
    } finally {
        // closing the statement also releases its result set; the nested
        // finally still closes the connection if that close fails
        try {
            if (ps != null) {
                ps.close();
            }
        } finally {
            if (connection != null) {
                connection.close();
            }
        }
    }
    return result;
}
/**
 * Gets all processors of a given type within the structure identified by
 * wfnameRef (reference to dataflow). The type constraint is ignored if its
 * value is null.<br>
 * This only returns the processors for the input wfnameRef, without going
 * into any nested workflows.
 *
 * @param type
 *            required processor type, or null for any type
 * @param wfnameRef
 *            workflow name, matched against P.wfInstanceRef
 * @return the processors returned by getProcessors for these constraints
 * @throws SQLException
 */
public List<ProvenanceProcessor> getProcessorsShallow(String type, String wfnameRef)
        throws SQLException {
    Map<String, String> filter = new HashMap<String, String>();
    filter.put("P.wfInstanceRef", wfnameRef);
    boolean restrictByType = (type != null);
    if (restrictByType) {
        filter.put("P.type", type);
    }
    List<ProvenanceProcessor> processors = getProcessors(filter);
    return processors;
}
/**
 * this is similar to {@link #getProcessorsShallow(String, String)} but it is meant to recursively fetch all
 * processors within nested workflows. The result is collected in the form of a map: wfName -> {ProvenanceProcessor}
 * <p>
 * SQL failures are logged and the map built so far is returned.
 *
 * @param type processor type passed on to getProcessorsShallow (null for any type)
 * @param wfnameRef name of the workflow to start from
 * @return a map: wfName -> {ProvenanceProcessor} where wfName is the name of a (possibly nested) workflow, and
 * the values are the processors within that workflow
 */
public Map<String, List<ProvenanceProcessor>> getProcessorsDeep(String type, String wfnameRef) {
    Map<String, List<ProvenanceProcessor>> result = new HashMap<String, List<ProvenanceProcessor>>();
    try {
        List<ProvenanceProcessor> currentProcs = getProcessorsShallow(type, wfnameRef);
        result.put(wfnameRef, currentProcs);
        for (ProvenanceProcessor pp : currentProcs) {
            // compare by value: the type string comes from the database and
            // is never the same object as the constant
            if (!DATAFLOW_TYPE.equals(pp.getType())) {
                continue;
            }
            String nestedRef = pp.getWfInstanceRef();
            // NOTE(review): getProcessorsShallow constrains P.wfInstanceRef
            // to wfnameRef, so the processors returned here appear to carry
            // the current workflow's own name; descending into a truly
            // nested workflow presumably needs that dataflow's own
            // identifier -- confirm. The guard below prevents recursing
            // forever on the workflow that is already being processed.
            if (nestedRef == null || nestedRef.equals(wfnameRef)) {
                continue;
            }
            // recurse
            result.putAll(getProcessorsDeep(type, nestedRef));
        }
    } catch (SQLException e) {
        logger.error("Problem getting nested workflow processors for: " + wfnameRef + " : " + e);
    }
    return result;
}
/**
 * generic method to fetch processors subject to additional query
 * constraints; Processor is joined with WfInstance and Workflow.
 *
 * @param constraints
 *            column name -> required value; null or empty selects all rows
 * @return the matching processors; empty when nothing matches or when
 *         getConnection() fails with one of the logged non-SQL exceptions
 * @throws SQLException
 *             if the query fails or a JDBC resource cannot be closed
 */
public List<ProvenanceProcessor> getProcessors(
        Map<String, String> constraints) throws SQLException {
    List<ProvenanceProcessor> result = new ArrayList<ProvenanceProcessor>();
    String q = "SELECT * FROM Processor P JOIN WfInstance W ON P.wfInstanceRef = W.wfnameRef "+
            "JOIN Workflow WF on W.wfnameRef = WF.wfname";
    q = addWhereClauseToQuery(q, constraints, true);
    Statement stmt = null;
    Connection connection = null;
    try {
        connection = getConnection();
        stmt = connection.createStatement();
        if (stmt.execute(q)) {
            ResultSet rs = stmt.getResultSet();
            while (rs.next()) {
                ProvenanceProcessor proc = new ProvenanceProcessor();
                proc.setPname(rs.getString("pname"));
                proc.setType(rs.getString("type"));
                proc.setWfInstanceRef(rs.getString("wfInstanceRef"));
                proc.setWorkflowExternalName(rs.getString("externalName"));
                result.add(proc);
            }
        }
    } catch (InstantiationException e) {
        logger.warn("Could not execute query: " + e);
    } catch (IllegalAccessException e) {
        logger.warn("Could not execute query: " + e);
    } catch (ClassNotFoundException e) {
        logger.warn("Could not execute query: " + e);
    } finally {
        // closing the statement also releases its result set; the nested
        // finally still closes the connection if that close fails
        try {
            if (stmt != null) {
                stmt.close();
            }
        } finally {
            if (connection != null) {
                connection.close();
            }
        }
    }
    return result;
}
/**
 * Returns the name of a processor whose wfInstanceRef equals the given workflow id. Only the
 * first row of the result is used.
 *
 * @param workflowID value matched against Processor.wfInstanceRef
 * @return the pname of the first matching row, or null if there is no match or the lookup
 *         fails (failures are logged, never rethrown)
 */
public String getProcessorForWorkflow(String workflowID) {
    String processorName = null;
    Connection connection = null;
    try {
        connection = getConnection();
        PreparedStatement lookup = connection.prepareStatement(
                "SELECT * from Processor WHERE wfInstanceRef = ?");
        lookup.setString(1, workflowID);
        if (lookup.execute()) {
            ResultSet rows = lookup.getResultSet();
            if (rows.next()) {
                processorName = rows.getString("pname");
            }
        }
    } catch (SQLException e) {
        logger.error("Problem getting processor for workflow: " + workflowID+ " : " + e);
    } catch (InstantiationException e) {
        logger.error("Problem getting processor for workflow: " + workflowID+ " : " + e);
    } catch (IllegalAccessException e) {
        logger.error("Problem getting processor for workflow: " + workflowID+ " : " + e);
    } catch (ClassNotFoundException e) {
        logger.error("Problem getting processor for workflow: " + workflowID+ " : " + e);
    } finally {
        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException e) {
                logger.error("Problem getting processor for workflow: " + workflowID+ " : " + e);
            }
        }
    }
    return processorName;
}
/**
 * Simplest possible pinpoint query: builds a VarBinding query that uses the iteration info
 * directly. Only the VarBinding query of the returned object is set, i.e. the result is assumed
 * to live in VarBinding rather than in Collection.
 *
 * @param wfInstance constrains W.instanceID
 * @param wfNameRef constrains VB.wfNameRef
 * @param pname constrains VB.PNameRef
 * @param vname constrains VB.varNameRef; no constraint is added when null
 * @param iteration constrains VB.iteration; no constraint is added when null
 * @return a LineageSQLQuery carrying the generated VarBinding query
 */
public LineageSQLQuery simpleLineageQuery(String wfInstance, String wfNameRef, String pname,
        String vname, String iteration) {
    // column -> value pairs for the WHERE clause
    Map<String, String> where = new HashMap<String, String>();
    where.put("W.instanceID", wfInstance);
    where.put("VB.PNameRef", pname);
    where.put("VB.wfNameRef", wfNameRef);
    if (vname != null) {
        where.put("VB.varNameRef", vname);
    }
    if (iteration != null) {
        where.put("VB.iteration", iteration);
    }

    String sql = "SELECT * FROM VarBinding VB join Var V " +
            "on (VB.varNameRef = V.varName and VB.PNameRef = V.PNameRef and VB.wfNameRef=V.wfInstanceRef) " +
            "JOIN WfInstance W ON VB.wfInstanceRef = W.instanceID and VB.wfNameRef = W.wfnameRef ";
    // false: do not terminate the query yet, an ORDER BY clause still follows
    sql = addWhereClauseToQuery(sql, where, false);

    List<String> ordering = new ArrayList<String>();
    ordering.add("varNameRef");
    ordering.add("iteration");
    sql = addOrderByToQuery(sql, ordering, true);
    logger.debug("Query is: " + sql);

    LineageSQLQuery query = new LineageSQLQuery();
    query.setVbQuery(sql);
    return query;
}
/**
 * Generates one lineage query per input variable in var2Path and, optionally, one for the
 * output variable. If var2Path is null (or empty) no input queries are produced, so with
 * returnOutputs set this yields only the trivial query for the current output var and path.
 *
 * @param wfInstanceID workflow instance the queries are restricted to
 * @param proc processor name the queries are restricted to
 * @param var2Path input variable -> path to use for that variable; may be null
 * @param outputVar output variable; only read when returnOutputs is true
 * @param path path used for the output variable
 * @param returnOutputs
 *            returns inputs *and* outputs if set to true
 * @return the generated queries, inputs first; never null
 */
public List<LineageSQLQuery> lineageQueryGen(String wfInstanceID, String proc,
        Map<Var, String> var2Path, Var outputVar, String path,
        boolean returnOutputs) {
    List<LineageSQLQuery> newQueries = new ArrayList<LineageSQLQuery>();

    // use the calculated path for each input var; a null map simply contributes nothing
    if (var2Path != null) {
        for (Map.Entry<Var, String> inputPath : var2Path.entrySet()) {
            LineageSQLQuery q = generateSQL2(wfInstanceID, proc,
                    inputPath.getKey().getVName(), inputPath.getValue(), true);
            if (q != null) {
                newQueries.add(q);
            }
        }
    }

    // if returnOutputs is true, then use proc, path for the output var as well
    if (returnOutputs) {
        LineageSQLQuery q = generateSQL2(wfInstanceID, proc, outputVar.getVName(), path, false);
        if (q != null) {
            newQueries.add(q);
        }
    }
    return newQueries;
}
/**
 * Builds the Collection query and the VarBinding query for one processor, restricted to either
 * its inputs or its outputs.
 *
 * @param wfInstance constrains W.instanceID in both queries
 * @param proc constrains the processor name in both queries
 * @param var constrains VB.varNameRef (VarBinding query only)
 * @param path when non-empty, constrains the iteration column to "[" + path + "]"
 * @param returnInput true selects V.inputOrOutput = 1, false selects V.inputOrOutput = 0
 * @return a LineageSQLQuery with both the collection and the VarBinding query set
 */
protected LineageSQLQuery generateSQL2(String wfInstance, String proc,
        String var, String path, boolean returnInput) {
    boolean hasPath = path != null && path.length() > 0;
    // inputs or outputs?
    String direction = returnInput ? "1" : "0";
    LineageSQLQuery query = new LineageSQLQuery();

    // --- Collection query ---
    Map<String, String> collWhere = new HashMap<String, String>();
    collWhere.put("W.instanceID", wfInstance);
    collWhere.put("C.PNameRef", proc);
    if (hasPath) {
        collWhere.put("C.iteration", "[" + path + "]"); // PM 1/09 -- path
    }
    collWhere.put("V.inputOrOutput", direction);
    String collSql = "SELECT * FROM Collection C JOIN WfInstance W ON "
            + "C.wfInstanceRef = W.instanceID "
            + "JOIN Var V on "
            + "V.wfInstanceRef = W.wfnameRef and C.PNameRef = V.pnameRef and C.varNameRef = V.varName ";
    query.setCollQuery(addWhereClauseToQuery(collSql, collWhere, false));

    // --- VarBinding query ---
    Map<String, String> vbWhere = new HashMap<String, String>();
    vbWhere.put("W.instanceID", wfInstance);
    vbWhere.put("VB.PNameRef", proc);
    vbWhere.put("VB.varNameRef", var);
    if (hasPath) {
        vbWhere.put("VB.iteration", "[" + path + "]"); // PM 1/09 -- path
    }
    vbWhere.put("V.inputOrOutput", direction);
    String vbSql = "SELECT * FROM VarBinding VB JOIN WfInstance W ON "
            + "VB.wfInstanceRef = W.instanceID "
            + "JOIN Var V on "
            + "V.wfInstanceRef = W.wfnameRef and VB.PNameRef = V.pnameRef and VB.varNameRef = V.varName ";
    vbSql = addWhereClauseToQuery(vbSql, vbWhere, false);

    List<String> ordering = new ArrayList<String>();
    ordering.add("varNameRef");
    ordering.add("iteration");
    query.setVbQuery(addOrderByToQuery(vbSql, ordering, true));
    return query;
}
/**
 * Builds the pair of queries (Collection and VarBinding) for one processor of a workflow
 * instance.
 * <p>
 * If effectivePath is non-empty, both queries are constrained to iteration =
 * "[" + effectivePath + "]"; otherwise there is no constraint on the iteration. The collection
 * query is meant to be tried first: if it returns rows, the path points to an internal node of
 * a collection; otherwise VarBinding is queried for the leaves.
 *
 * @param wfInstance constrains W.instanceID
 * @param proc constrains the processor name
 * @param effectivePath iteration path without the surrounding brackets; null or empty for none
 * @param returnOutputs
 *            returns both inputs and outputs if set to true; otherwise both queries are
 *            limited to inputs (V.inputOrOutput = 1)
 * @return a LineageSQLQuery with both the collection and the VarBinding query set
 */
public LineageSQLQuery generateSQL(String wfInstance, String proc,
        String effectivePath, boolean returnOutputs) {
    LineageSQLQuery lq = new LineageSQLQuery();
    boolean hasPath = effectivePath != null && effectivePath.length() > 0;

    // constraints:
    Map<String, String> collQueryConstraints = new HashMap<String, String>();
    // base Collection query
    String collQuery = "SELECT * FROM Collection C JOIN WfInstance W ON " + "C.wfInstanceRef = W.instanceID " + "JOIN Var V on " + "V.wfInstanceRef = W.wfnameRef and C.PNameRef = V.pnameRef and C.varNameRef = V.varName ";
    collQueryConstraints.put("W.instanceID", wfInstance);
    collQueryConstraints.put("C.PNameRef", proc);
    if (hasPath) {
        collQueryConstraints.put("C.iteration", "[" + effectivePath + "]"); // PM 1/09 -- path
    }
    // limit to inputs unless outputs were requested as well. This condition used to be
    // inverted relative to the VarBinding query below and to the documented contract.
    if (!returnOutputs) {
        collQueryConstraints.put("V.inputOrOutput", "1");
    }
    collQuery = addWhereClauseToQuery(collQuery, collQueryConstraints, false);
    lq.setCollQuery(collQuery);

    // vb query
    Map<String, String> vbQueryConstraints = new HashMap<String, String>();
    // base VarBinding query
    String vbQuery = "SELECT * FROM VarBinding VB JOIN WfInstance W ON " +
            "VB.wfInstanceRef = W.instanceID " +
            "JOIN Var V on " +
            "V.wfInstanceRef = W.wfnameRef and VB.PNameRef = V.pnameRef and VB.varNameRef = V.varName ";
    vbQueryConstraints.put("W.instanceID", wfInstance);
    vbQueryConstraints.put("VB.PNameRef", proc);
    if (hasPath) {
        vbQueryConstraints.put("VB.iteration", "[" + effectivePath + "]"); // PM 1/09 -- path
    }
    // limit to inputs?
    if (!returnOutputs) {
        vbQueryConstraints.put("V.inputOrOutput", "1");
    }
    vbQuery = addWhereClauseToQuery(vbQuery, vbQueryConstraints, false);

    List<String> orderAttr = new ArrayList<String>();
    orderAttr.add("varNameRef");
    orderAttr.add("iteration");
    vbQuery = addOrderByToQuery(vbQuery, orderAttr, true);
    lq.setVbQuery(vbQuery);
    return lq;
}
/**
 * Executes the collection query held by the given lineage query and turns every row into a
 * result record flagged as a collection.
 *
 * @param lq lineage query whose collection query is run
 * @return the collected records; empty when lq has no collection query, when no row matches,
 *         or when obtaining the connection fails with a (logged) reflective exception
 * @throws SQLException if running the query or closing the connection fails
 */
public Dependencies runCollectionQuery(LineageSQLQuery lq) throws SQLException {
    Dependencies deps = new Dependencies();
    String sql = lq.getCollQuery();
    if (sql != null) {
        logger.debug("running collection query: " + sql);
        Connection connection = null;
        try {
            connection = getConnection();
            Statement statement = connection.createStatement();
            if (statement.execute(sql)) {
                ResultSet rows = statement.getResultSet();
                while (rows.next()) {
                    String type = deps.ATOM_TYPE; // temp -- FIXME
                    String wfNameRef = rows.getString("wfNameRef");
                    String wfInstance = rows.getString("wfInstanceRef");
                    String proc = rows.getString("PNameRef");
                    String var = rows.getString("varNameRef");
                    String iteration = rows.getString("iteration");
                    String collId = rows.getString("collID");
                    String parentCollId = rows.getString("parentCollIDRef");
                    // last flag true -> record is a collection
                    deps.addLineageQueryResultRecord(wfNameRef, proc, var, wfInstance,
                            iteration, collId, parentCollId, null, null, type, false, true);
                }
            }
        } catch (InstantiationException e) {
            logger.warn("Could not execute query: " + e);
        } catch (IllegalAccessException e) {
            logger.warn("Could not execute query: " + e);
        } catch (ClassNotFoundException e) {
            logger.warn("Could not execute query: " + e);
        } finally {
            if (connection != null) {
                connection.close();
            }
        }
    }
    return deps;
}
/**
 * Executes the VarBinding query held by the given lineage query and turns every row into a
 * result record that is not flagged as a collection.
 * <p>
 * The resolved data value is never filled in: the Data table it used to be read from is no
 * longer available, so that lookup would have to be replaced by a dereference through the
 * DataManager.
 *
 * @param lq lineage query whose VarBinding query is run
 * @param includeDataValue IGNORED. always false
 * @return the collected records (possibly empty), or null when the statement produced no
 *         result set or obtaining the connection failed with a (logged) reflective exception
 * @throws SQLException if running the query or closing the connection fails
 */
public Dependencies runVBQuery(LineageSQLQuery lq, boolean includeDataValue) throws SQLException {
    String sql = lq.getVbQuery();
    logger.debug("running VB query: " + sql);
    Dependencies outcome = null;
    Connection connection = null;
    try {
        connection = getConnection();
        Statement statement = connection.createStatement();
        if (statement.execute(sql)) {
            ResultSet rows = statement.getResultSet();
            Dependencies deps = new Dependencies();
            while (rows.next()) {
                String type = deps.ATOM_TYPE; // temp -- FIXME
                String wfNameRef = rows.getString("wfNameRef");
                String wfInstance = rows.getString("wfInstanceRef");
                String proc = rows.getString("PNameRef");
                String var = rows.getString("varNameRef");
                String iteration = rows.getString("iteration");
                String collId = rows.getString("collIDRef");
                String value = rows.getString("value");
                boolean isInput = rows.getInt("inputOrOutput") == 1;
                // FIXME if the data is required then the query needs fixing
                // last flag false -> record is not a collection
                deps.addLineageQueryResultRecord(wfNameRef, proc, var, wfInstance,
                        iteration, collId, null, value, null, type, isInput, false);
            }
            outcome = deps;
        }
    } catch (InstantiationException e) {
        logger.warn("Could not execute query: " + e);
    } catch (IllegalAccessException e) {
        logger.warn("Could not execute query: " + e);
    } catch (ClassNotFoundException e) {
        logger.warn("Could not execute query: " + e);
    } finally {
        if (connection != null) {
            connection.close();
        }
    }
    return outcome;
}
/**
 * Executes one of the lineage queries produced by the graph visit algorithm: the collection
 * query runs first and, only if it yields no records, the VarBinding query is run instead.
 *
 * @param lq
 *            a lineage query computed during the graph traversal
 * @param includeDataValue
 *            forwarded to {@link #runVBQuery(LineageSQLQuery, boolean)}, which currently
 *            ignores it
 * @return the collection records if there are any, otherwise whatever the VarBinding query
 *         returns (which may be null)
 * @throws SQLException if either query fails
 */
public Dependencies runLineageQuery(LineageSQLQuery lq,
        boolean includeDataValue) throws SQLException {
    Dependencies collectionRecords = runCollectionQuery(lq);
    if (collectionRecords.getRecords().size() != 0) {
        return collectionRecords;
    }
    // nothing found in Collection: query was really VB
    return runVBQuery(lq, includeDataValue);
}
/**
 * Runs every non-null query of the list through
 * {@link #runLineageQuery(LineageSQLQuery, boolean)} and gathers the results in list order.
 *
 * @param lqList queries to evaluate; null entries are skipped, a null list is logged
 * @param includeDataValue forwarded to each query execution
 * @return one entry per executed query; empty when lqList is null
 * @throws SQLException if any of the queries fails
 */
public List<Dependencies> runLineageQueries(
        List<LineageSQLQuery> lqList, boolean includeDataValue)
        throws SQLException {
    List<Dependencies> collected = new ArrayList<Dependencies>();
    if (lqList != null) {
        for (LineageSQLQuery query : lqList) {
            if (query != null) {
                collected.add(runLineageQuery(query, includeDataValue));
            }
        }
    } else {
        logger.warn("lineage queries list is NULL, nothing to evaluate");
    }
    return collected;
}
/**
 * Takes an ordered set of records for the same variable with iteration
 * indexes and builds a collection out of it. The variable is the one named by the first
 * record; records naming a different variable are ignored.
 *
 * @param lqr records, expected to be ordered by var name and by iteration number
 * @return a jdom Document with the collection, rooted at a "list" element
 */
public Document recordsToCollection(Dependencies lqr) {
    Document d = new Document(new Element("list"));
    String currentVar = null;
    for (ListIterator<LineageQueryResultRecord> it = lqr.iterator(); it.hasNext();) {
        LineageQueryResultRecord record = it.next();
        // the first record fixes the variable we are collecting ...
        if (currentVar == null) {
            currentVar = record.getVname();
        }
        // ... and must itself be added; previously it was only used to initialise
        // currentVar, so the first element of every collection was lost
        if (currentVar != null && currentVar.equals(record.getVname())) {
            // adds record to d in the correct position given by the iteration vector
            addToCollection(record, d);
        }
    }
    return d;
}
/**
 * Inserts one record into the collection document, walking down the tree along the record's
 * comma-separated iteration vector and creating missing nodes on the way. Internal nodes are
 * "list" elements; the leaf is an element named after the record's value.
 * <p>
 * A missing child is always appended, so records are expected to arrive in iteration order.
 *
 * @param record record to insert; its iteration must be a comma-separated list of integers
 * @param d document whose root element is the top-level list
 */
private void addToCollection(LineageQueryResultRecord record, Document d) {
    // NOTE(review): the query builders in this class constrain iteration columns with a
    // bracketed value ("[" + path + "]"); if getIteration() returns that bracketed form,
    // Integer.parseInt below will throw -- confirm what the record actually holds.
    String[] itVector = record.getIteration().split(",");
    Element currentEl = d.getRootElement();
    // each element gives us a corresponding child in the tree
    for (int i = 0; i < itVector.length; i++) {
        int index = Integer.parseInt(itVector[i]);
        List<Element> children = currentEl.getChildren();
        if (index < children.size()) {
            // we already have the child, just descend
            currentEl = children.get(index);
        } else if (i == itVector.length - 1) {
            // this is a leaf --> atomic element
            currentEl.addContent(new Element(record.getValue()));
        } else {
            // create the internal element AND step into it, so that the remaining indexes
            // are resolved below the new node instead of below its parent
            Element nested = new Element("list");
            currentEl.addContent(nested);
            currentEl = nested;
        }
    }
}
/**
 * Returns the names of all processors that are structurally contained within
 * the wf corresponding to the input dataflow name. The dataflow name is first resolved to a
 * workflow reference through {@link #getWfNameForDataflow(String)}.
 *
 * @param dataflowName the name of a processor of type DataFlowActivity
 * @return the pname of every Processor row whose wfInstanceRef is that workflow reference;
 *         empty when nothing matches or the lookup fails (failures are logged, not rethrown)
 */
public List<String> getContainedProcessors(String dataflowName) {
    List<String> result = new ArrayList<String>();
    // dataflow name -> wfRef
    String containerDataflow = getWfNameForDataflow(dataflowName);
    // get all processors within containerDataflow
    PreparedStatement ps = null;
    Connection connection = null;
    try {
        connection = getConnection();
        ps = connection.prepareStatement(
                "SELECT pname FROM Processor P " +
                "where wfInstanceRef = ?");
        ps.setString(1, containerDataflow);
        boolean success = ps.execute();
        if (success) {
            ResultSet rs = ps.getResultSet();
            while (rs.next()) {
                result.add(rs.getString("pname"));
            }
        }
    } catch (InstantiationException e) {
        logger.warn("Could not execute query: " + e);
    } catch (IllegalAccessException e) {
        logger.warn("Could not execute query: " + e);
    } catch (ClassNotFoundException e) {
        logger.warn("Could not execute query: " + e);
    } catch (SQLException e) {
        logger.warn("Could not execute query: " + e);
    } finally {
        // connection is still null if getConnection() itself failed
        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException ex) {
                logger.error("An error occurred closing the database connection", ex);
            }
        }
    }
    return result;
}
/**
 * Returns the name of the top-level processor (isTopLevel = 1) of the workflow that the given
 * instance belongs to. Only the first matching row is used.
 *
 * @param wfInstanceID value matched against WfInstance.instanceID
 * @return the pname of the first matching row, or null when there is none or the lookup
 *         fails (failures are logged, not rethrown)
 */
public String getTopLevelDataflowName(String wfInstanceID) {
    PreparedStatement ps = null;
    Connection connection = null;
    try {
        connection = getConnection();
        ps = connection.prepareStatement(
                "SELECT pname FROM Processor P join WfInstance I on P.wfInstanceRef = I.wfnameRef " +
                "where I.instanceID =? and isTopLevel = 1");
        ps.setString(1, wfInstanceID);
        boolean success = ps.execute();
        if (success) {
            ResultSet rs = ps.getResultSet();
            if (rs.next()) {
                return rs.getString("pname");
            }
        }
    } catch (InstantiationException e) {
        logger.warn("Could not execute query: " + e);
    } catch (IllegalAccessException e) {
        logger.warn("Could not execute query: " + e);
    } catch (ClassNotFoundException e) {
        logger.warn("Could not execute query: " + e);
    } catch (SQLException e) {
        logger.warn("Could not execute query: " + e);
    } finally {
        // connection is still null if getConnection() itself failed
        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException ex) {
                logger.error("An error occurred closing the database connection", ex);
            }
        }
    }
    return null;
}
/**
 * Returns the internal ID (wfname) of a dataflow given its external name. Only the first
 * matching Workflow row is used.
 *
 * @param dataflowName value matched against Workflow.externalName
 * @return the wfname of the first matching row, or null when there is none or the lookup
 *         fails (failures are logged, not rethrown)
 */
public String getWfNameForDataflow(String dataflowName) {
    String wfName = null;
    Connection connection = null;
    try {
        connection = getConnection();
        PreparedStatement lookup = connection.prepareStatement(
                "SELECT wfname FROM Workflow W WHERE W.externalName = ?");
        lookup.setString(1, dataflowName);
        if (lookup.execute()) {
            ResultSet rows = lookup.getResultSet();
            if (rows.next()) {
                wfName = rows.getString("wfname");
            }
        }
    } catch (InstantiationException e) {
        logger.warn("Could not execute query: " + e);
    } catch (IllegalAccessException e) {
        logger.warn("Could not execute query: " + e);
    } catch (ClassNotFoundException e) {
        logger.warn("Could not execute query: " + e);
    } catch (SQLException e) {
        logger.error("Could not execute query: " + e);
    } finally {
        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException ex) {
                logger.error("An error occurred closing the database connection", ex);
            }
        }
    }
    return wfName;
}
/**
 * Fetches the children of parentWFName from the Workflow table.
 *
 * @param parentWFName value matched against Workflow.parentWFname
 * @return the wfname of every matching row; empty when there is none or when obtaining the
 *         connection fails with a (logged) reflective exception
 * @throws SQLException if running the query or closing the connection fails
 */
public List<String> getChildrenOfWorkflow(String parentWFName)
        throws SQLException {
    List<String> children = new ArrayList<String>();
    Connection connection = null;
    try {
        connection = getConnection();
        PreparedStatement lookup = connection.prepareStatement(
                "SELECT wfname FROM Workflow WHERE parentWFname = ? ");
        lookup.setString(1, parentWFName);
        if (lookup.execute()) {
            ResultSet rows = lookup.getResultSet();
            while (rows.next()) {
                children.add(rows.getString("wfname"));
            }
        }
    } catch (InstantiationException e) {
        logger.warn("Could not execute query: " + e);
    } catch (IllegalAccessException e) {
        logger.warn("Could not execute query: " + e);
    } catch (ClassNotFoundException e) {
        logger.warn("Could not execute query: " + e);
    } finally {
        if (connection != null) {
            connection.close();
        }
    }
    return children;
}
/**
 * Fetches the parent of childWFName from the Workflow table. Only the first matching row is
 * used.
 *
 * @param childWFName value matched against Workflow.wfname
 * @return the parentWFname of the first matching row, or null when there is no such row or
 *         when obtaining the connection fails with a (logged) reflective exception
 * @throws SQLException if running the query or closing the connection fails
 */
public String getParentOfWorkflow(String childWFName) throws SQLException {
    String sql = "SELECT parentWFname FROM Workflow WHERE wfname = ?";
    String parent = null;
    Connection connection = null;
    try {
        connection = getConnection();
        PreparedStatement lookup = connection.prepareStatement(sql);
        lookup.setString(1, childWFName);
        logger.debug("getParentOfWorkflow - query: " + sql + " with wfname = " + childWFName);
        if (lookup.execute()) {
            ResultSet rows = lookup.getResultSet();
            // only the first row is of interest
            if (rows.next()) {
                parent = rows.getString("parentWFname");
                logger.debug("result: " + parent);
            }
        }
    } catch (InstantiationException e) {
        logger.warn("Could not execute query: " + e);
    } catch (IllegalAccessException e) {
        logger.warn("Could not execute query: " + e);
    } catch (ClassNotFoundException e) {
        logger.warn("Could not execute query: " + e);
    } finally {
        if (connection != null) {
            connection.close();
        }
    }
    return parent;
}
/**
 * Lists the wfname of every row in the Workflow table.
 *
 * @return all workflow names; empty when the table is empty or when obtaining the connection
 *         fails with a (logged) reflective exception
 * @throws SQLException if running the query or closing the connection fails
 */
public List<String> getAllWFnames() throws SQLException {
    List<String> names = new ArrayList<String>();
    Connection connection = null;
    try {
        connection = getConnection();
        Statement statement = connection.createStatement();
        if (statement.execute("SELECT wfname FROM Workflow")) {
            ResultSet rows = statement.getResultSet();
            while (rows.next()) {
                names.add(rows.getString("wfname"));
            }
        }
    } catch (InstantiationException e) {
        logger.warn("Could not execute query: " + e);
    } catch (IllegalAccessException e) {
        logger.warn("Could not execute query: " + e);
    } catch (ClassNotFoundException e) {
        logger.warn("Could not execute query: " + e);
    } finally {
        if (connection != null) {
            connection.close();
        }
    }
    return names;
}
/**
 * Tells whether the processor with the given name is stored with the dataflow type. Only the
 * first Processor row with that pname is inspected.
 *
 * @param procName value matched against Processor.pname
 * @return true if the first matching row has a non-null type equal to DATAFLOW_TYPE, false
 *         otherwise (including when the lookup could not be performed)
 * @throws SQLException if running the query or closing the connection fails
 */
public boolean isDataflow(String procName) throws SQLException {
    boolean dataflow = false;
    Connection connection = null;
    try {
        connection = getConnection();
        PreparedStatement lookup = connection.prepareStatement(
                "SELECT type FROM Processor WHERE pname = ?");
        lookup.setString(1, procName);
        if (lookup.execute()) {
            ResultSet rows = lookup.getResultSet();
            if (rows.next()) {
                String storedType = rows.getString("type");
                dataflow = storedType != null && storedType.equals(DATAFLOW_TYPE);
            }
        }
    } catch (InstantiationException e) {
        logger.warn("Could not execute query: " + e);
    } catch (IllegalAccessException e) {
        logger.warn("Could not execute query: " + e);
    } catch (ClassNotFoundException e) {
        logger.warn("Could not execute query: " + e);
    } finally {
        if (connection != null) {
            connection.close();
        }
    }
    return dataflow;
}
/**
 * Returns the name of the top-level processor (isTopLevel = 1) for the given workflow
 * instance, reading from the schema-qualified T2Provenance.Processor table. Only the first
 * matching row is used.
 *
 * @param wfInstanceID value matched against WfInstance.instanceID
 * @return the PName of the first matching row, or null when there is none or the lookup
 *         fails (failures are logged, not rethrown)
 */
public String getTopDataflow(String wfInstanceID) {
    String topName = null;
    Connection connection = null;
    try {
        connection = getConnection();
        PreparedStatement lookup = connection.prepareStatement(
                "SELECT * FROM T2Provenance.Processor P join WfInstance I on P.wfInstanceRef = I.wfNameRef " +
                " where I.instanceID = ? " +
                " and isTopLevel = 1 ");
        lookup.setString(1, wfInstanceID);
        if (lookup.execute()) {
            ResultSet rows = lookup.getResultSet();
            if (rows.next()) {
                topName = rows.getString("PName");
            }
        }
    } catch (SQLException e) {
        logger.warn("Could not execute query: " + e);
    } catch (InstantiationException e) {
        logger.warn("Could not execute query: " + e);
    } catch (IllegalAccessException e) {
        logger.warn("Could not execute query: " + e);
    } catch (ClassNotFoundException e) {
        logger.warn("Could not execute query: " + e);
    } finally {
        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException ex) {
                logger.error("An error occurred closing the database connection", ex);
            }
        }
    }
    return topName;
}
/**
 * Queries the DD table for the records that end at the given processor/variable.
 *
 * @param p
 *            pTo processor
 * @param var
 *            vTo
 * @param value
 *            valTo; no constraint is added when null
 * @param iteration
 *            constrains the iteration column; no constraint is added when null
 * @param wfInstance
 *            constrains the wfInstance column; no constraint is added when null
 * @return the matching DDRecords (possibly empty), or null when the statement produced no
 *         result set or obtaining the connection failed with a (logged) reflective exception
 * @throws SQLException if running the query or closing the connection fails
 */
public List<DDRecord> queryDD(String p, String var, String value,
        String iteration, String wfInstance) throws SQLException {
    Map<String, String> where = new HashMap<String, String>();
    where.put("pTo", p);
    where.put("vTo", var);
    if (value != null) {
        where.put("valTo", value);
    }
    if (iteration != null) {
        where.put("iteration", iteration);
    }
    if (wfInstance != null) {
        where.put("wfInstance", wfInstance);
    }
    // true: terminate SQL statement
    String sql = addWhereClauseToQuery("SELECT * FROM DD ", where, true);

    List<DDRecord> records = new ArrayList<DDRecord>();
    List<DDRecord> outcome = null;
    Connection connection = null;
    try {
        connection = getConnection();
        Statement statement = connection.createStatement();
        if (statement.execute(sql)) {
            ResultSet rows = statement.getResultSet();
            while (rows.next()) {
                DDRecord dd = new DDRecord();
                dd.setPFrom(rows.getString("pFrom"));
                dd.setVFrom(rows.getString("vFrom"));
                dd.setValFrom(rows.getString("valFrom"));
                dd.setPTo(rows.getString("pTo"));
                dd.setVTo(rows.getString("vTo"));
                dd.setValTo(rows.getString("valTo"));
                records.add(dd);
            }
            outcome = records;
        }
    } catch (InstantiationException e) {
        logger.warn("Could not execute query: " + e);
    } catch (IllegalAccessException e) {
        logger.warn("Could not execute query: " + e);
    } catch (ClassNotFoundException e) {
        logger.warn("Could not execute query: " + e);
    } finally {
        if (connection != null) {
            connection.close();
        }
    }
    return outcome;
}
/**
 * Finds the sources of all arcs that end at the given processor/variable, for bindings of
 * that sink which carry the given value in the given workflow instance.
 * <p>
 * The source processor, source variable and bound value of each row are stored in the
 * pTo / vTo / valTo fields of the returned records.
 *
 * @param p sink processor name (A.sinkPNameRef)
 * @param v sink variable name (A.sinkVarNameRef)
 * @param val value bound to the sink variable (VB.value)
 * @param wfInstance workflow instance id (WF.instanceID)
 * @return the matching records (possibly empty), or null when the statement produced no
 *         result set or obtaining the connection failed with a (logged) reflective exception
 * @throws SQLException if running the query or closing the connection fails
 */
public Set<DDRecord> queryArcsForDD(String p, String v, String val,
        String wfInstance) throws SQLException {
    Set<DDRecord> result = new HashSet<DDRecord>();
    PreparedStatement ps = null;
    Connection connection = null;
    try {
        connection = getConnection();
        // all values are bound as parameters; no SQL text is built from the arguments
        ps = connection.prepareStatement(
                "SELECT DISTINCT A.sourcePNameRef AS p, A.sourceVarNameRef AS var, VB.value AS val " + "FROM Arc A JOIN VarBinding VB ON VB.varNameRef = A.sinkVarNameRef AND VB.PNameRef = A.sinkPNameRef " + "JOIN WfInstance WF ON WF.wfnameRef = A.wfInstanceRef AND WF.instanceID = VB.wfInstanceRef " + "WHERE WF.instanceID = ? AND A.sinkPNameRef = ? AND A.sinkVarNameRef = ? AND VB.value = ?");
        ps.setString(1, wfInstance);
        ps.setString(2, p);
        ps.setString(3, v);
        ps.setString(4, val);
        boolean success = ps.execute();
        if (success) {
            ResultSet rs = ps.getResultSet();
            while (rs.next()) {
                DDRecord aDDrecord = new DDRecord();
                aDDrecord.setPTo(rs.getString("p"));
                aDDrecord.setVTo(rs.getString("var"));
                aDDrecord.setValTo(rs.getString("val"));
                result.add(aDDrecord);
            }
            return result;
        }
    } catch (InstantiationException e) {
        logger.warn("Could not execute query: " + e);
    } catch (IllegalAccessException e) {
        logger.warn("Could not execute query: " + e);
    } catch (ClassNotFoundException e) {
        logger.warn("Could not execute query: " + e);
    } finally {
        if (connection != null) {
            connection.close();
        }
    }
    return null;
}
/**
 * Collects the distinct (PFrom, vFrom, valFrom) triples recorded in the DD table for a
 * workflow instance.
 *
 * @param wfInstance value matched against DD.wfInstance
 * @return the matching records (possibly empty), or null when the statement produced no
 *         result set or obtaining the connection failed with a (logged) reflective exception
 * @throws SQLException if running the query or closing the connection fails
 */
public Set<DDRecord> queryAllFromValues(String wfInstance)
        throws SQLException {
    Set<DDRecord> outcome = null;
    Connection connection = null;
    try {
        connection = getConnection();
        PreparedStatement lookup = connection.prepareStatement(
                "SELECT DISTINCT PFrom, vFrom, valFrom FROM DD where wfInstance = ?");
        lookup.setString(1, wfInstance);
        if (lookup.execute()) {
            Set<DDRecord> records = new HashSet<DDRecord>();
            ResultSet rows = lookup.getResultSet();
            while (rows.next()) {
                DDRecord dd = new DDRecord();
                dd.setPFrom(rows.getString("PFrom"));
                dd.setVFrom(rows.getString("vFrom"));
                dd.setValFrom(rows.getString("valFrom"));
                records.add(dd);
            }
            outcome = records;
        }
    } catch (InstantiationException e) {
        logger.warn("Could not execute query: " + e);
    } catch (IllegalAccessException e) {
        logger.warn("Could not execute query: " + e);
    } catch (ClassNotFoundException e) {
        logger.warn("Could not execute query: " + e);
    } finally {
        if (connection != null) {
            connection.close();
        }
    }
    return outcome;
}
/**
 * Checks whether an arc exists, within the given workflow instance, that goes from the
 * processor named wfName to the processor named procName, where the source processor is
 * stored under a different workflow than the arc itself.
 *
 * @param procName bound to sinkPNameRef
 * @param wfName bound to sourcePnameRef
 * @param wfInstanceId bound to WfInstance.instanceID
 * @return true if at least one such arc exists; false otherwise, including when the lookup
 *         fails (failures are logged, not rethrown)
 */
public boolean isRootProcessorOfWorkflow(String procName, String wfName,
        String wfInstanceId) {
    boolean found = false;
    Connection connection = null;
    try {
        connection = getConnection();
        PreparedStatement lookup = connection.prepareStatement(
                "SELECT * FROM Arc A join WfInstance I on A.wfInstanceRef = I.wfnameRef " +
                "join Processor P on P.pname = A.sourcePnameRef where sourcePnameRef = ? " +
                "and P.wfInstanceRef <> A.wfInstanceRef " +
                "and I.instanceID = ? " +
                "and sinkPNameRef = ? ");
        lookup.setString(1, wfName);
        lookup.setString(2, wfInstanceId);
        lookup.setString(3, procName);
        if (lookup.execute()) {
            // a single row is enough
            found = lookup.getResultSet().next();
        }
    } catch (InstantiationException e) {
        logger.warn("Could not execute query: " + e);
    } catch (IllegalAccessException e) {
        logger.warn("Could not execute query: " + e);
    } catch (ClassNotFoundException e) {
        logger.warn("Could not execute query: " + e);
    } catch (SQLException e) {
        logger.warn("Could not execute query: " + e);
    } finally {
        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException ex) {
                logger.error("There was an error closing the database connection", ex);
            }
        }
    }
    return found;
}
/**
 * Lists the workflows in which a processor with the given name is stored.
 *
 * @param pname value matched against Processor.pname
 * @return one Workflow (wfName and parentWFname set) per matching row; the list holds
 *         whatever was read before a failure, and failures are logged, not rethrown
 */
public List<Workflow> getContainingWorkflowsForProcessor(
        String pname) {
    List<Workflow> containers = new ArrayList<Workflow>();
    Connection connection = null;
    try {
        connection = getConnection();
        PreparedStatement lookup = connection.prepareStatement(
                "SELECT * FROM T2Provenance.Processor P "+
                "join Workflow W on P.wfInstanceRef = W.wfName "+
                "where pname = ? ");
        lookup.setString(1, pname);
        if (lookup.execute()) {
            ResultSet rows = lookup.getResultSet();
            while (rows.next()) {
                Workflow container = new Workflow();
                container.setWfName(rows.getString("wfInstanceRef"));
                container.setParentWFname(rows.getString("parentWFName"));
                containers.add(container);
            }
        }
    } catch (InstantiationException e) {
        logger.warn("Could not execute query: " + e);
    } catch (IllegalAccessException e) {
        logger.warn("Could not execute query: " + e);
    } catch (ClassNotFoundException e) {
        logger.warn("Could not execute query: " + e);
    } catch (SQLException e) {
        logger.warn("Could not execute query: " + e);
    } finally {
        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException ex) {
                logger.error("There was an error closing the database connection", ex);
            }
        }
    }
    return containers;
}
/**
 * Loads the Workflow row with the given id. Only the first matching row is used.
 *
 * @param dataflowID value matched against Workflow.wfname
 * @return a Workflow with wfName, parentWFname and externalName set, or null when there is
 *         no such row or the lookup fails (failures are logged, not rethrown)
 */
public Workflow getWorkflow(String dataflowID) {
    Workflow loaded = null;
    Connection connection = null;
    try {
        connection = getConnection();
        PreparedStatement lookup = connection.prepareStatement(
                "SELECT * FROM T2Provenance.Workflow W "+
                "where wfname = ? ");
        lookup.setString(1, dataflowID);
        if (lookup.execute()) {
            ResultSet rows = lookup.getResultSet();
            if (rows.next()) {
                Workflow wf = new Workflow();
                wf.setWfName(rows.getString("wfname"));
                wf.setParentWFname(rows.getString("parentWFName"));
                wf.setExternalName(rows.getString("externalName"));
                // publish only once every column was read successfully
                loaded = wf;
            }
        }
    } catch (InstantiationException e) {
        logger.warn("Could not execute query: " + e);
    } catch (IllegalAccessException e) {
        logger.warn("Could not execute query: " + e);
    } catch (ClassNotFoundException e) {
        logger.warn("Could not execute query: " + e);
    } catch (SQLException e) {
        logger.warn("Could not execute query: " + e);
    } finally {
        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException ex) {
                logger.error("There was an error closing the database connection", ex);
            }
        }
    }
    return loaded;
}
}