blob: fdff4b16386c766cbf8f6237737f17ceebbdddcf [file] [log] [blame]
/*******************************************************************************
* Copyright (C) 2007 The University of Manchester
*
* Modifications to the initial code base are copyright of their
* respective authors, or their employers as appropriate.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public License
* as published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
******************************************************************************/
package net.sf.taverna.t2.provenance.connector;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import net.sf.taverna.t2.provenance.item.ProvenanceItem;
import net.sf.taverna.t2.provenance.item.WorkflowProvenanceItem;
import net.sf.taverna.t2.provenance.lineageservice.EventProcessor;
import net.sf.taverna.t2.provenance.lineageservice.LineageQueryResult;
import net.sf.taverna.t2.provenance.lineageservice.LineageQueryResultRecord;
import net.sf.taverna.t2.provenance.lineageservice.Provenance;
import net.sf.taverna.t2.provenance.lineageservice.ProvenanceQuery;
import net.sf.taverna.t2.provenance.lineageservice.ProvenanceWriter;
import net.sf.taverna.t2.provenance.lineageservice.utils.ProvenanceAnalysis;
import net.sf.taverna.t2.provenance.reporter.ProvenanceReporter;
import org.apache.log4j.Logger;
/**
 * Collects {@link ProvenanceItem}s as it travels up and down the dispatch stack
 * inside the InvocationContext
 *
 * @author Ian Dunlop
 *
 */
public abstract class ProvenanceConnector implements ProvenanceReporter {

    private static final Logger logger = Logger.getLogger(ProvenanceConnector.class);

    /**
     * Tables emptied by {@link #clearDatabase()}, listed in the order in which
     * the DELETE statements are issued.
     */
    private static final String[] TABLES_TO_CLEAR = { "Workflow", "Processor",
            "Arc", "Var", "WfInstance", "ProcBinding", "VarBinding",
            "Collection", "Data" };

    private String saveEvents;

    /** Lazily opened by {@link #getConnection()} via {@link #openConnection()}. */
    protected Connection connection;

    private ProvenanceAnalysis provenanceAnalysis;

    /** Runs the lineage queries issued by {@link #getIntermediateValues}. */
    private ExecutorService executor = Executors.newSingleThreadExecutor();

    private boolean isClearDB = false;

    private Provenance provenance;

    private String dbURL;

    private boolean finished = false;

    private String sessionID;

    /**
     * Default constructor; performs no set up. See {@link #init()}.
     */
    public ProvenanceConnector() {
    }

    /**
     * Creates a connector with its collaborators already supplied and passes
     * the saveEvents value on to the given {@link Provenance}.
     *
     * @param provenance
     *            the provenance instance used by this connector
     * @param provenanceAnalysis
     *            used to carry out lineage queries
     * @param dbURL
     *            the database URL for this connector
     * @param isClearDB
     *            initial value of the clear-database flag
     * @param saveEvents
     *            the saveEvents value, also set on the provenance instance
     */
    public ProvenanceConnector(Provenance provenance,
            ProvenanceAnalysis provenanceAnalysis, String dbURL,
            boolean isClearDB, String saveEvents) {
        setProvenance(provenance);
        this.setProvenanceAnalysis(provenanceAnalysis);
        this.dbURL = dbURL;
        this.isClearDB = isClearDB;
        this.saveEvents = saveEvents;
        getProvenance().setSaveEvents(this.saveEvents);
    }

    /**
     * Returns the database connection, asking the implementation to open it
     * first if none has been opened yet.
     *
     * @return the current connection
     * @throws InstantiationException
     *             propagated from {@link #openConnection()}
     * @throws IllegalAccessException
     *             propagated from {@link #openConnection()}
     * @throws ClassNotFoundException
     *             propagated from {@link #openConnection()}
     */
    protected Connection getConnection() throws InstantiationException,
            IllegalAccessException, ClassNotFoundException {
        if (connection == null) {
            openConnection();
        }
        return connection;
    }

