blob: cfe09db6bd4b150a727c007517c90a601041ae6c [file] [log] [blame]
/*******************************************************************************
* Copyright (C) 2007 The University of Manchester
*
* Modifications to the initial code base are copyright of their
* respective authors, or their employers as appropriate.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public License
* as published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
******************************************************************************/
package net.sf.taverna.t2.provenance.connector;
import java.io.IOException;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import net.sf.taverna.t2.invocation.InvocationContext;
import net.sf.taverna.t2.provenance.item.ProvenanceItem;
import net.sf.taverna.t2.provenance.item.WorkflowProvenanceItem;
import net.sf.taverna.t2.provenance.lineageservice.EventProcessor;
import net.sf.taverna.t2.provenance.lineageservice.LineageQueryResultRecord;
import net.sf.taverna.t2.provenance.lineageservice.Provenance;
import net.sf.taverna.t2.provenance.lineageservice.ProvenanceAnalysis;
import net.sf.taverna.t2.provenance.lineageservice.ProvenanceQuery;
import net.sf.taverna.t2.provenance.lineageservice.ProvenanceWriter;
import net.sf.taverna.t2.provenance.lineageservice.WorkflowDataProcessor;
import net.sf.taverna.t2.provenance.reporter.ProvenanceReporter;
import net.sf.taverna.t2.reference.ReferenceService;
import org.apache.log4j.Logger;
/**
 * Collects {@link ProvenanceItem}s as they travel up and down the dispatch
 * stack inside the {@link InvocationContext} and forwards them to the
 * {@link Provenance} back end.
 * <p>
 * Concrete subclasses supply the storage-specific parts through
 * {@link #createDatabase()} and {@link #getName()}. The
 * {@link ProvenanceWriter} and {@link ProvenanceQuery} can only be installed
 * through the protected setters, and {@link #init()} hands whatever
 * {@link #getWriter()} and {@link #getQuery()} return at that moment to the
 * helper objects it creates.
 *
 * @author Ian Dunlop
 * @author Stuart Owen
 *
 */
public abstract class ProvenanceConnector implements ProvenanceReporter {

    private static final Logger logger = Logger.getLogger(ProvenanceConnector.class);

    /** Message shared by every initialisation-failure log entry. */
    private static final String INIT_FAILURE_MESSAGE = "Problem with provenance initialisation: ";

    private String saveEvents;
    private ProvenanceAnalysis provenanceAnalysis;
    private ExecutorService executor = Executors.newSingleThreadExecutor();
    private boolean finished = false;
    private String sessionID;
    private InvocationContext invocationContext;
    private ReferenceService referenceService;
    private Provenance provenance;
    private ProvenanceWriter writer;
    private ProvenanceQuery query;
    private WorkflowDataProcessor wfdp;
    private EventProcessor eventProcessor;

    public ProvenanceConnector() {
    }

    /**
     * Set up the {@link EventProcessor}, {@link WorkflowDataProcessor},
     * {@link ProvenanceAnalysis} and {@link Provenance}, wiring them to the
     * current {@link ProvenanceWriter} and {@link ProvenanceQuery}. Since this
     * is an SPI you don't want any code cluttering the default constructor.
     * Call this method after instantiation and after the dbURL has been set.
     * <p>
     * {@link #createDatabase()} is invoked first. Any checked exception raised
     * while wiring is logged and swallowed, so a failure may leave some of the
     * collaborators (for instance {@link #getProvenance()}) unset.
     */
    public void init() {
        createDatabase();
        try {
            setWfdp(new WorkflowDataProcessor());
            getWfdp().setPq(getQuery());
            getWfdp().setPw(getWriter());

            setEventProcessor(new EventProcessor());
            getEventProcessor().setPw(getWriter());
            getEventProcessor().setPq(getQuery());
            getEventProcessor().setWfdp(getWfdp());

            setProvenanceAnalysis(new ProvenanceAnalysis(getQuery()));
            setProvenance(new Provenance(getEventProcessor()));
        } catch (InstantiationException e) {
            logger.error(INIT_FAILURE_MESSAGE, e);
        } catch (IllegalAccessException e) {
            logger.error(INIT_FAILURE_MESSAGE, e);
        } catch (ClassNotFoundException e) {
            logger.error(INIT_FAILURE_MESSAGE, e);
        } catch (SQLException e) {
            logger.error(INIT_FAILURE_MESSAGE, e);
        }
    }

