/*******************************************************************************
* Copyright (C) 2007 The University of Manchester
*
* Modifications to the initial code base are copyright of their
* respective authors, or their employers as appropriate.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public License
* as published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
 * USA.
******************************************************************************/
package net.sf.taverna.t2.workflowmodel.impl;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import net.sf.taverna.t2.invocation.Completion;
import net.sf.taverna.t2.invocation.InvocationContext;
import net.sf.taverna.t2.invocation.IterationInternalEvent;
import net.sf.taverna.t2.invocation.TreeCache;
import net.sf.taverna.t2.reference.T2Reference;
import net.sf.taverna.t2.workflowmodel.processor.activity.Job;

/**
 * Receives Job and Completion events and emits Jobs unaltered. A Completion
 * event additionally causes registration of a list, for each key in the data
 * maps of the jobs at the immediate child locations in the index structure;
 * these list identifiers are emitted in place of the Completion event.
 * <p>
 * State for a given process ID is purged when a final completion event is
 * received, so there is no need for an explicit cache purge operation in the
 * public API (although for the termination of partially complete workflows it
 * may be sensible for subclasses to provide one).
* <p>
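 * A minimal subclass sketch, assuming these are the only abstract methods;
 * <code>MyCrystalizer</code> and the handling of the empty data map are
 * illustrative, not part of this API:
 *
 * <pre>
 * public class MyCrystalizer extends AbstractCrystalizer {
 * 	public Job getEmptyJob(String owningProcess, int[] index,
 * 			InvocationContext context) {
 * 		// A real subclass would register one suitably deep empty list
 * 		// reference per output port; that lookup is omitted here.
 * 		Map&lt;String, T2Reference&gt; emptyData = new HashMap&lt;String, T2Reference&gt;();
 * 		return new Job(owningProcess, index, emptyData, context);
 * 	}
 *
 * 	public void jobCreated(Job j) {
 * 		// Forward each emitted job to the downstream consumer
 * 	}
 * }
 * </pre>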
*
* @author Tom Oinn
*/
public abstract class AbstractCrystalizer implements Crystalizer {

	private Map<String, CompletionAwareTreeCache> cacheMap = new HashMap<String, CompletionAwareTreeCache>();

	public abstract Job getEmptyJob(String owningProcess, int[] index,
InvocationContext context);
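
	/**
	 * Notified of each Job this crystalizer emits, whether passed through
	 * unaltered, assembled as a newly registered list or created as an empty
	 * job. Declared abstract here on the assumption that subclasses supply
	 * it; the calls below require a method with this signature.
	 */
	public abstract void jobCreated(Job j);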
/**
	 * Receives a Job or Completion event. Jobs are cached and emitted
	 * unaltered; Completion events trigger registration of a corresponding
	 * list, which may recurse if the completion event's index implies nested
	 * lists that have not yet been registered.
	 * <p>
	 * If the baseListDepth property is defined then completion events on
	 * nodes which don't already exist create empty jobs instead and emit
	 * those; if it is undefined the completion event is emitted intact.
*
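	 * <p>
	 * As an illustration: jobs arriving at indices {0} and {1} are cached and
	 * passed through; a subsequent completion at the top-level index {} then
	 * registers, for each output port, a single list containing both items,
	 * emits that list as a job at index {} and purges the cached state for
	 * the process.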
	 * @param e
	 *            the Job or Completion event to process
*/
@SuppressWarnings("unchecked")
// suppressed to avoid jdk1.5 compilation errors caused by the declaration
// IterationInternalEvent<? extends IterationInternalEvent<?>> e
public void receiveEvent(IterationInternalEvent e) {
String owningProcess = e.getOwningProcess();
CompletionAwareTreeCache cache = null;
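		// Fetch the cache for this process, creating it on first use; the
		// lookup is synchronized on cacheMap so concurrent events for the
		// same process share a single cache.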
synchronized (cacheMap) {
if (!cacheMap.containsKey(owningProcess)) {
				cache = new CompletionAwareTreeCache(owningProcess,
						e.getContext());
cacheMap.put(owningProcess, cache);
} else {
cache = cacheMap.get(owningProcess);
}
}
synchronized (cache) {
if (e instanceof Job) {
// Pass through Job after storing it in the cache
Job j = (Job) e;
cache.insertJob(j);
jobCreated(j);
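				// A zero-length index denotes the top-level value for this
				// process, so its cached state can now be discarded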
if (j.getIndex().length == 0) {
cacheMap.remove(j.getOwningProcess());
}
return;
} else if (e instanceof Completion) {
Completion c = (Completion) e;
int[] completionIndex = c.getIndex();
cache.resolveAt(owningProcess, completionIndex);
if (c.getIndex().length == 0) {
cacheMap.remove(c.getOwningProcess());
}
}
}
	}

	protected class CompletionAwareTreeCache extends TreeCache {
private String owningProcess;
private InvocationContext context;
public CompletionAwareTreeCache(String owningProcess,
InvocationContext context) {
super();
this.context = context;
this.owningProcess = owningProcess;
		}

		public void resolveAt(String owningProcess, int[] completionIndex) {
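			// A node at this index means jobs have arrived beneath it, so
			// collate its children into named lists; otherwise nothing has
			// arrived and an empty job is created in its place.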
NamedNode n = nodeAt(completionIndex);
if (n != null) {
assignNamesTo(n, completionIndex);
} else {
// We know what the list depth should be, so we can
// construct appropriate depth empty lists to fill in the
// gaps.
Job j = getEmptyJob(owningProcess, completionIndex, context);
insertJob(j);
jobCreated(j);
}
}
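
		/**
		 * Recursively assigns names below this node: any child without
		 * contents is first named (or filled with an empty job), then one
		 * list per output port is registered from the child jobs and stored
		 * as this node's contents.
		 */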
private void assignNamesTo(NamedNode n, int[] index) {
			// Only act if the contents of this node are undefined
if (n.contents == null) {
Map<String, List<T2Reference>> listItems = new HashMap<String, List<T2Reference>>();
int pos = 0;
for (NamedNode child : n.children) {
				// If the child doesn't yet have a defined name map then
				// define it
Job j;
if (child == null) {
// happens if we're completing a partially empty
// collection structure
					int[] newIndex = new int[index.length + 1];
					System.arraycopy(index, 0, newIndex, 0, index.length);
					newIndex[index.length] = pos++;
j = getEmptyJob(owningProcess, newIndex, context);
AbstractCrystalizer.this.jobCreated(j);
} else {
if (child.contents == null) {
						int[] newIndex = new int[index.length + 1];
						System.arraycopy(index, 0, newIndex, 0, index.length);
						newIndex[index.length] = pos++;
assignNamesTo(child, newIndex);
} else {
pos++;
}
j = child.contents;
}
// Now pull the names out of the child job map and push them
// into lists to be registered
for (String outputName : j.getData().keySet()) {
List<T2Reference> items = listItems.get(outputName);
if (items == null) {
items = new ArrayList<T2Reference>();
listItems.put(outputName, items);
}
items.add(j.getData().get(outputName));
}
}
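				// Register one list per output port with the reference
				// service; the resulting identifiers form the data map of
				// the collated job emitted for this node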
Map<String, T2Reference> newDataMap = new HashMap<String, T2Reference>();
for (String outputName : listItems.keySet()) {
List<T2Reference> idlist = listItems.get(outputName);
newDataMap.put(outputName, context.getReferenceService()
.getListService().registerList(idlist).getId());
}
Job newJob = new Job(owningProcess, index, newDataMap, context);
n.contents = newJob;
// Get rid of the children as we've now named this node
n.children.clear();
AbstractCrystalizer.this.jobCreated(n.contents);
}
}
}
}