| /******************************************************************************* |
| * Copyright (C) 2007 The University of Manchester |
| * |
| * Modifications to the initial code base are copyright of their |
| * respective authors, or their employers as appropriate. |
| * |
| * This program is free software; you can redistribute it and/or |
| * modify it under the terms of the GNU Lesser General Public License |
| * as published by the Free Software Foundation; either version 2.1 of |
| * the License, or (at your option) any later version. |
| * |
| * This program is distributed in the hope that it will be useful, but |
| * WITHOUT ANY WARRANTY; without even the implied warranty of |
| * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU |
| * Lesser General Public License for more details. |
| * |
| * You should have received a copy of the GNU Lesser General Public |
| * License along with this program; if not, write to the Free Software |
| * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 |
| ******************************************************************************/ |
| package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers; |
| |
| import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchMessageType.ERROR; |
| import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchMessageType.RESULT; |
| import static net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchMessageType.RESULT_COMPLETION; |
| |
| import java.lang.Thread.UncaughtExceptionHandler; |
| import java.sql.Date; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.UUID; |
| |
| import net.sf.taverna.t2.invocation.InvocationContext; |
| import net.sf.taverna.t2.monitor.MonitorManager; |
| import net.sf.taverna.t2.monitor.MonitorableProperty; |
| import net.sf.taverna.t2.provenance.item.InvocationStartedProvenanceItem; |
| import net.sf.taverna.t2.provenance.item.IterationProvenanceItem; |
| import net.sf.taverna.t2.provenance.reporter.ProvenanceReporter; |
| import net.sf.taverna.t2.reference.ReferenceService; |
| import net.sf.taverna.t2.reference.T2Reference; |
| import net.sf.taverna.t2.workflowmodel.ControlBoundary; |
| import net.sf.taverna.t2.workflowmodel.OutputPort; |
| import net.sf.taverna.t2.workflowmodel.processor.activity.Activity; |
| import net.sf.taverna.t2.workflowmodel.processor.activity.AsynchronousActivity; |
| import net.sf.taverna.t2.workflowmodel.processor.activity.AsynchronousActivityCallback; |
| import net.sf.taverna.t2.workflowmodel.processor.activity.MonitorableAsynchronousActivity; |
| import net.sf.taverna.t2.workflowmodel.processor.dispatch.AbstractDispatchLayer; |
| import net.sf.taverna.t2.workflowmodel.processor.dispatch.DispatchLayer; |
| import net.sf.taverna.t2.workflowmodel.processor.dispatch.description.DispatchLayerJobReaction; |
| import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchCompletionEvent; |
| import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchErrorEvent; |
| import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchErrorType; |
| import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobEvent; |
| import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchResultEvent; |
| |
| import org.apache.log4j.Logger; |
| |
| import com.fasterxml.jackson.databind.JsonNode; |
| |
| /** |
| * Context free invoker layer, does not pass index arrays of jobs into activity |
| * instances. |
| * <p> |
| * This layer will invoke the first invokable activity in the activity list, so |
| * any sane dispatch stack will have narrowed this down to a single item list by |
| * this point, i.e. by the insertion of a failover layer. |
| * <p> |
| * Currently only handles activities implementing {@link AsynchronousActivity}. |
| * |
| * @author Tom Oinn |
| * @author Stian Soiland-Reyes |
| * |
| */ |
| @DispatchLayerJobReaction(emits = { ERROR, RESULT_COMPLETION, RESULT }, relaysUnmodified = false, stateEffects = {}) |
| @ControlBoundary |
| public class Invoke extends AbstractDispatchLayer<JsonNode> { |
| |
| public static final String URI = "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/Invoke"; |
| |
| private static Logger logger = Logger.getLogger(Invoke.class); |
| |
| private static Long invocationCount = 0L; |
| |
| private static String getNextProcessID() { |
| synchronized (invocationCount) { |
| invocationCount = invocationCount + 1L; |
| } |
| return "invocation" + invocationCount; |
| } |
| |
| public Invoke() { |
| super(); |
| } |
| |
| public void configure(JsonNode config) { |
| // No configuration, do nothing |
| } |
| |
| public JsonNode getConfiguration() { |
| return null; |
| } |
| |
| /** |
| * Receive a job from the layer above and pick the first concrete activity |
| * from the list to invoke. Invoke this activity, creating a callback which |
| * will wrap up the result messages in the appropriate collection depth |
| * before sending them on (in general activities are not aware of their |
| * invocation context and should not be responsible for providing correct |
| * index arrays for results) |
| * <p> |
| * This layer will invoke the first invokable activity in the activity list, |
| * so any sane dispatch stack will have narrowed this down to a single item |
| * list by this point, i.e. by the insertion of a failover layer. |
| */ |
| @Override |
| public void receiveJob(final DispatchJobEvent jobEvent) { |
| for (Activity<?> activity : jobEvent.getActivities()) { |
| |
| if (activity instanceof AsynchronousActivity) { |
| // Register with the monitor |
| final String invocationProcessIdentifier = jobEvent |
| .pushOwningProcess(getNextProcessID()) |
| .getOwningProcess(); |
| MonitorManager.getInstance().registerNode(activity, |
| invocationProcessIdentifier, |
| new HashSet<MonitorableProperty<?>>()); |
| MonitorManager.getInstance().registerNode(jobEvent, |
| invocationProcessIdentifier, |
| new HashSet<MonitorableProperty<?>>()); |
| |
| // The activity is an AsynchronousActivity so we invoke it with |
| // an AsynchronousActivityCallback object containing appropriate |
| // callback methods to push results, completions and failures |
| // back to the invocation layer. |
| final AsynchronousActivity<?> asyncActivity = (AsynchronousActivity<?>) activity; |
| |
| // Get the registered DataManager for this process. In most |
| // cases this will just be a single DataManager for the entire |
| // workflow system but it never hurts to generalize |
| |
| InvocationContext context = jobEvent.getContext(); |
| final ReferenceService refService = context |
| .getReferenceService(); |
| |
| InvocationStartedProvenanceItem invocationItem = null; |
| ProvenanceReporter provenanceReporter = context.getProvenanceReporter(); |
| if (provenanceReporter != null) { |
| IntermediateProvenance intermediateProvenance = findIntermediateProvenance(); |
| if (intermediateProvenance != null) { |
| invocationItem = new InvocationStartedProvenanceItem(); |
| IterationProvenanceItem parentItem = intermediateProvenance.getIterationProvItem(jobEvent); |
| invocationItem.setIdentifier(UUID.randomUUID().toString()); |
| invocationItem.setActivity(asyncActivity); |
| invocationItem.setProcessId(jobEvent.getOwningProcess()); |
| invocationItem.setInvocationProcessId(invocationProcessIdentifier); |
| invocationItem.setParentId(parentItem.getIdentifier()); |
| invocationItem.setWorkflowId(parentItem.getWorkflowId()); |
| invocationItem.setInvocationStarted(new Date(System.currentTimeMillis())); |
| provenanceReporter.addProvenanceItem(invocationItem); |
| } |
| } |
| |
| // Create a Map of EntityIdentifiers named appropriately given |
| // the activity mapping |
| Map<String, T2Reference> inputData = new HashMap<String, T2Reference>(); |
| for (String inputName : jobEvent.getData().keySet()) { |
| String activityInputName = asyncActivity |
| .getInputPortMapping().get(inputName); |
| if (activityInputName != null) { |
| inputData.put(activityInputName, jobEvent.getData() |
| .get(inputName)); |
| } |
| } |
| |
| // Create a callback object to receive events, completions and |
| // failure notifications from the activity |
| AsynchronousActivityCallback callback = new InvokeCallBack( |
| jobEvent, refService, invocationProcessIdentifier, |
| asyncActivity); |
| |
| if (asyncActivity instanceof MonitorableAsynchronousActivity<?>) { |
| // Monitorable activity so get the monitorable properties |
| // and push them into the state tree after launching the job |
| MonitorableAsynchronousActivity<?> maa = (MonitorableAsynchronousActivity<?>) asyncActivity; |
| Set<MonitorableProperty<?>> props = maa |
| .executeAsynchWithMonitoring(inputData, callback); |
| MonitorManager.getInstance().addPropertiesToNode( |
| invocationProcessIdentifier.split(":"), props); |
| } else { |
| // Run the job, passing in the callback we've just created |
| // along with the (possibly renamed) input data map |
| asyncActivity.executeAsynch(inputData, callback); |
| } |
| return; |
| } |
| } |
| } |
| |
| protected IntermediateProvenance findIntermediateProvenance() { |
| List<DispatchLayer<?>> layers = getProcessor().getDispatchStack().getLayers(); |
| for (DispatchLayer<?> layer : layers) { |
| if (layer instanceof IntermediateProvenance) { |
| return (IntermediateProvenance) layer; |
| } |
| } |
| return null; |
| } |
| |
| protected class InvokeCallBack implements AsynchronousActivityCallback { |
| protected final AsynchronousActivity<?> asyncActivity; |
| protected final String invocationProcessIdentifier; |
| protected final DispatchJobEvent jobEvent; |
| protected final ReferenceService refService; |
| protected boolean sentJob = false; |
| |
| protected InvokeCallBack(DispatchJobEvent jobEvent, |
| ReferenceService refService, |
| String invocationProcessIdentifier, |
| AsynchronousActivity<?> asyncActivity) { |
| this.jobEvent = jobEvent; |
| this.refService = refService; |
| this.invocationProcessIdentifier = invocationProcessIdentifier; |
| this.asyncActivity = asyncActivity; |
| } |
| |
| public void fail(String message) { |
| fail(message, null); |
| } |
| |
| public void fail(String message, Throwable t) { |
| fail(message, t, DispatchErrorType.INVOCATION); |
| } |
| |
| public void fail(String message, Throwable t, |
| DispatchErrorType errorType) { |
| logger.warn("Failed (" + errorType + ") invoking " + asyncActivity |
| + " for job " + jobEvent + ": " + message, t); |
| MonitorManager.getInstance().deregisterNode( |
| invocationProcessIdentifier); |
| getAbove().receiveError( |
| new DispatchErrorEvent(jobEvent.getOwningProcess(), |
| jobEvent.getIndex(), jobEvent.getContext(), |
| message, t, errorType, asyncActivity)); |
| } |
| |
| public InvocationContext getContext() { |
| return jobEvent.getContext(); |
| } |
| |
| public String getParentProcessIdentifier() { |
| return invocationProcessIdentifier; |
| } |
| |
| public void receiveCompletion(int[] completionIndex) { |
| if (completionIndex.length == 0) { |
| // Final result, clean up monitor state |
| MonitorManager.getInstance().deregisterNode( |
| invocationProcessIdentifier); |
| } |
| if (sentJob) { |
| int[] newIndex; |
| if (completionIndex.length == 0) { |
| newIndex = jobEvent.getIndex(); |
| } else { |
| newIndex = new int[jobEvent.getIndex().length |
| + completionIndex.length]; |
| int i = 0; |
| for (int indexValue : jobEvent.getIndex()) { |
| newIndex[i++] = indexValue; |
| } |
| for (int indexValue : completionIndex) { |
| newIndex[i++] = indexValue; |
| } |
| } |
| DispatchCompletionEvent c = new DispatchCompletionEvent( |
| jobEvent.getOwningProcess(), newIndex, jobEvent |
| .getContext()); |
| getAbove().receiveResultCompletion(c); |
| } else { |
| // We haven't sent any 'real' data prior to |
| // completing a stream. This in effect means we're |
| // sending an empty top level collection so we need |
| // to register empty collections for each output |
| // port with appropriate depth (by definition if |
| // we're streaming all outputs are collection types |
| // of some kind) |
| Map<String, T2Reference> emptyListMap = new HashMap<String, T2Reference>(); |
| for (OutputPort op : asyncActivity.getOutputPorts()) { |
| String portName = op.getName(); |
| int portDepth = op.getDepth(); |
| emptyListMap.put(portName, refService.getListService() |
| .registerEmptyList(portDepth, jobEvent.getContext()).getId()); |
| } |
| receiveResult(emptyListMap, new int[0]); |
| } |
| |
| } |
| |
| public void receiveResult(Map<String, T2Reference> data, int[] index) { |
| |
| // Construct a new result map using the activity mapping |
| // (activity output name to processor output name) |
| Map<String, T2Reference> resultMap = new HashMap<String, T2Reference>(); |
| for (String outputName : data.keySet()) { |
| String processorOutputName = asyncActivity |
| .getOutputPortMapping().get(outputName); |
| if (processorOutputName != null) { |
| resultMap.put(processorOutputName, data.get(outputName)); |
| } |
| } |
| // Construct a new index array if the specified index is |
| // non zero length, otherwise just use the original |
| // job's index array (means we're not streaming) |
| int[] newIndex; |
| boolean streaming = false; |
| if (index.length == 0) { |
| newIndex = jobEvent.getIndex(); |
| } else { |
| streaming = true; |
| newIndex = new int[jobEvent.getIndex().length + index.length]; |
| int i = 0; |
| for (int indexValue : jobEvent.getIndex()) { |
| newIndex[i++] = indexValue; |
| } |
| for (int indexValue : index) { |
| newIndex[i++] = indexValue; |
| } |
| } |
| DispatchResultEvent resultEvent = new DispatchResultEvent(jobEvent |
| .getOwningProcess(), newIndex, jobEvent.getContext(), |
| resultMap, streaming); |
| if (!streaming) { |
| MonitorManager.getInstance().registerNode(resultEvent, |
| invocationProcessIdentifier, |
| new HashSet<MonitorableProperty<?>>()); |
| // Final result, clean up monitor state |
| MonitorManager.getInstance().deregisterNode( |
| invocationProcessIdentifier); |
| } |
| // Push the modified data to the layer above in the |
| // dispatch stack |
| getAbove().receiveResult(resultEvent); |
| |
| sentJob = true; |
| } |
| |
| public void requestRun(Runnable runMe) { |
| String newThreadName = jobEvent.toString(); |
| Thread thread = new Thread(runMe, newThreadName); |
| thread.setContextClassLoader(asyncActivity.getClass().getClassLoader() ); |
| thread.setUncaughtExceptionHandler(new UncaughtExceptionHandler(){ |
| public void uncaughtException(Thread t, Throwable e) { |
| fail("Uncaught exception while invoking " + asyncActivity, e); |
| }}); |
| thread.start(); |
| } |
| } |
| |
| } |