    /**
     * @return the invocationContext
     */
    public InvocationContext getInvocationContext() {
        return invocationContext;
    }

    /**
     * @param invocationContext the invocationContext to set
     */
    public void setInvocationContext(InvocationContext invocationContext) {
        this.invocationContext = invocationContext;
    }

    /**
     * @return the referenceService
     */
    public ReferenceService getReferenceService() {
        return referenceService;
    }

    /**
     * @param referenceService the referenceService to set
     */
    public void setReferenceService(ReferenceService referenceService) {
        this.referenceService = referenceService;
    }

    /**
     * Passes the item to
     * {@link Provenance#acceptRawProvenanceEvent} on the calling thread. The
     * method is synchronized, so a connector handles one event at a time; the
     * executor returned by {@link #getExecutor()} is not used here.
     * <p>
     * Recording is best effort: an {@link SQLException} or {@link IOException}
     * is logged as a warning and the event is dropped. The same happens when
     * no {@link Provenance} is available, which is the case if {@link #init()}
     * was never called or failed.
     *
     * @param provenanceItem the event to record
     */
    public synchronized void addProvenanceItem(
            final ProvenanceItem provenanceItem) {
        Provenance target = getProvenance();
        if (target == null) {
            // init() swallows its failures, so this state is reachable; keep
            // the workflow running instead of failing with a NullPointerException.
            logger.warn("Could not add provenance for " + provenanceItem.getEventType()
                    + " " + provenanceItem.getIdentifier()
                    + ": provenance has not been initialised");
            return;
        }
        try {
            target.acceptRawProvenanceEvent(provenanceItem.getEventType(),
                    provenanceItem);
        } catch (SQLException e) {
            logAddFailure(provenanceItem, e);
        } catch (IOException e) {
            logAddFailure(provenanceItem, e);
        }
    }

    /** Logs a failed attempt to record an item, keeping the stack trace. */
    private static void logAddFailure(ProvenanceItem item, Exception cause) {
        logger.warn("Could not add provenance for " + item.getEventType() + " "
                + item.getIdentifier(), cause);
    }

    /**
     * Obtains a database connection from {@link JDBCConnector}.
     *
     * @return the connection supplied by {@link JDBCConnector#getConnection()}
     */
    protected Connection getConnection() throws InstantiationException,
            IllegalAccessException, ClassNotFoundException, SQLException {
        return JDBCConnector.getConnection();
    }

    /**
     * Used by database backed provenance stores. Ask the implementation to
     * create the database. Requires each database type to create all its own
     * tables
     */
    public abstract void createDatabase();

    /**
     * Clears the database unconditionally; equivalent to
     * {@code clearDatabase(true)}.
     */
    public void clearDatabase() {
        clearDatabase(true);
    }

    /**
     * Clear all the values in the database but keep the db there.
     * <p>
     * Both clearing steps are delegated to the {@link ProvenanceWriter}, and
     * each reference returned by the second step is written to the log. An
     * {@link SQLException} is logged and swallowed.
     *
     * @param isClearDB
     *            when <code>false</code> nothing is cleared and only a log
     *            message is written
     */
    public void clearDatabase(boolean isClearDB) {
        if (!isClearDB) {
            logger.info("clearDB is FALSE: not clearing");
            return;
        }
        logger.info("clearing DB");
        try {
            getWriter().clearDBStatic();
            Set<String> danglingDataRefs = getWriter().clearDBDynamic();
            logger.info("references collected during removeRun:");
            for (String ref : danglingDataRefs) {
                logger.info(ref);
            }
        } catch (SQLException e) {
            logger.error("Problem clearing database: ", e);
        }
    }

    /**
     * The name for this type of provenance connector. Is used by the workbench
     * to ensure it adds the correct one to the InvocationContext
     *
     * @return the name of this connector type
     */
    public abstract String getName();

    /**
     * A unique identifier for this run of provenance, should correspond to the
     * initial {@link WorkflowProvenanceItem} identifier that gets sent through
     *
     * @param sessionID the identifier to use for this run
     */
    public void setSessionID(String sessionID) {
        this.sessionID = sessionID;
    }

