| /******************************************************************************* |
| * Copyright (C) 2008 The University of Manchester |
| * |
| * Modifications to the initial code base are copyright of their |
| * respective authors, or their employers as appropriate. |
| * |
| * This program is free software; you can redistribute it and/or |
| * modify it under the terms of the GNU Lesser General Public License |
| * as published by the Free Software Foundation; either version 2.1 of |
| * the License, or (at your option) any later version. |
| * |
| * This program is distributed in the hope that it will be useful, but |
| * WITHOUT ANY WARRANTY; without even the implied warranty of |
| * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
| * Lesser General Public License for more details. |
| * |
| * You should have received a copy of the GNU Lesser General Public |
| * License along with this program; if not, write to the Free Software |
| * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 |
| ******************************************************************************/ |
| package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers; |
| |
| import java.lang.Thread.UncaughtExceptionHandler; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.HashMap; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import net.sf.taverna.t2.invocation.InvocationContext; |
| import net.sf.taverna.t2.reference.ReferenceService; |
| import net.sf.taverna.t2.reference.T2Reference; |
| import net.sf.taverna.t2.workflowmodel.Processor; |
| import net.sf.taverna.t2.workflowmodel.processor.activity.AbstractAsynchronousActivity; |
| import net.sf.taverna.t2.workflowmodel.processor.activity.Activity; |
| import net.sf.taverna.t2.workflowmodel.processor.activity.ActivityInputPort; |
| import net.sf.taverna.t2.workflowmodel.processor.activity.AsynchronousActivityCallback; |
| import net.sf.taverna.t2.workflowmodel.processor.dispatch.AbstractDispatchLayer; |
| import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.AbstractDispatchEvent; |
| import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchCompletionEvent; |
| import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchErrorEvent; |
| import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchErrorType; |
| import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobEvent; |
| import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobQueueEvent; |
| import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchResultEvent; |
| |
| import org.apache.log4j.Logger; |
| |
| import com.fasterxml.jackson.databind.JsonNode; |
| import com.fasterxml.jackson.databind.node.JsonNodeFactory; |
| |
| /** |
| * A layer that allows while-style loops. |
| * <p> |
| * The layer is configured with a {@link LoopConfiguration}, where an activity |
| * has been set as the |
| * {@link LoopConfiguration#setCondition(net.sf.taverna.t2.workflowmodel.processor.activity.Activity) |
| * condition}. |
| * <p> |
| * After a job has been successful further down the dispatch stack, the loop |
| * layer will invoke the conditional activity to determine if the job will be |
| * invoked again. If {@link LoopConfiguration#isRunFirst()} is false, this test |
| * will be performed even before the first invocation. (The default |
| * runFirst=true is equivalent to a do..while construct, while runFirst=false is |
| * equivalent to a while.. construct.) |
| * <p> |
| * A job will be resent down the dispatch stack only if the conditional activity |
| * returns a reference to a string equal to "true" on its output port "loop". |
| * <p> |
| * If a job or the conditional activity fails, the while-loop is interrupted and |
| * the error is sent further up. |
| * <p> |
| * Note that the LoopLayer will be invoked for each item in an iteration, if you |
| * want to do the loop for the whole collection (ie. re-iterating if the |
| * loop-condition fails after processing the full list) - create a nested |
| * workflow with the desired depths on it's input ports and insert this |
| * LoopLayer in the stack of the nested workflow's processor in parent workflow. |
| * <p> |
| * It is recommended that the LoopLayer is to be inserted after the |
| * {@link ErrorBounce} layer, as this layer is needed for registering errors |
| * produced by the LoopLayer. If the user requires {@link Retry retries} and |
| * {@link Failover failovers} before checking the while condition, such layers |
| * should be below LoopLayer. |
| * |
| * @author Stian Soiland-Reyes |
| */ |
| // FIXME Doesn't work |
| @SuppressWarnings({"unchecked","rawtypes"}) |
| public class Loop extends AbstractDispatchLayer<JsonNode> { |
| public static final String URI = "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/Loop"; |
| private static Logger logger = Logger.getLogger(Loop.class); |
| |
| private JsonNode config = JsonNodeFactory.instance.objectNode(); |
| |
| protected Map<String, AbstractDispatchEvent> incomingJobs = new HashMap<>(); |
| protected Map<String, AbstractDispatchEvent> outgoingJobs = new HashMap<>(); |
| |
| @Override |
| public void configure(JsonNode config) { |
| this.config = config; |
| } |
| |
| @Override |
| public void finishedWith(String owningProcess) { |
| String prefix = owningProcess + "["; |
| synchronized (outgoingJobs) { |
| for (String key : new ArrayList<>(outgoingJobs.keySet())) |
| if (key.startsWith(prefix)) |
| outgoingJobs.remove(key); |
| } |
| synchronized (incomingJobs) { |
| for (String key : new ArrayList<>(incomingJobs.keySet())) |
| if (key.startsWith(prefix)) |
| incomingJobs.remove(key); |
| } |
| } |
| |
| @Override |
| public JsonNode getConfiguration() { |
| return config; |
| } |
| |
| @Override |
| public void receiveJob(DispatchJobEvent jobEvent) { |
| synchronized (incomingJobs) { |
| incomingJobs.put(jobIdentifier(jobEvent), jobEvent); |
| } |
| if (config.get("runFirst").asBoolean()) { |
| // We'll do the conditional in receiveResult instead |
| super.receiveJob(jobEvent); |
| return; |
| } |
| checkCondition(jobEvent); |
| } |
| |
| @Override |
| public void receiveJobQueue(DispatchJobQueueEvent jobQueueEvent) { |
| synchronized (incomingJobs) { |
| incomingJobs.put(jobIdentifier(jobQueueEvent), jobQueueEvent); |
| } |
| if (config.get("runFirst").asBoolean()) { |
| // We'll do the conditional in receiveResult instead |
| super.receiveJobQueue(jobQueueEvent); |
| return; |
| } |
| checkCondition(jobQueueEvent); |
| } |
| |
| private Activity<?> getCondition() { |
| //return config.getCondition(); |
| return null; |
| } |
| |
| @Override |
| public void receiveResult(DispatchResultEvent resultEvent) { |
| Activity<?> condition = getCondition(); |
| if (condition == null) { |
| super.receiveResult(resultEvent); |
| return; |
| } |
| synchronized (outgoingJobs) { |
| outgoingJobs.put(jobIdentifier(resultEvent), resultEvent); |
| } |
| checkCondition(resultEvent); |
| } |
| |
| @Override |
| public void receiveResultCompletion(DispatchCompletionEvent completionEvent) { |
| Activity<?> condition = getCondition(); |
| if (condition == null) { |
| super.receiveResultCompletion(completionEvent); |
| return; |
| } |
| synchronized (outgoingJobs) { |
| outgoingJobs.put(jobIdentifier(completionEvent), completionEvent); |
| } |
| checkCondition(completionEvent); |
| } |
| |
| private void checkCondition(AbstractDispatchEvent event) { |
| Activity<?> condition = getCondition(); |
| if (condition == null) { |
| super.receiveError(new DispatchErrorEvent(event.getOwningProcess(), |
| event.getIndex(), event.getContext(), |
| "Can't invoke condition service: null", null, |
| DispatchErrorType.INVOCATION, condition)); |
| return; |
| } |
| if (!(condition instanceof AbstractAsynchronousActivity)) { |
| DispatchErrorEvent errorEvent = new DispatchErrorEvent( |
| event.getOwningProcess(), |
| event.getIndex(), |
| event.getContext(), |
| "Can't invoke condition service " |
| + condition |
| + " is not an instance of AbstractAsynchronousActivity", |
| null, DispatchErrorType.INVOCATION, condition); |
| super.receiveError(errorEvent); |
| return; |
| } |
| AbstractAsynchronousActivity asyncCondition = (AbstractAsynchronousActivity) condition; |
| String jobIdentifier = jobIdentifier(event); |
| Map<String, T2Reference> inputs = prepareInputs(asyncCondition, |
| jobIdentifier); |
| AsynchronousActivityCallback callback = new ConditionCallBack( |
| jobIdentifier); |
| asyncCondition.executeAsynch(inputs, callback); |
| } |
| |
| private Map<String, T2Reference> prepareInputs( |
| AbstractAsynchronousActivity asyncCondition, String jobIdentifier) { |
| Map<String, T2Reference> inputs = new HashMap<>(); |
| Map<String, T2Reference> inData = getInData(jobIdentifier); |
| Map<String, T2Reference> outData = getOutData(jobIdentifier); |
| |
| Set<ActivityInputPort> inputPorts = asyncCondition.getInputPorts(); |
| for (ActivityInputPort conditionIn : inputPorts) { |
| String conditionPort = conditionIn.getName(); |
| if (outData.containsKey(conditionPort)) |
| // Copy from previous output |
| inputs.put(conditionPort, outData.get(conditionPort)); |
| else if (inData.containsKey(conditionPort)) |
| // Copy from original input |
| inputs.put(conditionPort, inData.get(conditionPort)); |
| } |
| return inputs; |
| } |
| |
| private Map<String, T2Reference> getInData(String jobIdentifier) { |
| AbstractDispatchEvent inEvent; |
| synchronized (incomingJobs) { |
| inEvent = incomingJobs.get(jobIdentifier); |
| } |
| Map<String, T2Reference> inData = new HashMap<>(); |
| if (inEvent instanceof DispatchJobEvent) |
| inData = ((DispatchJobEvent) inEvent).getData(); |
| return inData; |
| } |
| |
| private Map<String, T2Reference> getOutData(String jobIdentifier) { |
| AbstractDispatchEvent outEvent; |
| synchronized (outgoingJobs) { |
| outEvent = outgoingJobs.get(jobIdentifier); |
| } |
| Map<String, T2Reference> outData = new HashMap<>(); |
| if (outEvent instanceof DispatchResultEvent) |
| outData = ((DispatchResultEvent) outEvent).getData(); |
| return outData; |
| } |
| |
| private String jobIdentifier(AbstractDispatchEvent event) { |
| String jobId = event.getOwningProcess() |
| + Arrays.toString(event.getIndex()); |
| return jobId; |
| } |
| |
| public static final String LOOP_PORT = "loop"; |
| |
| public class ConditionCallBack implements AsynchronousActivityCallback { |
| private InvocationContext context; |
| private final String jobIdentifier; |
| private String processId; |
| |
| public ConditionCallBack(String jobIdentifier) { |
| this.jobIdentifier = jobIdentifier; |
| AbstractDispatchEvent originalEvent; |
| synchronized (incomingJobs) { |
| originalEvent = incomingJobs.get(jobIdentifier); |
| } |
| context = originalEvent.getContext(); |
| processId = originalEvent.getOwningProcess() + ":condition"; |
| } |
| |
| @Override |
| public void fail(String message) { |
| fail(message, null, DispatchErrorType.INVOCATION); |
| } |
| |
| @Override |
| public void fail(String message, Throwable t) { |
| fail(message, t, DispatchErrorType.INVOCATION); |
| } |
| |
| @Override |
| public void fail(String message, Throwable t, |
| DispatchErrorType errorType) { |
| logger.warn("Failed (" + errorType + ") invoking condition service " |
| + jobIdentifier + ":" + message, t); |
| |
| AbstractDispatchEvent originalEvent; |
| synchronized (incomingJobs) { |
| originalEvent = incomingJobs.get(jobIdentifier); |
| } |
| receiveError(new DispatchErrorEvent(originalEvent |
| .getOwningProcess(), originalEvent.getIndex(), |
| originalEvent.getContext(), |
| "Can't invoke condition service ", t, |
| DispatchErrorType.INVOCATION, null)); |
| } |
| |
| @Override |
| public InvocationContext getContext() { |
| return context; |
| } |
| |
| @Override |
| public String getParentProcessIdentifier() { |
| return processId; |
| } |
| |
| @Override |
| public void receiveCompletion(int[] completionIndex) { |
| // Ignore streaming |
| } |
| |
| @Override |
| public void receiveResult(Map<String, T2Reference> data, int[] index) { |
| if (index.length > 0) { |
| // Ignore streaming |
| return; |
| } |
| T2Reference loopRef = data.get(LOOP_PORT); |
| if (loopRef == null) { |
| fail("Condition service didn't contain output port " + LOOP_PORT); |
| return; |
| } |
| if (loopRef.containsErrors()) { |
| fail("Condition service failed: " + loopRef); |
| return; |
| } |
| if (loopRef.getDepth() != 0) { |
| fail("Condition service output " + LOOP_PORT |
| + " depth is not 0, but " + loopRef.getDepth()); |
| } |
| ReferenceService referenceService = context.getReferenceService(); |
| String loop = (String) referenceService.renderIdentifier(loopRef, |
| String.class, context); |
| |
| if (Boolean.parseBoolean(loop)) { |
| // Push it down again |
| AbstractDispatchEvent dispatchEvent; |
| synchronized (incomingJobs) { |
| dispatchEvent = incomingJobs.get(jobIdentifier); |
| } |
| if (dispatchEvent == null) { |
| fail("Unknown job identifier " + jobIdentifier); |
| } |
| if (dispatchEvent instanceof DispatchJobEvent) { |
| DispatchJobEvent newJobEvent = prepareNewJobEvent(data, |
| dispatchEvent); |
| getBelow().receiveJob(newJobEvent); |
| } else if (dispatchEvent instanceof DispatchJobQueueEvent) { |
| getBelow().receiveJobQueue( |
| (DispatchJobQueueEvent) dispatchEvent); |
| } else { |
| fail("Unknown type of incoming event " + dispatchEvent); |
| } |
| return; |
| |
| } else { |
| // We'll push it up, end of loop for now |
| |
| AbstractDispatchEvent outgoingEvent; |
| synchronized (outgoingJobs) { |
| outgoingEvent = outgoingJobs.get(jobIdentifier); |
| } |
| if (outgoingEvent == null && !config.get("runFirst").asBoolean()) { |
| fail("Initial loop condition failed"); |
| } |
| if (outgoingEvent instanceof DispatchCompletionEvent) { |
| getAbove().receiveResultCompletion( |
| (DispatchCompletionEvent) outgoingEvent); |
| } else if (outgoingEvent instanceof DispatchResultEvent) { |
| getAbove().receiveResult( |
| (DispatchResultEvent) outgoingEvent); |
| } else { |
| fail("Unknown type of outgoing event " + outgoingEvent); |
| } |
| } |
| |
| } |
| |
| private DispatchJobEvent prepareNewJobEvent( |
| Map<String, T2Reference> data, |
| AbstractDispatchEvent dispatchEvent) { |
| DispatchJobEvent dispatchJobEvent = (DispatchJobEvent) dispatchEvent; |
| Map<String, T2Reference> newInputs = new HashMap<String, T2Reference>( |
| dispatchJobEvent.getData()); |
| newInputs.putAll(data); |
| DispatchJobEvent newJobEvent = new DispatchJobEvent(dispatchEvent |
| .getOwningProcess(), dispatchEvent.getIndex(), |
| dispatchEvent.getContext(), newInputs, |
| ((DispatchJobEvent) dispatchEvent).getActivities()); |
| /* |
| * TODO: Should this be registered as an incomingJobs? If so the |
| * conditional could even feed to itself, and we should also keep a |
| * list of originalJobs. |
| */ |
| return newJobEvent; |
| } |
| |
| @Override |
| public void requestRun(Runnable runMe) { |
| String newThreadName = "Condition service " |
| + getParentProcessIdentifier(); |
| Thread thread = new Thread(runMe, newThreadName); |
| thread.setUncaughtExceptionHandler(new UncaughtExceptionHandler() { |
| @Override |
| public void uncaughtException(Thread t, Throwable e) { |
| fail("Uncaught exception while invoking " + jobIdentifier, |
| e); |
| } |
| }); |
| thread.start(); |
| } |
| } |
| |
| @Override |
| public Processor getProcessor() { |
| if (dispatchStack == null) |
| return null; |
| return dispatchStack.getProcessor(); |
| } |
| } |