    /**
     * Used by database backed provenance stores. Ask the implementation to
     * create the database. Requires each database type to create all its own
     * tables
     */
    public abstract void createDatabase();

    /**
     * Used by database backed provenance stores. Ask the implementation to
     * delete the database.
     */
    public abstract void deleteDatabase();

    /**
     * Clear all the values in the database but keep the db there.
     * <p>
     * Failures are logged as warnings rather than thrown. A failure to delete
     * from one table does not stop the remaining tables from being cleared. If
     * no statement can be created nothing is deleted.
     */
    public void clearDatabase() {
        Statement stmt = createStatementOrNull();
        if (stmt == null) {
            // The cause has already been logged; without a statement there is
            // nothing that can be executed.
            return;
        }
        try {
            for (String table : TABLES_TO_CLEAR) {
                String q = "DELETE FROM " + table;
                try {
                    stmt.executeUpdate(q);
                } catch (SQLException e) {
                    logger.warn("Could not execute statement " + q + " :" + e);
                }
            }
        } finally {
            // Always release the statement, even if a delete failed unexpectedly.
            try {
                stmt.close();
            } catch (SQLException e) {
                logger.warn("Could not close database statement :" + e);
            }
        }
    }

    /**
     * Creates a statement on the current connection, logging a warning and
     * returning <code>null</code> when that is not possible.
     */
    private Statement createStatementOrNull() {
        try {
            Connection current = getConnection();
            if (current == null) {
                logger.warn("Could not create database statement : no connection available");
                return null;
            }
            return current.createStatement();
        } catch (SQLException e) {
            logger.warn("Could not create database statement :" + e);
        } catch (InstantiationException e) {
            logger.warn("Could not create database statement :" + e);
        } catch (IllegalAccessException e) {
            logger.warn("Could not create database statement :" + e);
        } catch (ClassNotFoundException e) {
            logger.warn("Could not create database statement :" + e);
        }
        return null;
    }

    /**
     * The name for this type of provenance connector. Is used by the workbench
     * to ensure it adds the correct one to the InvocationContext
     *
     * @return the connector name
     */
    public abstract String getName();

    /**
     * A unique identifier for this run of provenance, should correspond to the
     * initial {@link WorkflowProvenanceItem} identifier that gets sent through
     *
     * @param sessionID
     *            the identifier to use
     */
    public void setSessionID(String sessionID) {
        this.sessionID = sessionID;
    }

    /**
     * What is the unique identifier used by this connector
     *
     * @return the session identifier
     */
    public String getSessionID() {
        return sessionID;
    }

    /**
     * @return the clear-database flag
     */
    public boolean isClearDB() {
        return isClearDB;
    }

    /**
     * @param isClearDB
     *            the clear-database flag to set
     */
    public void setClearDB(boolean isClearDB) {
        this.isClearDB = isClearDB;
    }

    /**
     * Set up the {@link EventProcessor}, {@link ProvenanceWriter} &
     * {@link ProvenanceQuery}. Since it is an SPI you don't want any code
     * cluttering the default constructor. Call this method after instantiation
     * and after the dbURL has been set.
     */
    public abstract void init();

    /**
     * Fetches intermediate results through the {@link ProvenanceAnalysis}.
     * The query is run once, on this connector's executor, and the calling
     * thread blocks until it has completed.
     * <p>
     * All four arguments are handed unchanged to
     * <code>ProvenanceAnalysis.fetchIntermediateResult</code>.
     *
     * @param wfInstance
     *            first argument of the query
     * @param pname
     *            second argument of the query
     * @param vname
     *            third argument of the query
     * @param iteration
     *            fourth argument of the query
     * @return the records of the query result
     * @throws Exception
     *             an {@link ExecutionException} wrapping any failure raised
     *             while running the query, or an {@link InterruptedException}
     *             if the calling thread is interrupted while waiting (its
     *             interrupt status is restored before rethrowing)
     */
    public List<LineageQueryResultRecord> getIntermediateValues(
            final String wfInstance, final String pname, final String vname,
            final String iteration) throws Exception {
        FutureTask<LineageQueryResult> future = new FutureTask<LineageQueryResult>(
                new Callable<LineageQueryResult>() {
                    public LineageQueryResult call() throws Exception {
                        return getProvenanceAnalysis().fetchIntermediateResult(
                                wfInstance, pname, vname, iteration);
                    }
                });
        getExecutor().submit(future);
        try {
            return future.get().getRecords();
        } catch (InterruptedException e) {
            // Keep the interrupt visible to callers further up the stack.
            Thread.currentThread().interrupt();
            throw e;
        }
    }

