/*******************************************************************************
* Copyright (C) 2007 The University of Manchester
*
* Modifications to the initial code base are copyright of their
* respective authors, or their employers as appropriate.
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public License
* as published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this program; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA.
******************************************************************************/
package net.sf.taverna.t2.workflowmodel.processor.iteration;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import net.sf.taverna.t2.invocation.TreeCache;
import net.sf.taverna.t2.reference.T2Reference;
import net.sf.taverna.t2.workflowmodel.processor.activity.Job;

/**
 * Matches jobs where the index array of the job received on input index 0 is
 * a prefix of the index array of the job received on input index 1. This node
 * can only ever have exactly two child nodes.
*
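 * For example, a job at index [1] on input 0 combines with every job on
 * input 1 whose index starts with [1], such as [1,0] and [1,1]; each
 * merged job is emitted under the longer index from input 1.
 *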
* @author Tom Oinn
*
*/
public class PrefixDotProduct extends DotProduct {
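	/**
	 * Discards the cached jobs held for the given process, releasing the
	 * per-process TreeCache state.
	 */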
@Override
	protected final synchronized void cleanUp(String owningProcess) {
ownerToCache.remove(owningProcess);
	}

@Override
public void innerReceiveJob(int inputIndex, Job newJob) {
String owningProcess = newJob.getOwningProcess();
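		// Cache state is held per owning process so that concurrent
		// invocations through this node do not interfere with each other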
TreeCache[] caches;
synchronized (ownerToCache) {
caches = ownerToCache.get(owningProcess);
// Create the caches if not already initialized
if (caches == null) {
caches = new TreeCache[getChildCount()];
for (int i = 0; i < getChildCount(); i++) {
caches[i] = new TreeCache();
}
ownerToCache.put(owningProcess, caches);
}
}
// Store the job
caches[inputIndex].insertJob(newJob);
		// If this job came in on index 0 we have to find all jobs in the
		// cache for index 1 which have this job's index array as a prefix.
		// Fortunately this is easy given the tree structure of the cache: we
		// can just ask for all nodes in the cache under that index.
if (inputIndex == 0) {
int[] prefixIndexArray = newJob.getIndex();
List<Job> matchingJobs;
synchronized (caches[1]) {
// Match all jobs and remove them so other calls can't produce
// duplicates
matchingJobs = caches[1].jobsWithPrefix(prefixIndexArray);
caches[1].cut(prefixIndexArray);
}
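			// Combine the data maps of the two jobs and emit one merged job
			// per match, indexed by the matched job's (longer) index array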
for (Job job : matchingJobs) {
Map<String, T2Reference> newDataMap = new HashMap<String, T2Reference>();
newDataMap.putAll(newJob.getData());
newDataMap.putAll(job.getData());
Job mergedJob = new Job(owningProcess, job.getIndex(),
newDataMap, newJob.getContext());
pushJob(mergedJob);
}
}
// If the job came in on index 1 we have to find the job on index 0 that
// matches the first 'n' indices, where 'n' is determined by the depth
// of jobs on the cache for index 0.
else if (inputIndex == 1) {
// Only act if we've received jobs on the cache at index 0
if (caches[0].getIndexLength() > 0) {
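				// Use the first 'n' positions of the incoming index as the
				// prefix to look up in the cache for input 0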
				int[] prefix = Arrays.copyOf(newJob.getIndex(),
						caches[0].getIndexLength());
Job j = caches[0].get(prefix);
if (j != null) {
Map<String, T2Reference> newDataMap = new HashMap<String, T2Reference>();
newDataMap.putAll(j.getData());
newDataMap.putAll(newJob.getData());
Job mergedJob = new Job(owningProcess, newJob.getIndex(),
newDataMap, newJob.getContext());
pushJob(mergedJob);
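					// The matching job is left in the cache for input 0, as
					// further jobs on input 1 may share the same prefix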
}
}
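			// Otherwise nothing has arrived on input 0 yet; this job stays
			// in its cache and will be matched by the index 0 branch when
			// the corresponding prefix job arrives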
}
}
}