blob: 669bfb23626487789bb0b5d62f2793f7d2a015ce [file] [log] [blame]
/*******************************************************************************
* Copyright (C) 2007 The University of Manchester
*
* Modifications to the initial code base are copyright of their
* respective authors, or their employers as appropriate.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public License
* as published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
******************************************************************************/
package net.sf.taverna.t2.provenance.lineageservice;
import java.io.IOException;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import net.sf.taverna.t2.provenance.connector.ProvenanceConnector;
import net.sf.taverna.t2.provenance.item.ProvenanceItem;
import net.sf.taverna.t2.provenance.item.WorkflowProvenanceItem;
import net.sf.taverna.t2.provenance.vocabulary.SharedVocabulary;
import net.sf.taverna.t2.workflowmodel.Dataflow;
import org.apache.log4j.Logger;
/**
* Implemented by the database class that a {@link ProvenanceConnector}
* implementation uses for storage purposes
*
* @author Paolo Missier
* @author Ian Dunlop
*
*/
//FIXME is this class really needed? Can't we just push the
//acceptRawProvenanceEvent up into the ProvenanceConnector?
public class Provenance {

    @SuppressWarnings("unused")
    private static final Logger logger = Logger.getLogger(Provenance.class);

    /** Query side of the provenance store; taken from the event processor or set explicitly. */
    protected ProvenanceQuery pq;

    /** Writer side of the provenance store; taken from the event processor or set explicitly. */
    protected ProvenanceWriter pw;

    /** Processor that every incoming provenance event is delegated to. */
    protected EventProcessor ep;

    /**
     * Debug/testing switch read by {@link #processEvent}: {@code "all"} saves
     * every non-workflow event, {@code "iteration"} saves only iteration
     * events, any other value (including {@code null}) saves nothing.
     */
    private String saveEvents;

    /**
     * True until the first workflow event has been processed. Only the first
     * workflow event triggers processing of the workflow structure.
     */
    private volatile boolean firstWorkflowStructure = true;

    /**
     * Workflow ids, most recent first. This class only ever inserts at index
     * 0; nothing in this class reads the list back.
     */
    private final List<String> workflowIDStack =
            Collections.synchronizedList(new ArrayList<String>());

    /** Maps a workflow item's identifier to the internal id of its dataflow. */
    private final Map<String, String> workflowIDMap =
            new ConcurrentHashMap<String, String>();

    /**
     * Creates an instance with no event processor, query or writer; they must
     * be supplied through the setters before events are accepted.
     */
    public Provenance() {
    }

    /**
     * Creates an instance that uses the given event processor, and that
     * processor's query and writer.
     *
     * @param eventProcessor
     *            the processor to delegate events to
     */
    public Provenance(EventProcessor eventProcessor) {
        this.ep = eventProcessor;
        this.pq = ep.getPq();
        this.pw = ep.getPw();
    }

    /**
     * @return true if no workflow event has been processed yet
     */
    public boolean isFirstWorkflowStructure() {
        return firstWorkflowStructure;
    }

    /**
     * @param firstWorkflowStructure
     *            whether the next workflow event should be treated as the
     *            first one
     */
    public void setFirstWorkflowStructure(boolean firstWorkflowStructure) {
        this.firstWorkflowStructure = firstWorkflowStructure;
    }

    /**
     * Clears the provenance store by calling the writer's
     * {@code clearDBStatic()} followed by {@code clearDBDynamic()}.
     *
     * @throws SQLException
     *             if either clear operation fails
     */
    public void clearDB() throws SQLException {
        ProvenanceWriter writer = getPw();
        writer.clearDBStatic();
        writer.clearDBDynamic();
    }

    /**
     * @return the saveEvents
     */
    public String getSaveEvents() {
        return saveEvents;
    }

    /**
     * @param saveEvents
     *            the saveEvents to set
     */
    public void setSaveEvents(String saveEvents) {
        this.saveEvents = saveEvents;
    }

    // FIXME I think the provenance query and writer should both come from the
    // EventProcessor; it seems silly setting the ep, pq and pw separately.

    public void setPq(ProvenanceQuery pq) {
        this.pq = pq;
    }

    public ProvenanceQuery getPq() {
        return pq;
    }

    public void setPw(ProvenanceWriter pw) {
        this.pw = pw;
    }

    public ProvenanceWriter getPw() {
        return pw;
    }

    public void setEp(EventProcessor ep) {
        this.ep = ep;
    }

