blob: d3328542477d976b299287eb38fd9c26d8e06b92 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.oodt.cas.workflow.engine;
//OODT imports
import org.apache.oodt.cas.metadata.Metadata;
import org.apache.oodt.cas.resource.system.XmlRpcResourceManagerClient;
import org.apache.oodt.cas.workflow.structs.Workflow;
import org.apache.oodt.cas.workflow.structs.WorkflowInstance;
import org.apache.oodt.cas.workflow.structs.WorkflowStatus;
import org.apache.oodt.cas.workflow.structs.WorkflowTask;
import org.apache.oodt.cas.workflow.structs.exceptions.EngineException;
import org.apache.oodt.cas.workflow.structs.exceptions.InstanceRepositoryException;
import org.apache.oodt.cas.workflow.engine.IterativeWorkflowProcessorThread;
import org.apache.oodt.cas.workflow.instrepo.WorkflowInstanceRepository;
import org.apache.oodt.commons.util.DateConvert;
//JDK imports
import java.net.URL;
import java.text.ParseException;
import java.util.Date;
import java.util.HashMap;
import java.util.logging.Level;
import java.util.logging.Logger;
//java.util.concurrent imports
import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer;
import EDU.oswego.cs.dl.util.concurrent.Channel;
import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
/**
*
* The ThreadPooling portion of the WorkflowEngine. This class is meant to be an
* extension point for WorkflowEngines that want to implement ThreadPooling.
* This WorkflowEngine provides everything needed to manage a ThreadPool using
* Doug Lea's wonderful java.util.concurrent package that made it into JDK5.
*
* @author mattmann
* @version $Revsion$
*
*/
public class ThreadPoolWorkflowEngine implements WorkflowEngine, WorkflowStatus {
/* our thread pool */
private PooledExecutor pool = null;
/* our worker thread hash mapping worker threads to workflow instance ids */
private HashMap workerMap = null;
/* our log stream */
private static final Logger LOG = Logger
.getLogger(ThreadPoolWorkflowEngine.class.getName());
/* our instance repository */
private WorkflowInstanceRepository instRep = null;
/* our resource manager client */
private XmlRpcResourceManagerClient rClient = null;
/* the URL pointer to the parent Workflow Manager */
private URL wmgrUrl = null;
/**
* Default Constructor.
*
* @param instRep
* The WorkflowInstanceRepository to be used by this engine.
* @param queueSize
* The size of the queue that the workflow engine should use
* (irrelevant if unlimitedQueue is set to true)
* @param maxPoolSize
* The minimum thread pool size.
* @param minPoolSize
* The maximum thread pool size.
* @param threadKeepAliveTime
* The amount of minutes that each thread in the pool should be kept
* alive.
* @param unlimitedQueue
* Whether or not to use a queue whose bounds are dictated by the
* physical memory of the underlying hardware.
* @param resUrl
* A URL pointer to a resource manager. If this is set Tasks will be
* wrapped as Resource Manager {@link Job}s and sent through the
* Resource Manager. If this parameter is not set, local execution
* (the default) will be used
*/
public ThreadPoolWorkflowEngine(WorkflowInstanceRepository instRep,
int queueSize, int maxPoolSize, int minPoolSize,
long threadKeepAliveTime, boolean unlimitedQueue, URL resUrl) {
this.instRep = instRep;
Channel c = null;
if (unlimitedQueue) {
c = new LinkedQueue();
} else {
c = new BoundedBuffer(queueSize);
}
pool = new PooledExecutor(c, maxPoolSize);
pool.setMinimumPoolSize(minPoolSize);
pool.setKeepAliveTime(1000 * 60 * threadKeepAliveTime);
workerMap = new HashMap();
if (resUrl != null)
rClient = new XmlRpcResourceManagerClient(resUrl);
}
/*
* (non-Javadoc)
*
* @see
* org.apache.oodt.cas.workflow.engine.WorkflowEngine#pauseWorkflowInstance
* (java.lang.String)
*/
public synchronized void pauseWorkflowInstance(String workflowInstId) {
// okay, try and look up that worker thread in our hash map
IterativeWorkflowProcessorThread worker = (IterativeWorkflowProcessorThread) workerMap
.get(workflowInstId);
if (worker == null) {
LOG.log(Level.WARNING,
"WorkflowEngine: Attempt to pause workflow instance id: "
+ workflowInstId
+ ", however, this engine is not tracking its execution");
return;
}
// otherwise, all good
worker.pause();
}
/*
* (non-Javadoc)
*
* @see
* org.apache.oodt.cas.workflow.engine.WorkflowEngine#resumeWorkflowInstance
* (java.lang.String)
*/
public synchronized void resumeWorkflowInstance(String workflowInstId) {
// okay, try and look up that worker thread in our hash map
IterativeWorkflowProcessorThread worker = (IterativeWorkflowProcessorThread) workerMap
.get(workflowInstId);
if (worker == null) {
LOG.log(Level.WARNING,
"WorkflowEngine: Attempt to resume workflow instance id: "
+ workflowInstId + ", however, this engine is "
+ "not tracking its execution");
return;
}
// also check to make sure that the worker is currently paused
// only can resume WorkflowInstances that are paused, right?
if (!worker.isPaused()) {
LOG.log(Level.WARNING,
"WorkflowEngine: Attempt to resume a workflow that "
+ "isn't paused currently: instance id: " + workflowInstId);
return;
}
// okay, all good
worker.resume();
}
/*
* (non-Javadoc)
*
* @see
* org.apache.oodt.cas.workflow.engine.WorkflowEngine#startWorkflow(org.apache
* .oodt.cas.workflow.structs.Workflow, org.apache.oodt.cas.metadata.Metadata)
*/
public synchronized WorkflowInstance startWorkflow(Workflow workflow,
Metadata metadata) throws EngineException {
// to start the workflow, we create a default workflow instance
// populate it
// persist it
// add it to the worker map
// start it
WorkflowInstance wInst = new WorkflowInstance();
wInst.setWorkflow(workflow);
wInst.setCurrentTaskId(((WorkflowTask) workflow.getTasks().get(0))
.getTaskId());
wInst.setSharedContext(metadata);
wInst.setStatus(CREATED);
persistWorkflowInstance(wInst);
IterativeWorkflowProcessorThread worker = new IterativeWorkflowProcessorThread(
wInst, instRep, this.wmgrUrl);
worker.setRClient(rClient);
workerMap.put(wInst.getId(), worker);
wInst.setStatus(QUEUED);
persistWorkflowInstance(wInst);
try {
pool.execute(worker);
} catch (InterruptedException e) {
throw new EngineException(e);
}
return wInst;
}
/*
* (non-Javadoc)
*
* @see
* org.apache.oodt.cas.workflow.engine.WorkflowEngine#getInstanceRepository()
*/
public WorkflowInstanceRepository getInstanceRepository() {
return this.instRep;
}
/*
* (non-Javadoc)
*
* @see
* org.apache.oodt.cas.workflow.engine.WorkflowEngine#updateMetadata(java.
* lang.String, org.apache.oodt.cas.metadata.Metadata)
*/
public synchronized boolean updateMetadata(String workflowInstId, Metadata met) {
// okay, try and look up that worker thread in our hash map
IterativeWorkflowProcessorThread worker = (IterativeWorkflowProcessorThread) workerMap
.get(workflowInstId);
if (worker == null) {
LOG.log(Level.WARNING,
"WorkflowEngine: Attempt to update metadata context "
+ "for workflow instance id: " + workflowInstId
+ ", however, this engine is " + "not tracking its execution");
return false;
}
worker.getWorkflowInstance().setSharedContext(met);
try {
persistWorkflowInstance(worker.getWorkflowInstance());
} catch (Exception e) {
LOG.log(
Level.WARNING,
"Exception persisting workflow instance: ["
+ worker.getWorkflowInstance().getId() + "]: Message: "
+ e.getMessage());
return false;
}
return true;
}
/*
* (non-Javadoc)
*
* @see
* org.apache.oodt.cas.workflow.engine.WorkflowEngine#setWorkflowManagerUrl
* (java.net.URL)
*/
public void setWorkflowManagerUrl(URL url) {
this.wmgrUrl = url;
}
/*
* (non-Javadoc)
*
* @see
* org.apache.oodt.cas.workflow.engine.WorkflowEngine#stopWorkflow(java.lang
* .String)
*/
public synchronized void stopWorkflow(String workflowInstId) {
// okay, try and look up that worker thread in our hash map
IterativeWorkflowProcessorThread worker = (IterativeWorkflowProcessorThread) workerMap
.get(workflowInstId);
if (worker == null) {
LOG.log(Level.WARNING,
"WorkflowEngine: Attempt to stop workflow instance id: "
+ workflowInstId + ", however, this engine is "
+ "not tracking its execution");
return;
}
worker.stop();
}
/*
* (non-Javadoc)
*
* @see org.apache.oodt.cas.workflow.engine.WorkflowEngine#
* getCurrentTaskWallClockMinutes(java.lang.String)
*/
public double getCurrentTaskWallClockMinutes(String workflowInstId) {
// get the workflow instance that we're talking about
WorkflowInstance inst = safeGetWorkflowInstanceById(workflowInstId);
return getCurrentTaskWallClockMinutes(inst);
}
/*
* (non-Javadoc)
*
* @see
* org.apache.oodt.cas.workflow.engine.WorkflowEngine#getWorkflowInstanceMetadata
* (java.lang.String)
*/
public Metadata getWorkflowInstanceMetadata(String workflowInstId) {
// okay, try and look up that worker thread in our hash map
IterativeWorkflowProcessorThread worker = (IterativeWorkflowProcessorThread) workerMap
.get(workflowInstId);
if (worker == null) {
// try and get the metadata
// from the workflow instance repository (as it was persisted)
try {
WorkflowInstance inst = instRep.getWorkflowInstanceById(workflowInstId);
return inst.getSharedContext();
} catch (InstanceRepositoryException e) {
LOG.log(Level.FINEST, "WorkflowEngine: Attempt to get metadata "
+ "for workflow instance id: " + workflowInstId
+ ", however, this engine is "
+ "not tracking its execution and the id: [" + workflowInstId
+ "] " + "was never persisted to " + "the instance repository");
e.printStackTrace();
return new Metadata();
}
}
return worker.getWorkflowInstance().getSharedContext();
}
/*
* (non-Javadoc)
*
* @see
* org.apache.oodt.cas.workflow.engine.WorkflowEngine#getWallClockMinutes(
* java.lang.String)
*/
public double getWallClockMinutes(String workflowInstId) {
// get the workflow instance that we're talking about
WorkflowInstance inst = safeGetWorkflowInstanceById(workflowInstId);
return getWallClockMinutes(inst);
}
protected static double getWallClockMinutes(WorkflowInstance inst) {
if (inst == null) {
return 0.0;
}
Date currentDateOrStopTime = (inst.getEndDateTimeIsoStr() != null
&& !inst.getEndDateTimeIsoStr().equals("") && !inst
.getEndDateTimeIsoStr().equals("null")) ? safeDateConvert(inst
.getEndDateTimeIsoStr()) : new Date();
Date workflowStartDateTime = null;
if (inst.getStartDateTimeIsoStr() == null
|| (inst.getStartDateTimeIsoStr() != null && (inst
.getStartDateTimeIsoStr().equals("") || inst
.getStartDateTimeIsoStr().equals("null")))) {
return 0.0;
}
try {
workflowStartDateTime = DateConvert.isoParse(inst
.getStartDateTimeIsoStr());
} catch (ParseException e) {
return 0.0;
}
long diffMs = currentDateOrStopTime.getTime()
- workflowStartDateTime.getTime();
double diffSecs = (diffMs * 1.0 / 1000.0);
double diffMins = diffSecs / 60.0;
return diffMins;
}
protected static double getCurrentTaskWallClockMinutes(WorkflowInstance inst) {
if (inst == null) {
return 0.0;
}
Date currentDateOrStopTime = (inst.getCurrentTaskEndDateTimeIsoStr() != null
&& !inst.getCurrentTaskEndDateTimeIsoStr().equals("") && !inst
.getCurrentTaskEndDateTimeIsoStr().equals("null")) ? safeDateConvert(inst
.getCurrentTaskEndDateTimeIsoStr()) : new Date();
Date workflowTaskStartDateTime = null;
if (inst.getCurrentTaskStartDateTimeIsoStr() == null
|| (inst.getCurrentTaskStartDateTimeIsoStr() != null && (inst
.getCurrentTaskStartDateTimeIsoStr().equals("") || inst
.getCurrentTaskStartDateTimeIsoStr().equals("null")))) {
return 0.0;
}
try {
workflowTaskStartDateTime = DateConvert.isoParse(inst
.getCurrentTaskStartDateTimeIsoStr());
} catch (ParseException e) {
return 0.0;
}
// should never be in this state, so return 0
if (workflowTaskStartDateTime.after(currentDateOrStopTime)) {
LOG.log(
Level.WARNING,
"Start date time: ["
+ DateConvert.isoFormat(workflowTaskStartDateTime)
+ " of workflow inst [" + inst.getId() + "] is AFTER "
+ "End date time: ["
+ DateConvert.isoFormat(currentDateOrStopTime)
+ "] of workflow inst.");
return 0.0;
}
long diffMs = currentDateOrStopTime.getTime()
- workflowTaskStartDateTime.getTime();
double diffSecs = (diffMs * 1.0 / 1000.0);
double diffMins = diffSecs / 60.0;
return diffMins;
}
private synchronized void persistWorkflowInstance(WorkflowInstance wInst)
throws EngineException {
try {
if (wInst.getId() == null
|| (wInst.getId() != null && wInst.getId().equals(""))) {
// we have to persist it by adding it
// rather than updating it
instRep.addWorkflowInstance(wInst);
} else {
// persist by update
instRep.updateWorkflowInstance(wInst);
}
} catch (InstanceRepositoryException e) {
e.printStackTrace();
throw new EngineException(e.getMessage());
}
}
private WorkflowInstance safeGetWorkflowInstanceById(String workflowInstId) {
try {
return instRep.getWorkflowInstanceById(workflowInstId);
} catch (Exception e) {
return null;
}
}
private static Date safeDateConvert(String isoTimeStr) {
try {
return DateConvert.isoParse(isoTimeStr);
} catch (Exception ignore) {
ignore.printStackTrace();
return null;
}
}
}