    /**
     * Not implemented: always returns <code>null</code> and ignores all of its
     * arguments.
     *
     * @return <code>null</code>
     */
    public List<LineageQueryResultRecord> computeLineage(String wfInstance,
            String var, String proc, String path, Set<String> selectedProcessors) {
        return null;
    }

    /**
     * Looks up the workflow instance identifier for a dataflow through the
     * provenance query.
     *
     * @param dataflowId
     *            the dataflow identifier to look up
     * @return the instance identifier, or <code>null</code> if the lookup
     *         failed with an {@link SQLException} (which is logged)
     */
    public String getDataflowInstance(String dataflowId) {
        String instanceID = null;
        try {
            instanceID = (getProvenance()).getPq().getWFInstanceID(dataflowId);
        } catch (SQLException e) {
            logger.warn("Could not get workflow instance ID for dataflow "
                    + dataflowId, e);
        }
        return instanceID;
    }

    /**
     * @return the saveEvents
     */
    public String getSaveEvents() {
        return saveEvents;
    }

    /**
     * @param saveEvents
     *            the saveEvents to set
     */
    public void setSaveEvents(String saveEvents) {
        this.saveEvents = saveEvents;
    }

    /**
     * Asks the implementation to open the database connection. Called by
     * {@link #getConnection()} when no connection has been opened yet.
     *
     * @throws InstantiationException
     *             declared for implementations to report failure
     * @throws IllegalAccessException
     *             declared for implementations to report failure
     * @throws ClassNotFoundException
     *             declared for implementations to report failure
     */
    protected abstract void openConnection() throws InstantiationException,
            IllegalAccessException, ClassNotFoundException;

    /**
     * @param dbURL
     *            the database URL to set
     */
    public void setDbURL(String dbURL) {
        this.dbURL = dbURL;
    }

    /**
     * @return the database URL
     */
    public String getDbURL() {
        return dbURL;
    }

    /**
     * @param provenance
     *            the provenance instance to set
     */
    public void setProvenance(Provenance provenance) {
        this.provenance = provenance;
    }

    /**
     * @return the provenance instance
     */
    public Provenance getProvenance() {
        return provenance;
    }

    /**
     * @param finished
     *            the finished flag to set
     */
    public void setFinished(boolean finished) {
        this.finished = finished;
    }

    /**
     * @return the finished flag
     */
    public boolean isFinished() {
        return finished;
    }

    /**
     * Replaces the executor used to run lineage queries. Synchronized to pair
     * with {@link #getExecutor()}.
     *
     * @param executor
     *            the executor to use from now on
     */
    public synchronized void setExecutor(ExecutorService executor) {
        this.executor = executor;
    }

    /**
     * @return the executor used to run lineage queries
     */
    public synchronized ExecutorService getExecutor() {
        return executor;
    }

    /**
     * @param provenanceAnalysis
     *            the {@link ProvenanceAnalysis} to set
     */
    public void setProvenanceAnalysis(ProvenanceAnalysis provenanceAnalysis) {
        this.provenanceAnalysis = provenanceAnalysis;
    }

    /**
     * Use this {@link ProvenanceAnalysis} to carry out lineage queries on the
     * provenance
     *
     * @return the provenance analysis
     */
    public ProvenanceAnalysis getProvenanceAnalysis() {
        return provenanceAnalysis;
    }
}