    public EventProcessor getEp() {
        return ep;
    }

    /**
     * Entry point for incoming provenance events; simply forwards to
     * {@link #processEvent(ProvenanceItem, SharedVocabulary)}.
     *
     * @param eventType
     *            the type of the event, see {@link SharedVocabulary}
     * @param provenanceItem
     *            the event to process
     * @throws SQLException
     * @throws IOException
     */
    public void acceptRawProvenanceEvent(SharedVocabulary eventType,
            ProvenanceItem provenanceItem) throws SQLException, IOException {
        processEvent(provenanceItem, eventType);
    }

    /**
     * Dispatches one provenance event to the {@link EventProcessor}.
     * <ul>
     * <li>Non-workflow events are first saved via
     * {@code EventProcessor.saveEvent} if the {@code saveEvents} setting asks
     * for it.</li>
     * <li>Workflow events register the item's identifier against its dataflow
     * id; the first such event additionally processes the workflow
     * structure.</li>
     * <li>End-of-workflow events are processed under the workflow id that was
     * registered for the item's parent id.</li>
     * <li>All other events are processed under the item's own workflow
     * id.</li>
     * </ul>
     *
     * @param provenanceItem
     *            the event to process
     * @param eventType
     *            see {@link SharedVocabulary}
     * @throws SQLException
     * @throws IOException
     */
    protected void processEvent(ProvenanceItem provenanceItem,
            SharedVocabulary eventType) throws SQLException, IOException {
        boolean workflowEvent = eventType.equals(SharedVocabulary.WORKFLOW_EVENT_TYPE);

        if (workflowEvent) {
            processWorkflowEvent(provenanceItem);
            return;
        }

        // only attempt to save the data events, since the workflow itself may
        // not be XMLEncode-able
        saveEventIfRequested(provenanceItem, eventType);

        String currentWorkflowID;
        // NOTE(review): this test reads the item's own event type rather than
        // the eventType argument, as the original code did -- confirm the two
        // are always the same.
        if (provenanceItem.getEventType().equals(SharedVocabulary.END_WORKFLOW_EVENT_TYPE)) {
            currentWorkflowID = workflowIDMap.get(provenanceItem.getParentId());
        } else {
            // all other event types (iteration etc.)
            currentWorkflowID = provenanceItem.getWorkflowId();
        }
        getEp().processProcessEvent(provenanceItem, currentWorkflowID);
    }

    /**
     * Saves the event through the event processor when the {@code saveEvents}
     * setting is {@code "all"}, or is {@code "iteration"} and the event is an
     * iteration event. Intended for debugging / testing.
     */
    private void saveEventIfRequested(ProvenanceItem provenanceItem,
            SharedVocabulary eventType) throws SQLException, IOException {
        String mode = getSaveEvents();
        boolean saveAll = "all".equals(mode);
        // The event type is a SharedVocabulary value, so it must be compared
        // with the vocabulary constant; comparing it with the String
        // "iteration" can never be true.
        boolean saveIteration = "iteration".equals(mode)
                && eventType.equals(SharedVocabulary.ITERATION_EVENT_TYPE);
        if (saveAll || saveIteration) {
            getEp().saveEvent(provenanceItem, eventType);
        }
    }

    /**
     * Handles a workflow event: records the mapping from the item's identifier
     * to its dataflow id and pushes a workflow id. For the first workflow
     * event the pushed id is the value returned by
     * {@code EventProcessor.processWorkflowStructure}; for later ones it is
     * the dataflow's internal id.
     */
    private void processWorkflowEvent(ProvenanceItem provenanceItem)
            throws SQLException, IOException {
        Dataflow dataflow = ((WorkflowProvenanceItem) provenanceItem).getDataflow();
        String dataflowId = dataflow.getInternalIdentier();
        workflowIDMap.put(provenanceItem.getIdentifier(), dataflowId);

        // NOTE(review): the check and the reset below are not one atomic step;
        // this matches the original behaviour.
        if (isFirstWorkflowStructure()) {
            setFirstWorkflowStructure(false);
            // process the workflow structure
            String structureWorkflowId = getEp().processWorkflowStructure(provenanceItem);
            // workflowIDStack is a synchronized list, so a single add needs no
            // additional locking
            workflowIDStack.add(0, structureWorkflowId);
            getEp().propagateANL(provenanceItem.getIdentifier());
        } else {
            workflowIDStack.add(0, dataflowId);
        }
    }
}