    /**
     * What is the unique identifier used by this connector
     *
     * @return the session identifier, as last set by
     *         {@link #setSessionID(String)}
     */
    public String getSessionID() {
        return sessionID;
    }

    /**
     * Placeholder for a lineage computation. This base implementation ignores
     * all of its arguments and performs no query.
     *
     * @return always <code>null</code> in this implementation
     */
    public List<LineageQueryResultRecord> computeLineage(String wfInstance,
            String var, String proc, String path, Set<String> selectedProcessors) {
        return null;
    }

    /**
     * Looks up the instance ID of the first run that the query layer returns
     * for the given dataflow.
     * <p>
     * NOTE(review): the first element is read without checking that any run
     * was returned; if the result is an empty list this presumably throws
     * {@link IndexOutOfBoundsException} - confirm and guard once the element
     * type can be referenced here.
     *
     * @param dataflowId identifier passed to the run query
     * @return the instance ID of the first run, or <code>null</code> if the
     *         query failed with an {@link SQLException} (which is logged)
     */
    public String getDataflowInstance(String dataflowId) {
        String instanceID = null;
        try {
            instanceID = getProvenance().getPq().getRuns(dataflowId, null)
                    .get(0).getInstanceID();
        } catch (SQLException e) {
            logger.error("Error finding the dataflow instance", e);
        }
        return instanceID;
    }

    /**
     * @return the saveEvents
     */
    public String getSaveEvents() {
        return saveEvents;
    }

    /**
     * @param saveEvents
     *            the saveEvents to set
     */
    public void setSaveEvents(String saveEvents) {
        this.saveEvents = saveEvents;
    }

    /**
     * @param provenance the provenance to set
     */
    public void setProvenance(Provenance provenance) {
        this.provenance = provenance;
    }

    /**
     * @return the provenance, or <code>null</code> if none has been set
     */
    public Provenance getProvenance() {
        return provenance;
    }

    /**
     * @param finished the finished flag to set
     */
    public void setFinished(boolean finished) {
        this.finished = finished;
    }

    /**
     * @return the finished flag; <code>false</code> until changed through
     *         {@link #setFinished(boolean)}
     */
    public boolean isFinished() {
        return finished;
    }

    /**
     * @param executor the executor to set
     */
    public void setExecutor(ExecutorService executor) {
        this.executor = executor;
    }

    /**
     * @return the executor; a single-thread executor unless replaced through
     *         {@link #setExecutor(ExecutorService)}
     */
    public synchronized ExecutorService getExecutor() {
        return executor;
    }

    /**
     * @param provenanceAnalysis the provenanceAnalysis to set
     */
    public void setProvenanceAnalysis(ProvenanceAnalysis provenanceAnalysis) {
        this.provenanceAnalysis = provenanceAnalysis;
    }

    /**
     * Use this {@link ProvenanceAnalysis} to carry out lineage queries on the
     * provenance
     *
     * @return the provenanceAnalysis, or <code>null</code> if none has been
     *         set
     */
    public ProvenanceAnalysis getProvenanceAnalysis() {
        return provenanceAnalysis;
    }

    /**
     * @return the writer
     */
    public ProvenanceWriter getWriter() {
        return writer;
    }

    /**
     * @param writer the writer to set
     */
    protected void setWriter(ProvenanceWriter writer) {
        this.writer = writer;
    }

    /**
     * @return the query
     */
    public ProvenanceQuery getQuery() {
        return query;
    }

    /**
     * @param query the query to set
     */
    protected void setQuery(ProvenanceQuery query) {
        this.query = query;
    }

    /**
     * @return the wfdp
     */
    public WorkflowDataProcessor getWfdp() {
        return wfdp;
    }

    /**
     * @param wfdp the wfdp to set
     */
    public void setWfdp(WorkflowDataProcessor wfdp) {
        this.wfdp = wfdp;
    }

    /**
     * @return the eventProcessor
     */
    public EventProcessor getEventProcessor() {
        return eventProcessor;
    }

    /**
     * @param eventProcessor the eventProcessor to set
     */
    public void setEventProcessor(EventProcessor eventProcessor) {
        this.eventProcessor = eventProcessor;
    }
}