blob: ae672ccf9eef1f1a078583df76d760b7ce153f84 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oodt.cas.workflow.instrepo;
//OODT imports
import org.apache.oodt.cas.metadata.Metadata;
import org.apache.oodt.cas.workflow.instrepo.WorkflowInstanceRepository;
import org.apache.oodt.cas.workflow.structs.WorkflowInstance;
import org.apache.oodt.cas.workflow.structs.WorkflowTask;
import org.apache.oodt.cas.workflow.structs.exceptions.InstanceRepositoryException;
import org.apache.oodt.cas.workflow.util.DbStructFactory;
//JDK imports
import java.net.URLDecoder;
import java.net.URLEncoder;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Iterator;
import java.util.List;
import java.util.Vector;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.sql.DataSource;
/**
* @author mattmann
* @version $Revision$
*
* <p>
* A {@link WorkflowInstanceRepository} that persists {@link WorkflowInstance}s
* to a JDBC-accessible DBMS.
* </p>.
*/
public class DataSourceWorkflowInstanceRepository extends
AbstractPaginatibleInstanceRepository {
/* our data source */
private DataSource dataSource = null;
/* our log stream */
private static final Logger LOG = Logger
.getLogger(DataSourceWorkflowInstanceRepository.class.getName());
/* should we quote fields or not */
private boolean quoteFields = false;
public DataSourceWorkflowInstanceRepository(DataSource ds,
boolean quoteFields, int pageSize) {
this.dataSource = ds;
this.quoteFields = quoteFields;
this.pageSize = pageSize;
}
/*
* (non-Javadoc)
*
* @see org.apache.oodt.cas.workflow.engine.WorkflowInstanceRepository#addWorkflowInstance(org.apache.oodt.cas.workflow.structs.WorkflowInstance)
*/
public synchronized void addWorkflowInstance(WorkflowInstance wInst)
throws InstanceRepositoryException {
Connection conn = null;
Statement statement = null;
ResultSet rs = null;
try {
conn = dataSource.getConnection();
conn.setAutoCommit(false);
statement = conn.createStatement();
String startWorkflowSql = null;
String taskIdField = null;
String workflowIdField = null;
if (quoteFields) {
taskIdField = "'"
+ ((WorkflowTask) wInst.getWorkflow().getTasks().get(0))
.getTaskId() + "'";
workflowIdField = "'" + wInst.getWorkflow().getId() + "'";
} else {
taskIdField = ((WorkflowTask) wInst.getWorkflow().getTasks()
.get(0)).getTaskId();
workflowIdField = wInst.getWorkflow().getId();
}
startWorkflowSql = "INSERT INTO workflow_instances "
+ "(workflow_instance_status, workflow_id, current_task_id,"
+ "start_date_time, end_date_time, current_task_start_date_time,"
+ "current_task_end_date_time, priority, times_blocked) " + "VALUES ('"
+ wInst.getStatus() + "', " + workflowIdField + ","
+ taskIdField + ", '" + wInst.getStartDateTimeIsoStr()
+ "','" + wInst.getEndDateTimeIsoStr() + "','"
+ wInst.getCurrentTaskStartDateTimeIsoStr() + "','"
+ wInst.getCurrentTaskEndDateTimeIsoStr() + "', "+wInst.getPriority().getValue()+", "
+ wInst.getTimesBlocked() + ")";
LOG.log(Level.FINE, "sql: Executing: " + startWorkflowSql);
statement.execute(startWorkflowSql);
String workflowInstId = new String();
synchronized (workflowInstId) {
String getWorkflowInstIdSql = "SELECT MAX(workflow_instance_id) "
+ "AS max_id FROM workflow_instances";
rs = statement.executeQuery(getWorkflowInstIdSql);
while (rs.next()) {
workflowInstId = String.valueOf(rs.getInt("max_id"));
}
}
conn.commit();
wInst.setId(workflowInstId);
// now add its metadata
addWorkflowInstanceMetadata(wInst);
} catch (Exception e) {
e.printStackTrace();
LOG.log(Level.WARNING, "Exception starting workflow. Message: "
+ e.getMessage());
try {
conn.rollback();
} catch (SQLException e2) {
LOG.log(Level.SEVERE,
"Unable to rollback startWorkflow transaction. Message: "
+ e2.getMessage());
}
throw new InstanceRepositoryException(e.getMessage());
} finally {
if (rs != null) {
try {
rs.close();
} catch (SQLException ignore) {
}
rs = null;
}
if (statement != null) {
try {
statement.close();
} catch (SQLException ignore) {
}
statement = null;
}
if (conn != null) {
try {
conn.close();
} catch (SQLException ignore) {
}
conn = null;
}
}
}
/*
* (non-Javadoc)
*
* @see org.apache.oodt.cas.workflow.engine.WorkflowInstanceRepository#updateWorkflowInstance(org.apache.oodt.cas.workflow.structs.WorkflowInstance)
*/
public synchronized void updateWorkflowInstance(WorkflowInstance wInst)
throws InstanceRepositoryException {
Connection conn = null;
Statement statement = null;
String taskIdField = null, workflowIdField = null;
try {
conn = dataSource.getConnection();
conn.setAutoCommit(false);
statement = conn.createStatement();
if (quoteFields) {
taskIdField = "'" + wInst.getCurrentTaskId() + "'";
workflowIdField = "'" + wInst.getWorkflow().getId() + "'";
} else {
taskIdField = wInst.getCurrentTaskId();
workflowIdField = wInst.getWorkflow().getId();
}
String updateStatusSql = "UPDATE workflow_instances SET "
+ "workflow_instance_status='" + wInst.getStatus()
+ "', current_task_id=" + taskIdField + ", workflow_id = "
+ workflowIdField + ",start_date_time='"
+ wInst.getStartDateTimeIsoStr() + "'," + "end_date_time='"
+ wInst.getEndDateTimeIsoStr()
+ "',current_task_start_date_time='"
+ wInst.getCurrentTaskStartDateTimeIsoStr()
+ "',current_task_end_date_time='"
+ wInst.getCurrentTaskEndDateTimeIsoStr()
+ "',priority="
+ wInst.getPriority().getValue()
+ ",times_blocked="
+ wInst.getTimesBlocked()
+" WHERE workflow_instance_id = " + wInst.getId();
LOG.log(Level.FINE, "updateStatusSql: Executing: "
+ updateStatusSql);
statement.execute(updateStatusSql);
conn.commit();
// now update its metadata
removeWorkflowInstanceMetadata(wInst.getId());
addWorkflowInstanceMetadata(wInst);
} catch (Exception e) {
e.printStackTrace();
LOG.log(Level.WARNING,
"Exception updating workflow instance. Message: "
+ e.getMessage());
try {
conn.rollback();
} catch (SQLException e2) {
LOG.log(Level.SEVERE,
"Unable to rollback updateWorkflowInstanceStatus "
+ "transaction. Message: " + e2.getMessage());
}
throw new InstanceRepositoryException(e.getMessage());
} finally {
if (statement != null) {
try {
statement.close();
} catch (SQLException ignore) {
}
statement = null;
}
if (conn != null) {
try {
conn.close();
} catch (SQLException ignore) {
}
conn = null;
}
}
}
/*
* (non-Javadoc)
*
* @see org.apache.oodt.cas.workflow.engine.WorkflowInstanceRepository#removeWorkflowInstance(org.apache.oodt.cas.workflow.structs.WorkflowInstance)
*/
public synchronized void removeWorkflowInstance(WorkflowInstance wInst)
throws InstanceRepositoryException {
Connection conn = null;
Statement statement = null;
try {
conn = dataSource.getConnection();
conn.setAutoCommit(false);
statement = conn.createStatement();
String deleteSql = "DELETE FROM workflow_instances "
+ "WHERE workflow_instance_id = " + wInst.getId();
LOG.log(Level.FINE, "sql: Executing: " + deleteSql);
statement.execute(deleteSql);
conn.commit();
// now remove its metadata
removeWorkflowInstanceMetadata(wInst.getId());
} catch (Exception e) {
e.printStackTrace();
LOG.log(Level.WARNING,
"Exception removing workflow instance. Message: "
+ e.getMessage());
try {
conn.rollback();
} catch (SQLException e2) {
LOG.log(Level.SEVERE,
"Unable to rollback removeWorkflowInstance "
+ "transaction. Message: " + e2.getMessage());
}
throw new InstanceRepositoryException(e.getMessage());
} finally {
if (statement != null) {
try {
statement.close();
} catch (SQLException ignore) {
}
statement = null;
}
if (conn != null) {
try {
conn.close();
} catch (SQLException ignore) {
}
conn = null;
}
}
}
/*
* (non-Javadoc)
*
* @see org.apache.oodt.cas.workflow.engine.WorkflowInstanceRepository#getWorkflowInstanceById(java.lang.String)
*/
public WorkflowInstance getWorkflowInstanceById(String workflowInstId)
throws InstanceRepositoryException {
Connection conn = null;
Statement statement = null;
ResultSet rs = null;
WorkflowInstance workflowInst = null;
try {
conn = dataSource.getConnection();
statement = conn.createStatement();
String getWorkflowSql = "SELECT * from workflow_instances "
+ "WHERE workflow_instance_id = " + workflowInstId;
LOG.log(Level.FINE, "getWorkflowInstanceById: Executing: "
+ getWorkflowSql);
rs = statement.executeQuery(getWorkflowSql);
while (rs.next()) {
workflowInst = DbStructFactory.getWorkflowInstance(rs);
// add its metadata
workflowInst
.setSharedContext(getWorkflowInstanceMetadata(workflowInst
.getId()));
}
} catch (Exception e) {
e.printStackTrace();
LOG.log(Level.WARNING,
"Exception getting workflow instance. Message: "
+ e.getMessage());
try {
conn.rollback();
} catch (SQLException e2) {
LOG.log(Level.SEVERE,
"Unable to rollback getWorkflowInstanceById "
+ "transaction. Message: " + e2.getMessage());
}
throw new InstanceRepositoryException(e.getMessage());
} finally {
if (rs != null) {
try {
rs.close();
} catch (SQLException ignore) {
}
rs = null;
}
if (statement != null) {
try {
statement.close();
} catch (SQLException ignore) {
}
statement = null;
}
if (conn != null) {
try {
conn.close();
} catch (SQLException ignore) {
}
conn = null;
}
}
return workflowInst;
}
/*
* (non-Javadoc)
*
* @see org.apache.oodt.cas.workflow.engine.WorkflowInstanceRepository#getWorkflowInstances()
*/
public List getWorkflowInstances() throws InstanceRepositoryException {
Connection conn = null;
Statement statement = null;
ResultSet rs = null;
List workflowInsts = null;
try {
conn = dataSource.getConnection();
statement = conn.createStatement();
String getWorkflowSql = "SELECT * from workflow_instances "
+ "ORDER BY workflow_instance_id DESC";
LOG.log(Level.FINE, "getWorkflowInstances: Executing: "
+ getWorkflowSql);
rs = statement.executeQuery(getWorkflowSql);
workflowInsts = new Vector();
while (rs.next()) {
WorkflowInstance workflowInst = DbStructFactory
.getWorkflowInstance(rs);
// add its metadata
workflowInst
.setSharedContext(getWorkflowInstanceMetadata(workflowInst
.getId()));
workflowInsts.add(workflowInst);
}
} catch (Exception e) {
e.printStackTrace();
LOG.log(Level.WARNING,
"Exception getting workflow instance. Message: "
+ e.getMessage());
try {
conn.rollback();
} catch (SQLException e2) {
LOG.log(Level.SEVERE,
"Unable to rollback getWorkflowInstances "
+ "transaction. Message: " + e2.getMessage());
}
throw new InstanceRepositoryException(e.getMessage());
} finally {
if (rs != null) {
try {
rs.close();
} catch (SQLException ignore) {
}
rs = null;
}
if (statement != null) {
try {
statement.close();
} catch (SQLException ignore) {
}
statement = null;
}
if (conn != null) {
try {
conn.close();
} catch (SQLException ignore) {
}
conn = null;
}
}
return workflowInsts;
}
/*
* (non-Javadoc)
*
* @see org.apache.oodt.cas.workflow.engine.WorkflowInstanceRepository#getWorkflowInstancesByStatus(java.lang.String)
*/
public List getWorkflowInstancesByStatus(String status)
throws InstanceRepositoryException {
Connection conn = null;
Statement statement = null;
ResultSet rs = null;
List workflowInsts = null;
try {
conn = dataSource.getConnection();
statement = conn.createStatement();
String getWorkflowSql = "SELECT * from workflow_instances "
+ "WHERE workflow_instance_status = '" + status
+ "' ORDER BY workflow_instance_id DESC";
LOG.log(Level.FINE, "getWorkflowInstancesByStatus: Executing: "
+ getWorkflowSql);
rs = statement.executeQuery(getWorkflowSql);
workflowInsts = new Vector();
while (rs.next()) {
WorkflowInstance workflowInst = DbStructFactory
.getWorkflowInstance(rs);
// add its metadata
workflowInst
.setSharedContext(getWorkflowInstanceMetadata(workflowInst
.getId()));
workflowInsts.add(workflowInst);
}
} catch (Exception e) {
e.printStackTrace();
LOG.log(Level.WARNING,
"Exception getting workflow instance. Message: "
+ e.getMessage());
try {
conn.rollback();
} catch (SQLException e2) {
LOG.log(Level.SEVERE,
"Unable to rollback getWorkflowInstancesByStatus "
+ "transaction. Message: " + e2.getMessage());
}
throw new InstanceRepositoryException(e.getMessage());
} finally {
if (rs != null) {
try {
rs.close();
} catch (SQLException ignore) {
}
rs = null;
}
if (statement != null) {
try {
statement.close();
} catch (SQLException ignore) {
}
statement = null;
}
if (conn != null) {
try {
conn.close();
} catch (SQLException ignore) {
}
conn = null;
}
}
return workflowInsts;
}
/*
* (non-Javadoc)
*
* @see org.apache.oodt.cas.workflow.instrepo.WorkflowInstanceRepository#getNumWorkflowInstances()
*/
public int getNumWorkflowInstances() throws InstanceRepositoryException {
Connection conn = null;
Statement statement = null;
ResultSet rs = null;
int numInsts = -1;
try {
conn = dataSource.getConnection();
statement = conn.createStatement();
String getWorkflowSql = "SELECT COUNT(workflow_instance_id) AS num_insts from workflow_instances";
LOG.log(Level.FINE, "getNumWorkflowInstances: Executing: "
+ getWorkflowSql);
rs = statement.executeQuery(getWorkflowSql);
while (rs.next()) {
numInsts = rs.getInt("num_insts");
}
} catch (Exception e) {
e.printStackTrace();
LOG.log(Level.WARNING,
"Exception getting num workflow instances. Message: "
+ e.getMessage());
try {
conn.rollback();
} catch (SQLException e2) {
LOG.log(Level.SEVERE,
"Unable to rollback getNumWorkflowInstances "
+ "transaction. Message: " + e2.getMessage());
}
throw new InstanceRepositoryException(e.getMessage());
} finally {
if (rs != null) {
try {
rs.close();
} catch (SQLException ignore) {
}
rs = null;
}
if (statement != null) {
try {
statement.close();
} catch (SQLException ignore) {
}
statement = null;
}
if (conn != null) {
try {
conn.close();
} catch (SQLException ignore) {
}
conn = null;
}
}
return numInsts;
}
/*
* (non-Javadoc)
*
* @see org.apache.oodt.cas.workflow.instrepo.WorkflowInstanceRepository#getNumWorkflowInstancesByStatus(java.lang.String)
*/
public int getNumWorkflowInstancesByStatus(String status)
throws InstanceRepositoryException {
Connection conn = null;
Statement statement = null;
ResultSet rs = null;
int numInsts = -1;
try {
conn = dataSource.getConnection();
statement = conn.createStatement();
String getWorkflowSql = "SELECT COUNT(workflow_instance_id) AS num_insts from workflow_instances "
+ "WHERE workflow_instance_status = '" + status + "'";
LOG.log(Level.FINE, "getNumWorkflowInstancesByStatus: Executing: "
+ getWorkflowSql);
rs = statement.executeQuery(getWorkflowSql);
while (rs.next()) {
numInsts = rs.getInt("num_insts");
}
} catch (Exception e) {
e.printStackTrace();
LOG.log(Level.WARNING,
"Exception getting num workflow instances by status. Message: "
+ e.getMessage());
try {
conn.rollback();
} catch (SQLException e2) {
LOG.log(Level.SEVERE,
"Unable to rollback getNumWorkflowInstancesByStatus "
+ "transaction. Message: " + e2.getMessage());
}
throw new InstanceRepositoryException(e.getMessage());
} finally {
if (rs != null) {
try {
rs.close();
} catch (SQLException ignore) {
}
rs = null;
}
if (statement != null) {
try {
statement.close();
} catch (SQLException ignore) {
}
statement = null;
}
if (conn != null) {
try {
conn.close();
} catch (SQLException ignore) {
}
conn = null;
}
}
return numInsts;
}
protected List paginateWorkflows(int pageNum, String status)
throws InstanceRepositoryException {
Connection conn = null;
Statement statement = null;
ResultSet rs = null;
List wInstIds = null;
int numResults = -1;
if (status == null || (status != null && status.equals(""))) {
numResults = getNumWorkflowInstances();
} else {
numResults = getNumWorkflowInstancesByStatus(status);
}
try {
conn = dataSource.getConnection();
statement = conn.createStatement(ResultSet.TYPE_SCROLL_INSENSITIVE,
ResultSet.CONCUR_READ_ONLY);
String getWorkflowSql = "SELECT workflow_instance_id FROM workflow_instances ";
if (status != null && !status.equals("")) {
getWorkflowSql += "WHERE workflow_instance_status = '" + status
+ "'";
}
getWorkflowSql += "ORDER BY workflow_instance_id DESC ";
LOG.log(Level.FINE, "workflow instance paged query: executing: "
+ getWorkflowSql);
rs = statement.executeQuery(getWorkflowSql);
wInstIds = new Vector();
int startNum = (pageNum - 1) * pageSize;
if (startNum > numResults) {
startNum = 0;
}
// must call next first, or else no relative cursor
if (rs.next()) {
// grab the first one
int numGrabbed = -1;
if(pageNum == 1){
numGrabbed = 1;
wInstIds.add(rs.getString("workflow_instance_id"));
}
else{
numGrabbed = 0;
}
if(pageNum != 1){
// now move the cursor to the correct position
rs.relative(startNum);
}
// grab the rest
while (rs.next() && numGrabbed < pageSize) {
String wInstId = rs.getString("workflow_instance_id");
wInstIds.add(wInstId);
numGrabbed++;
}
}
if (wInstIds.size() == 0) {
wInstIds = null;
}
} catch (Exception e) {
e.printStackTrace();
LOG.log(Level.WARNING, "Exception performing query. Message: "
+ e.getMessage());
try {
conn.rollback();
} catch (SQLException e2) {
LOG.log(Level.SEVERE,
"Unable to rollback query transaction. Message: "
+ e2.getMessage());
}
throw new InstanceRepositoryException(e.getMessage());
} finally {
if (rs != null) {
try {
rs.close();
} catch (SQLException ignore) {
}
rs = null;
}
if (statement != null) {
try {
statement.close();
} catch (SQLException ignore) {
}
statement = null;
}
if (conn != null) {
try {
conn.close();
} catch (SQLException ignore) {
}
conn = null;
}
}
return wInstIds;
}
private Metadata getWorkflowInstanceMetadata(String workflowInstId)
throws InstanceRepositoryException {
Connection conn = null;
Statement statement = null;
ResultSet rs = null;
Metadata met = new Metadata();
try {
conn = dataSource.getConnection();
statement = conn.createStatement();
String getWorkflowSql = "SELECT * from workflow_instance_metadata "
+ "WHERE workflow_instance_id = " + workflowInstId;
LOG.log(Level.FINE, "Executing: " + getWorkflowSql);
rs = statement.executeQuery(getWorkflowSql);
while (rs.next()) {
met.addMetadata(rs.getString("workflow_met_key"), URLDecoder.decode(rs
.getString("workflow_met_val"), "UTF-8"));
}
} catch (Exception e) {
e.printStackTrace();
LOG.log(Level.WARNING,
"Exception getting workflow instance metadata. Message: "
+ e.getMessage());
try {
conn.rollback();
} catch (SQLException e2) {
LOG.log(Level.SEVERE,
"Unable to rollback getWorkflowInstancesMetadata "
+ "transaction. Message: " + e2.getMessage());
}
throw new InstanceRepositoryException(e.getMessage());
} finally {
if (rs != null) {
try {
rs.close();
} catch (SQLException ignore) {
}
rs = null;
}
if (statement != null) {
try {
statement.close();
} catch (SQLException ignore) {
}
statement = null;
}
if (conn != null) {
try {
conn.close();
} catch (SQLException ignore) {
}
conn = null;
}
}
return met;
}
private synchronized void addWorkflowInstanceMetadata(WorkflowInstance inst)
throws InstanceRepositoryException {
if (inst.getSharedContext() != null
&& inst.getSharedContext().getHashtable().keySet().size() > 0) {
for (Iterator i = inst.getSharedContext().getHashtable().keySet()
.iterator(); i.hasNext();) {
String key = (String) i.next();
List vals = inst.getSharedContext().getAllMetadata(key);
if (vals != null && vals.size() > 0) {
for (Iterator j = vals.iterator(); j.hasNext();) {
String val = (String) j.next();
if (val != null && !val.equals("")) {
addMetadataValue(inst.getId(), key, val);
}
}
}
}
}
}
private synchronized void addMetadataValue(String wInstId, String key,
String val) throws InstanceRepositoryException {
Connection conn = null;
Statement statement = null;
try {
conn = dataSource.getConnection();
conn.setAutoCommit(false);
statement = conn.createStatement();
String addMetSql = "INSERT INTO workflow_instance_metadata"
+ " (workflow_instance_id,workflow_met_key,workflow_met_val) VALUES ("
+ wInstId + ",'" + key + "','" + URLEncoder.encode(val, "UTF-8") + "')";
LOG.log(Level.FINE, "sql: Executing: " + addMetSql);
statement.execute(addMetSql);
conn.commit();
} catch (Exception e) {
e.printStackTrace();
LOG.log(Level.WARNING, "Exception adding metadata [" + key + "=>"
+ val + "] to workflow inst: [" + wInstId + "]. Message: "
+ e.getMessage());
try {
conn.rollback();
} catch (SQLException e2) {
LOG.log(Level.SEVERE,
"Unable to rollback addMetadataValue transaction. Message: "
+ e2.getMessage());
}
throw new InstanceRepositoryException(e.getMessage());
} finally {
if (statement != null) {
try {
statement.close();
} catch (SQLException ignore) {
}
statement = null;
}
if (conn != null) {
try {
conn.close();
} catch (SQLException ignore) {
}
conn = null;
}
}
}
private synchronized void removeWorkflowInstanceMetadata(
String workflowInstId) throws InstanceRepositoryException {
Connection conn = null;
Statement statement = null;
try {
conn = dataSource.getConnection();
conn.setAutoCommit(false);
statement = conn.createStatement();
String deleteSql = "DELETE FROM workflow_instance_metadata "
+ "WHERE workflow_instance_id = " + workflowInstId;
LOG.log(Level.FINE, "sql: Executing: " + deleteSql);
statement.execute(deleteSql);
conn.commit();
} catch (Exception e) {
e.printStackTrace();
LOG.log(Level.WARNING,
"Exception removing workflow instance metadata. Message: "
+ e.getMessage());
try {
conn.rollback();
} catch (SQLException e2) {
LOG.log(Level.SEVERE,
"Unable to rollback removeWorkflowInstanceMetadata "
+ "transaction. Message: " + e2.getMessage());
}
throw new InstanceRepositoryException(e.getMessage());
} finally {
if (statement != null) {
try {
statement.close();
} catch (SQLException ignore) {
}
statement = null;
}
if (conn != null) {
try {
conn.close();
} catch (SQLException ignore) {
}
conn = null;
}
}
}
}