// blob: 7c32a5e450f1dd05be11c467f763b2cbc1d17a6f [file] [log] [blame]
/*******************************************************************************
* Copyright (C) 2007 The University of Manchester
*
* Modifications to the initial code base are copyright of their
* respective authors, or their employers as appropriate.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public License
* as published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this program; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
******************************************************************************/
package net.sf.taverna.t2.workflowmodel.processor.dispatch;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.log4j.Logger;
import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchCompletionEvent;
import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchErrorEvent;
import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobEvent;
import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchResultEvent;
/**
* Superclass of error handling dispatch layers (for example retry and
* failover). Provides generic functionality required by this class of layers.
*
* @author Tom Oinn
* @author Stian Soiland-Reyes
*
*/
public abstract class AbstractErrorHandlerLayer<ConfigurationType> extends
        AbstractDispatchLayer<ConfigurationType> {

    private static final Logger logger = Logger
            .getLogger(AbstractErrorHandlerLayer.class);

    /**
     * Compare two index arrays, returning true if they have the same length
     * and hold equal values at every position.
     *
     * @param a
     *            First index array
     * @param b
     *            Second index array
     * @return <code>true</code> if both arrays contain the same values in the
     *         same order (two <code>null</code> references also compare as
     *         equal), <code>false</code> otherwise
     */
    private static boolean identicalIndex(int[] a, int[] b) {
        return Arrays.equals(a, b);
    }

    /**
     * Map of process name -> list of state models. Note that all access to this
     * map must be synchronized on the stateMap, and access to the lists inside
     * it must be synchronized on the list.
     *
     * @see #addJobToStateList(DispatchJobEvent)
     * @see #removeJob(String, JobState)
     * @see #getJobsDefault(String)
     * @see #getJobsCopy(String)
     */
    private final Map<String, List<JobState>> stateMap = new HashMap<String, List<JobState>>();

    protected AbstractErrorHandlerLayer() {
        super();
    }

    /**
     * Clear cached state for the specified process when notified by the
     * dispatch stack. The whole list of {@link JobState}s registered for the
     * process is dropped from the state map.
     *
     * @param owningProcess
     *            Owning process identifier whose state is to be discarded
     */
    @Override
    public void finishedWith(String owningProcess) {
        synchronized (stateMap) {
            stateMap.remove(owningProcess);
        }
    }

    /**
     * If an error occurs we can either handle the error or send it to the layer
     * above for further processing.
     * <p>
     * Every stored {@link JobState} of the owning process whose job index
     * equals the index of the error is asked to
     * {@link JobState#handleError() handle} it. The first state that declines
     * is removed from the cache and the error is passed to the layer above, at
     * which point processing stops. If no stored state matches the index of
     * the error, nothing is passed upwards by this method.
     *
     * @param errorEvent
     *            The error received from the layer below
     */
    @Override
    public void receiveError(DispatchErrorEvent errorEvent) {
        String owningProcess = errorEvent.getOwningProcess();
        // Iterate over a snapshot, as removeJob() modifies the live list
        List<JobState> activeJobs = getJobsCopy(owningProcess);
        for (JobState rs : activeJobs) {
            if (identicalIndex(rs.jobEvent.getIndex(), errorEvent.getIndex())) {
                boolean handled = rs.handleError();
                if (!handled) {
                    // The state object gave up: forget it and escalate
                    removeJob(owningProcess, rs);
                    getAbove().receiveError(errorEvent);
                    return;
                }
            }
        }
    }

    /**
     * Receive a job from the layer above, store it for later retries and pass
     * it down to the next layer.
     *
     * @param jobEvent
     *            The job received from the layer above
     */
    @Override
    public void receiveJob(DispatchJobEvent jobEvent) {
        addJobToStateList(jobEvent);
        getBelow().receiveJob(jobEvent);
    }

    /**
     * If we see a result with an index matching one of those in the current
     * retry state we can safely forget that state object. The result is then
     * passed to the layer above.
     *
     * @param j
     *            The result received from the layer below
     */
    @Override
    public void receiveResult(DispatchResultEvent j) {
        forget(j.getOwningProcess(), j.getIndex());
        getAbove().receiveResult(j);
    }

    /**
     * If we see a completion event with an index matching one of those in the
     * current retry state we can safely forget that state object. The
     * completion event is then passed to the layer above.
     *
     * @param c
     *            The completion event received from the layer below
     */
    @Override
    public void receiveResultCompletion(DispatchCompletionEvent c) {
        forget(c.getOwningProcess(), c.getIndex());
        getAbove().receiveResultCompletion(c);
    }

    /**
     * Remove the specified pending retry job from the cache. Only the first
     * state object with a matching index is removed; if none matches, an
     * error listing the index is logged.
     *
     * @param owningProcess
     *            Owning process identifier as returned by
     *            {@link DispatchJobEvent#getOwningProcess()}
     * @param index
     *            Index of the job as returned by
     *            {@link DispatchJobEvent#getIndex()}
     */
    protected void forget(String owningProcess, int[] index) {
        for (JobState jobState : getJobsCopy(owningProcess)) {
            if (identicalIndex(jobState.jobEvent.getIndex(), index)) {
                removeJob(owningProcess, jobState);
                return;
            }
        }
        // Arrays.toString prints the index values; Arrays.asList(int[]) would
        // wrap the array itself and print only its identity hash.
        logger.error("Could not forget " + Arrays.toString(index));
    }

    /**
     * Create a state object for the job event using
     * {@link #getStateObject(DispatchJobEvent)} and append it to the list of
     * states kept for the owning process of the job, creating that list if
     * necessary.
     *
     * @param jobEvent
     *            The job to keep state for
     */
    protected void addJobToStateList(DispatchJobEvent jobEvent) {
        List<JobState> stateList = getJobsDefault(jobEvent.getOwningProcess());
        synchronized (stateList) {
            stateList.add(getStateObject(jobEvent));
        }
    }

    /**
     * Get a copy of the list of {@link JobState}s for the owning process, or an
     * empty list if no list is registered for the owning process (for
     * instance because it is unknown, or because its state has been cleared by
     * {@link #finishedWith(String)}). The latter case is logged as an error.
     * <p>
     * This list can safely be iterated over without synchronizing. If you need
     * to modify the list, either synchronize over the returned list from
     * {@link #getJobsDefault(String)} or use
     * {@link #removeJob(String, JobState)}.
     *
     * @param owningProcess
     *            Owning process identifier as returned by
     *            {@link DispatchJobEvent#getOwningProcess()}
     * @return A copy of the list of known {@link JobState}s for the owning
     *         process
     */
    protected List<JobState> getJobsCopy(String owningProcess) {
        List<JobState> activeJobs;
        synchronized (stateMap) {
            activeJobs = stateMap.get(owningProcess);
        }
        if (activeJobs == null) {
            logger.error("Could not find any active jobs for " + owningProcess);
            return Collections.emptyList();
        }
        // Take a copy of the list so we don't modify it while iterating over it
        List<JobState> activeJobsCopy;
        synchronized (activeJobs) {
            activeJobsCopy = new ArrayList<JobState>(activeJobs);
        }
        return activeJobsCopy;
    }

    /**
     * Get the list of {@link JobState}s for the owning process, creating and
     * adding it to the state map if necessary.
     * <p>
     * Note that all access to the returned list must be synchronized on the
     * list to avoid threading issues.
     * <p>
     * If you are going to iterate over the list, use
     * {@link #getJobsCopy(String)} instead.
     *
     * @see #getJobsCopy(String)
     * @param owningProcess
     *            Owning process identifier as returned by
     *            {@link DispatchJobEvent#getOwningProcess()}
     *
     * @return List of {@link JobState}s for the owning process
     */
    protected List<JobState> getJobsDefault(String owningProcess) {
        List<JobState> stateList;
        synchronized (stateMap) {
            stateList = stateMap.get(owningProcess);
            if (stateList == null) {
                stateList = new ArrayList<JobState>();
                stateMap.put(owningProcess, stateList);
            }
        }
        return stateList;
    }

    /**
     * Generate an appropriate state object from the specified job event. The
     * state object is a concrete subclass of JobState.
     *
     * @param jobEvent
     *            The job event the state object is created for
     * @return The state object to be stored for the job
     */
    protected abstract JobState getStateObject(DispatchJobEvent jobEvent);

    /**
     * Remove the given state object from the list kept for the owning process.
     * If no list is registered for the owning process an error is logged and
     * nothing else happens.
     *
     * @param owningProcess
     *            Owning process identifier as returned by
     *            {@link DispatchJobEvent#getOwningProcess()}
     * @param jobState
     *            The state object to remove
     */
    protected void removeJob(String owningProcess, JobState jobState) {
        List<JobState> activeJobs;
        synchronized (stateMap) {
            activeJobs = stateMap.get(owningProcess);
        }
        if (activeJobs == null) {
            logger.error("Could not find active jobs for " + owningProcess);
            return;
        }
        synchronized (activeJobs) {
            activeJobs.remove(jobState);
        }
    }

    /**
     * Abstract superclass of all state models for pending failure handlers.
     * This object is responsible for handling failure messages if they occur
     * and represents the current state of the failure handling algorithm on a
     * per job basis.
     *
     * @author Tom Oinn
     *
     */
    protected abstract class JobState {

        /** The job event this state object was created for. */
        protected DispatchJobEvent jobEvent;

        /**
         * Create a state object tracking the given job.
         *
         * @param jobEvent
         *            The job event to keep state for
         */
        protected JobState(DispatchJobEvent jobEvent) {
            this.jobEvent = jobEvent;
        }

        /**
         * Called when the layer below pushes an error up and where the error
         * index and owning process matches that of this state object. The
         * implementation must deal with the error, either by handling it and
         * pushing a new job down the stack or by rejecting it. If this method
         * returns false the error has not been dealt with and MUST be pushed up
         * the stack by the active dispatch layer. In this case the layer will
         * be a subclass of AbstractErrorHandlerLayer and the logic to do this
         * is already included in the receive methods for results, errors and
         * completion events.
         *
         * @return true if the error was handled.
         */
        public abstract boolean handleError();
    }
}