Clean up dependency management, squelch many (pointless) warnings.
diff --git a/pom.xml b/pom.xml
index a8d753d..9f8d8d6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -56,14 +56,9 @@
<scope>test</scope>
</dependency>
<dependency>
- <groupId>org.apache.log4j</groupId>
- <artifactId>com.springsource.org.apache.log4j</artifactId>
- <version>${log4j.version}</version>
- </dependency>
- <dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
- <version>${commons.io.version}</version>
+ <version>${commons.io.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
@@ -73,7 +68,7 @@
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>com.springsource.org.apache.commons.lang</artifactId>
- <version>${commons.lang.version}</version>
+ <version>${commons.lang.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
@@ -85,6 +80,26 @@
<artifactId>com.springsource.org.jdom</artifactId>
<version>${jdom.version}</version>
</dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>${jackson-databind.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>${log4j.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.thoughtworks.xstream</groupId>
+ <artifactId>com.springsource.com.thoughtworks.xstream</artifactId>
+ <version>${xstream.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>org.springframework.transaction</artifactId>
+ <version>${spring.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>
</project>
diff --git a/reference-api/pom.xml b/reference-api/pom.xml
index 6841e47..8220dd6 100644
--- a/reference-api/pom.xml
+++ b/reference-api/pom.xml
@@ -35,7 +35,6 @@
<dependency>
<groupId>org.springframework</groupId>
<artifactId>org.springframework.transaction</artifactId>
- <version>${spring.version}</version>
</dependency>
</dependencies>
</project>
diff --git a/reference-impl/pom.xml b/reference-impl/pom.xml
index ae5d3ff..01cfdc1 100644
--- a/reference-impl/pom.xml
+++ b/reference-impl/pom.xml
@@ -66,12 +66,12 @@
<dependency>
<groupId>org.springframework</groupId>
<artifactId>org.springframework.transaction</artifactId>
- <version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>com.springsource.org.aspectj.weaver</artifactId>
<version>${aspectj.version}</version>
+ <scope>runtime</scope>
</dependency>
<dependency>
<groupId>net.sf.taverna.t2.core</groupId>
@@ -109,14 +109,12 @@
<version>${javax.transaction.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.log4j</groupId>
- <artifactId>com.springsource.org.apache.log4j</artifactId>
- <version>${log4j.version}</version>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
- <version>${junit.version}</version>
<scope>test</scope>
</dependency>
<dependency>
diff --git a/reference-testhelpers/pom.xml b/reference-testhelpers/pom.xml
index c92146b..9d36420 100644
--- a/reference-testhelpers/pom.xml
+++ b/reference-testhelpers/pom.xml
@@ -24,6 +24,15 @@
environment.
</description>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>maven-bundle-plugin</artifactId>
+ <extensions>true</extensions>
+ </plugin>
+ </plugins>
+ </build>
<dependencies>
<!-- Only depend on the t2reference API package here -->
<dependency>
@@ -32,8 +41,8 @@
<version>${project.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.log4j</groupId>
- <artifactId>com.springsource.org.apache.log4j</artifactId>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
</dependency>
</dependencies>
</project>
diff --git a/workflowmodel-api/pom.xml b/workflowmodel-api/pom.xml
index 8b22fa0..3e06a51 100644
--- a/workflowmodel-api/pom.xml
+++ b/workflowmodel-api/pom.xml
@@ -32,7 +32,6 @@
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
- <version>${jackson-databind.version}</version>
</dependency>
<dependency>
<groupId>org.jdom</groupId>
@@ -41,7 +40,6 @@
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
- <version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
diff --git a/workflowmodel-core-extensions/.gitignore b/workflowmodel-core-extensions/.gitignore
new file mode 100644
index 0000000..b83d222
--- /dev/null
+++ b/workflowmodel-core-extensions/.gitignore
@@ -0,0 +1 @@
+/target/
diff --git a/workflowmodel-core-extensions/pom.xml b/workflowmodel-core-extensions/pom.xml
index 7c6ee59..4c83d02 100644
--- a/workflowmodel-core-extensions/pom.xml
+++ b/workflowmodel-core-extensions/pom.xml
@@ -11,22 +11,20 @@
<packaging>bundle</packaging>
<name>Implementation of core extension points to the workflow model</name>
<dependencies>
- <dependency>
- <groupId>net.sf.taverna.t2.core</groupId>
- <artifactId>workflowmodel-api</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>com.fasterxml.jackson.core</groupId>
- <artifactId>jackson-databind</artifactId>
- <version>${jackson-databind.version}</version>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
- <version>${junit.version}</version>
- <scope>test</scope>
- </dependency>
+ <dependency>
+ <groupId>net.sf.taverna.t2.core</groupId>
+ <artifactId>workflowmodel-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<repositories>
<repository>
diff --git a/workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/CoreDispatchLayerFactory.java b/workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/CoreDispatchLayerFactory.java
index cffdee6..0b11627 100644
--- a/workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/CoreDispatchLayerFactory.java
+++ b/workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/CoreDispatchLayerFactory.java
@@ -47,7 +47,6 @@
* @author David Withers
*/
public class CoreDispatchLayerFactory implements DispatchLayerFactory {
-
private static final URI parallelizeLayer = URI.create(Parallelize.URI);
private static final URI errorBounceLayer = URI.create(ErrorBounce.URI);
private static final URI failoverLayer = URI.create(Failover.URI);
@@ -72,25 +71,23 @@
@Override
public DispatchLayer<?> createDispatchLayer(URI uri) {
- DispatchLayer<?> dispatchLayer = null;
- if (parallelizeLayer.equals(uri)) {
- dispatchLayer = new Parallelize();
- } else if (errorBounceLayer.equals(uri)) {
- dispatchLayer = new ErrorBounce();
- } else if (failoverLayer.equals(uri)) {
- dispatchLayer = new Failover();
- } else if (retryLayer.equals(uri)) {
- dispatchLayer = new Retry();
- } else if (invokeLayer.equals(uri)) {
- dispatchLayer = new Invoke();
- } else if (loopLayer.equals(uri)) {
- dispatchLayer = new Loop();
- } else if (intermediateProvenanceLayer.equals(uri)) {
- dispatchLayer = new IntermediateProvenance();
- } else if (stopLayer.equals(uri)) {
- dispatchLayer = new Stop();
- }
- return dispatchLayer;
+ if (parallelizeLayer.equals(uri))
+ return new Parallelize();
+ else if (errorBounceLayer.equals(uri))
+ return new ErrorBounce();
+ else if (failoverLayer.equals(uri))
+ return new Failover();
+ else if (retryLayer.equals(uri))
+ return new Retry();
+ else if (invokeLayer.equals(uri))
+ return new Invoke();
+ else if (loopLayer.equals(uri))
+ return new Loop();
+ else if (intermediateProvenanceLayer.equals(uri))
+ return new IntermediateProvenance();
+ else if (stopLayer.equals(uri))
+ return new Stop();
+ return null;
}
@Override
@@ -103,5 +100,4 @@
public Set<URI> getDispatchLayerTypes() {
return dispatchLayerURIs;
}
-
}
diff --git a/workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/ErrorBounce.java b/workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/ErrorBounce.java
index 86ae0bb..dfde240 100644
--- a/workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/ErrorBounce.java
+++ b/workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/ErrorBounce.java
@@ -38,6 +38,7 @@
import net.sf.taverna.t2.invocation.Event;
import net.sf.taverna.t2.monitor.MonitorableProperty;
import net.sf.taverna.t2.monitor.NoSuchPropertyException;
+import net.sf.taverna.t2.reference.ErrorDocument;
import net.sf.taverna.t2.reference.ReferenceService;
import net.sf.taverna.t2.reference.T2Reference;
import net.sf.taverna.t2.workflowmodel.OutputPort;
@@ -64,9 +65,9 @@
* layer should be placed immediately below the parallelize layer in most
* default cases (this will guarantee the processor never sees a failure message
* though, which may or may not be desirable)
- *
+ *
* @author Tom Oinn
- *
+ *
*/
@DispatchLayerErrorReaction(emits = { RESULT }, relaysUnmodified = false, stateEffects = {
CREATE_PROCESS_STATE, UPDATE_PROCESS_STATE })
@@ -77,26 +78,23 @@
@SupportsStreamedResult
public class ErrorBounce extends AbstractDispatchLayer<JsonNode> implements
PropertyContributingDispatchLayer<JsonNode> {
-
public static final String URI = "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/ErrorBounce";
/**
* Track the number of reflected and translated errors handled by this error
* bounce instance
*/
- private Map<String, ErrorBounceState> state = new ConcurrentHashMap<String, ErrorBounceState>();
+ private Map<String, ErrorBounceState> state = new ConcurrentHashMap<>();
private int totalTranslatedErrors = 0;
private int totalReflectedErrors = 0;
private synchronized ErrorBounceState getState(String owningProcess) {
- if (state.containsKey(owningProcess)) {
+ if (state.containsKey(owningProcess))
return state.get(owningProcess);
- } else {
- ErrorBounceState ebs = new ErrorBounceState();
- state.put(owningProcess, ebs);
- return ebs;
- }
+ ErrorBounceState ebs = new ErrorBounceState();
+ state.put(owningProcess, ebs);
+ return ebs;
}
/**
@@ -106,21 +104,17 @@
*/
@Override
public void receiveJob(DispatchJobEvent jobEvent) {
- Set<T2Reference> errorReferences = new HashSet<T2Reference>();
- for (T2Reference ei : jobEvent.getData().values()) {
- if (ei.containsErrors()) {
+ Set<T2Reference> errorReferences = new HashSet<>();
+ for (T2Reference ei : jobEvent.getData().values())
+ if (ei.containsErrors())
errorReferences.add(ei);
- }
- }
- if (errorReferences.isEmpty()) {
+ if (errorReferences.isEmpty())
// relay the message down...
getBelow().receiveJob(jobEvent);
- } else {
- getState(jobEvent.getOwningProcess())
- .incrementErrorsReflected();
+ else {
+ getState(jobEvent.getOwningProcess()).incrementErrorsReflected();
sendErrorOutput(jobEvent, null, errorReferences);
}
-
}
/**
@@ -137,48 +131,55 @@
/**
* Construct and send a new result message with error documents in place of
* all outputs at the appropriate depth
- *
+ *
* @param event
* @param cause
* @param errorReferences
*/
- private void sendErrorOutput(Event<?> event, Throwable cause, Set<T2Reference> errorReferences) {
+ private void sendErrorOutput(Event<?> event, Throwable cause,
+ Set<T2Reference> errorReferences) {
ReferenceService rs = event.getContext().getReferenceService();
Processor p = dispatchStack.getProcessor();
- Map<String, T2Reference> outputDataMap = new HashMap<String, T2Reference>();
+ Map<String, T2Reference> outputDataMap = new HashMap<>();
String[] owningProcessArray = event.getOwningProcess().split(":");
String processor = owningProcessArray[owningProcessArray.length - 1];
for (OutputPort op : p.getOutputPorts()) {
- String message = "Processor '" + processor + "' - Port '" + op.getName() + "'";
- if (event instanceof DispatchErrorEvent) {
+ String message = "Processor '" + processor + "' - Port '"
+ + op.getName() + "'";
+ if (event instanceof DispatchErrorEvent)
message += ": " + ((DispatchErrorEvent) event).getMessage();
- }
- if (cause != null) {
- outputDataMap.put(op.getName(), rs.getErrorDocumentService()
- .registerError(message, cause, op.getDepth(), event.getContext()).getId());
- } else {
- outputDataMap.put(op.getName(), rs.getErrorDocumentService()
- .registerError(message, errorReferences, op.getDepth(), event.getContext()).getId());
- }
+ ErrorDocument ed;
+ if (cause != null)
+ ed = rs.getErrorDocumentService().registerError(message, cause,
+ op.getDepth(), event.getContext());
+ else
+ ed = rs.getErrorDocumentService().registerError(message,
+ errorReferences, op.getDepth(), event.getContext());
+ outputDataMap.put(op.getName(), ed.getId());
}
- DispatchResultEvent dre = new DispatchResultEvent(event.getOwningProcess(),
- event.getIndex(), event.getContext(), outputDataMap, false);
+ DispatchResultEvent dre = new DispatchResultEvent(
+ event.getOwningProcess(), event.getIndex(), event.getContext(),
+ outputDataMap, false);
getAbove().receiveResult(dre);
}
+ @Override
public void configure(JsonNode config) {
// Do nothing - no configuration required
}
+ @Override
public JsonNode getConfiguration() {
// Layer has no configuration associated
return null;
}
+ @Override
public void finishedWith(final String owningProcess) {
- // Delay the removal of the state to give the monitor
- // a chance to poll
+ /*
+ * Delay the removal of the state to give the monitor a chance to poll
+ */
cleanupTimer.schedule(new TimerTask() {
@Override
public void run() {
@@ -194,92 +195,97 @@
* downstream in the stack that have been re-written as complete results
* containing error documents.
*/
+ @Override
public void injectPropertiesFor(final String owningProcess) {
-
MonitorableProperty<Integer> errorsReflectedProperty = new MonitorableProperty<Integer>() {
+ @Override
public Date getLastModified() {
return new Date();
}
+ @Override
public String[] getName() {
return new String[] { "dispatch", "errorbounce", "reflected" };
}
+ @Override
public Integer getValue() throws NoSuchPropertyException {
ErrorBounceState ebs = state.get(owningProcess);
- if (ebs == null) {
+ if (ebs == null)
return 0;
- } else {
- return ebs.getErrorsReflected();
- }
+ return ebs.getErrorsReflected();
}
};
dispatchStack.receiveMonitorableProperty(errorsReflectedProperty,
owningProcess);
MonitorableProperty<Integer> errorsTranslatedProperty = new MonitorableProperty<Integer>() {
+ @Override
public Date getLastModified() {
return new Date();
}
+ @Override
public String[] getName() {
return new String[] { "dispatch", "errorbounce", "translated" };
}
+ @Override
public Integer getValue() throws NoSuchPropertyException {
ErrorBounceState ebs = state.get(owningProcess);
- if (ebs == null) {
+ if (ebs == null)
return 0;
- } else {
- return ebs.getErrorsTranslated();
- }
+ return ebs.getErrorsTranslated();
}
};
dispatchStack.receiveMonitorableProperty(errorsTranslatedProperty,
owningProcess);
MonitorableProperty<Integer> totalTranslatedTranslatedProperty = new MonitorableProperty<Integer>() {
+ @Override
public Date getLastModified() {
return new Date();
}
+ @Override
public String[] getName() {
- return new String[] { "dispatch", "errorbounce", "totalTranslated" };
+ return new String[] { "dispatch", "errorbounce",
+ "totalTranslated" };
}
+ @Override
public Integer getValue() throws NoSuchPropertyException {
return totalTranslatedErrors;
}
};
- dispatchStack.receiveMonitorableProperty(totalTranslatedTranslatedProperty,
- owningProcess);
+ dispatchStack.receiveMonitorableProperty(
+ totalTranslatedTranslatedProperty, owningProcess);
MonitorableProperty<Integer> totalReflectedTranslatedProperty = new MonitorableProperty<Integer>() {
+ @Override
public Date getLastModified() {
return new Date();
}
+ @Override
public String[] getName() {
- return new String[] { "dispatch", "errorbounce", "totalReflected" };
+ return new String[] { "dispatch", "errorbounce",
+ "totalReflected" };
}
+ @Override
public Integer getValue() throws NoSuchPropertyException {
return totalReflectedErrors;
}
};
- dispatchStack.receiveMonitorableProperty(totalReflectedTranslatedProperty,
- owningProcess);
-
-
-
+ dispatchStack.receiveMonitorableProperty(
+ totalReflectedTranslatedProperty, owningProcess);
}
-
public class ErrorBounceState {
private int errorsReflected = 0;
private int errorsTranslated = 0;
-
/**
* Number of times the bounce layer has converted an incoming job event
* where the input data contained error tokens into a result event
@@ -294,26 +300,25 @@
* event into a result containing error tokens
*/
int getErrorsTranslated() {
- return this.errorsTranslated;
+ return errorsTranslated;
}
void incrementErrorsReflected() {
- synchronized(this) {
- errorsReflected++;
- }
- synchronized(ErrorBounce.this) {
+ synchronized (this) {
+ errorsReflected++;
+ }
+ synchronized (ErrorBounce.this) {
totalReflectedErrors++;
}
}
void incrementErrorsTranslated() {
- synchronized(this) {
+ synchronized (this) {
errorsTranslated++;
}
- synchronized(ErrorBounce.this) {
+ synchronized (ErrorBounce.this) {
totalTranslatedErrors++;
}
}
}
-
}
diff --git a/workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Failover.java b/workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Failover.java
index a93d413..1c5ef03 100644
--- a/workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Failover.java
+++ b/workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Failover.java
@@ -44,18 +44,15 @@
* in the original list and so on. If a failure is received and there are no
* further activities to use the job fails and the failure is sent back up to
* the layer above.
- *
+ *
* @author Tom Oinn
* @author Stian Soiland-Reyes
- *
- *
*/
@DispatchLayerErrorReaction(emits = { JOB }, relaysUnmodified = true, stateEffects = {
UPDATE_LOCAL_STATE, REMOVE_LOCAL_STATE })
@DispatchLayerJobReaction(emits = {}, relaysUnmodified = true, stateEffects = { CREATE_LOCAL_STATE })
@DispatchLayerResultReaction(emits = {}, relaysUnmodified = true, stateEffects = { REMOVE_LOCAL_STATE })
public class Failover extends AbstractErrorHandlerLayer<JsonNode> {
-
public static final String URI = "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/Failover";
@Override
@@ -71,7 +68,7 @@
@Override
public void receiveJob(DispatchJobEvent jobEvent) {
addJobToStateList(jobEvent);
- List<Activity<?>> newActivityList = new ArrayList<Activity<?>>();
+ List<Activity<?>> newActivityList = new ArrayList<>();
newActivityList.add(jobEvent.getActivities().get(0));
getBelow().receiveJob(
new DispatchJobEvent(jobEvent.getOwningProcess(), jobEvent
@@ -80,7 +77,6 @@
}
class FailoverState extends JobState {
-
int currentActivityIndex = 0;
public FailoverState(DispatchJobEvent jobEvent) {
@@ -90,27 +86,26 @@
@Override
public boolean handleError() {
currentActivityIndex++;
- if (currentActivityIndex == jobEvent.getActivities().size()) {
+ if (currentActivityIndex == jobEvent.getActivities().size())
return false;
- } else {
- List<Activity<?>> newActivityList = new ArrayList<Activity<?>>();
- newActivityList.add(jobEvent.getActivities().get(
- currentActivityIndex));
- getBelow().receiveJob(
- new DispatchJobEvent(jobEvent.getOwningProcess(),
- jobEvent.getIndex(), jobEvent.getContext(),
- jobEvent.getData(), newActivityList));
- return true;
- }
+ List<Activity<?>> newActivityList = new ArrayList<>();
+ newActivityList.add(jobEvent.getActivities().get(
+ currentActivityIndex));
+ getBelow().receiveJob(
+ new DispatchJobEvent(jobEvent.getOwningProcess(), jobEvent
+ .getIndex(), jobEvent.getContext(), jobEvent
+ .getData(), newActivityList));
+ return true;
}
}
+ @Override
public void configure(JsonNode config) {
// Do nothing - there is no configuration to do
}
+ @Override
public JsonNode getConfiguration() {
return null;
}
-
}
diff --git a/workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/IntermediateProvenance.java b/workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/IntermediateProvenance.java
index 93f055c..718079a 100644
--- a/workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/IntermediateProvenance.java
+++ b/workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/IntermediateProvenance.java
@@ -20,6 +20,8 @@
******************************************************************************/
package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
+import static java.lang.System.currentTimeMillis;
+
import java.beans.XMLDecoder;
import java.beans.XMLEncoder;
import java.io.ByteArrayInputStream;
@@ -66,27 +68,20 @@
*
*/
public class IntermediateProvenance extends AbstractDispatchLayer<String> {
-
public static final String URI = "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/IntermediateProvenance";
-
- Logger logger = Logger.getLogger(IntermediateProvenance.class);
+ private static final Logger logger = Logger.getLogger(IntermediateProvenance.class);
private ProvenanceReporter reporter;
+ private Map<String, Map<String, IterationProvenanceItem>> processToIndexes = new HashMap<>();
+ private Map<ActivityProvenanceItem, List<Object>> activityProvenanceItemMap = new HashMap<>();
+ private Map<InputDataProvenanceItem, List<Object>> inputDataProvenanceItemMap = new HashMap<>();
- Map<String, Map<String, IterationProvenanceItem>> processToIndexes = new HashMap<String, Map<String, IterationProvenanceItem>>();
-
- private Map<ActivityProvenanceItem, List<Object>> activityProvenanceItemMap = new HashMap<ActivityProvenanceItem, List<Object>>();
-
- private Map<InputDataProvenanceItem, List<Object>> inputDataProvenanceItemMap = new HashMap<InputDataProvenanceItem, List<Object>>();
-
- // private List<ActivityProvenanceItem> activityProvenanceItemList = new
- // ArrayList<ActivityProvenanceItem>();
- //
- // private List<InputDataProvenanceItem> inputDataProvenanceItemList = new
- // ArrayList<InputDataProvenanceItem>();
+ // private List<ActivityProvenanceItem> activityProvenanceItemList = new ArrayList<>();
+ // private List<InputDataProvenanceItem> inputDataProvenanceItemList = new ArrayList<>();
private WorkflowProvenanceItem workflowItem;
+ @Override
public void configure(String o) {
}
@@ -100,6 +95,7 @@
processToIndexes.remove(owningProcess);
}
+ @Override
public String getConfiguration() {
return null;
}
@@ -110,7 +106,7 @@
Map<String, IterationProvenanceItem> indexes = processToIndexes
.get(owningProcess);
if (indexes == null) {
- indexes = new HashMap<String, IterationProvenanceItem>();
+ indexes = new HashMap<>();
processToIndexes.put(owningProcess, indexes);
}
return indexes;
@@ -133,11 +129,14 @@
try {
index = removeLastIndex(index);
iterationProvenanceItem = indexes.get(indexStr(index));
- // if we have a 'parent' iteration then create a new
- // iteration for the original index and link it to the
- // activity and the input data
- // FIXME should this be linked to the parent iteration
- // instead?
+ /*
+ * if we have a 'parent' iteration then create a new
+ * iteration for the original index and link it to the
+ * activity and the input data
+ *
+ * FIXME should this be linked to the parent iteration
+ * instead?
+ */
if (iterationProvenanceItem != null) {
// set the index to the one from the event
IterationProvenanceItem iterationProvenanceItem1 = new IterationProvenanceItem();
@@ -160,10 +159,9 @@
//
// if (owningProcess.equalsIgnoreCase(owner)
// && indexString
-// .equalsIgnoreCase(indexString2)) {
+// .equalsIgnoreCase(indexString2))
// iterationProvenanceItem1.setParentId(entrySet
// .getKey().getIdentifier());
-// }
// }
// for (Entry<InputDataProvenanceItem, List<Object>> entrySet : inputDataProvenanceItemMap
// .entrySet()) {
@@ -174,11 +172,9 @@
// String indexString2 = indexStr(index);
// if (owningProcess.equalsIgnoreCase(owner)
// && indexString
-// .equalsIgnoreCase(indexString2)) {
+// .equalsIgnoreCase(indexString2))
// iterationProvenanceItem1
// .setInputDataItem(entrySet.getKey());
-// }
-//
// }
// for (ActivityProvenanceItem item :
@@ -206,12 +202,11 @@
iterationProvenanceItem1);
return iterationProvenanceItem1;
}
- // if we have not found an iteration items and the index
- // is
- // [] then something is wrong
- // remove the last index in the int array before we go
- // back
- // through the while
+ /*
+ * if we have not found an iteration items and the index is
+ * [] then something is wrong remove the last index in the
+ * int array before we go back through the while
+ */
} catch (IllegalStateException e) {
logger
.warn("Cannot find a parent iteration with index [] for owning process: "
@@ -250,11 +245,10 @@
}
private String indexStr(int[] index) {
- String indexStr = "";
- for (int ind : index) {
- indexStr += ":" + ind;
- }
- return indexStr;
+ StringBuilder indexStr = new StringBuilder();
+ for (int ind : index)
+ indexStr.append(":").append(ind);
+ return indexStr.toString();
}
/**
@@ -265,14 +259,8 @@
*/
@SuppressWarnings("unused")
private String[] stripLastIndex(int[] index) {
- String indexStr = "";
- for (int ind : index) {
- indexStr += ":" + ind;
- }
// will be in form :1:2:3
- String[] split = indexStr.split(":");
-
- return split;
+ return indexStr(index).split(":");
}
/**
@@ -282,17 +270,19 @@
* @return
*/
private int[] removeLastIndex(int[] index) {
- if (index.length == 0) {
+ if (index.length == 0)
throw new IllegalStateException(
"There is no parent iteration of index [] for this result");
- }
int[] newIntArray = new int[index.length - 1];
- for (int i = 0; i < index.length - 1; i++) {
+ for (int i = 0; i < index.length - 1; i++)
newIntArray[i] = index[i];
- }
return newIntArray;
}
+ private static String uuid() {
+ return UUID.randomUUID().toString();
+ }
+
/**
* Create an {@link ErrorProvenanceItem} and send across to the
* {@link ProvenanceConnector}
@@ -310,7 +300,7 @@
errorItem.setMessage(errorEvent.getMessage());
errorItem.setProcessId(errorEvent.getOwningProcess());
- errorItem.setIdentifier(UUID.randomUUID().toString());
+ errorItem.setIdentifier(uuid());
errorItem.setParentId(iterationProvItem.getIdentifier());
// iterationProvItem.setErrorItem(errorItem);
// FIXME don't need to add to the processor item earlier
@@ -337,14 +327,14 @@
provenanceItem.setFacadeID(split[0]);
provenanceItem.setDataflowID(split[1]);
provenanceItem.setProcessId(jobEvent.getOwningProcess());
- provenanceItem.setIdentifier(UUID.randomUUID().toString());
+ provenanceItem.setIdentifier(uuid());
provenanceItem.setParentId(workflowItem.getIdentifier());
ProcessorProvenanceItem processorProvItem;
processorProvItem = new ProcessorProvenanceItem();
processorProvItem.setWorkflowId(parentDataflowId);
processorProvItem.setProcessId(jobEvent
.getOwningProcess());
- processorProvItem.setIdentifier(UUID.randomUUID().toString());
+ processorProvItem.setIdentifier(uuid());
processorProvItem.setParentId(provenanceItem.getIdentifier());
provenanceItem.setProcessId(jobEvent.getOwningProcess());
getReporter().addProvenanceItem(provenanceItem);
@@ -354,8 +344,7 @@
iterationProvItem = new IterationProvenanceItem();
iterationProvItem.setWorkflowId(parentDataflowId);
iterationProvItem.setIteration(jobEvent.getIndex());
- iterationProvItem.setIdentifier(UUID.randomUUID().toString());
-
+ iterationProvItem.setIdentifier(uuid());
ReferenceService referenceService = jobEvent.getContext()
.getReferenceService();
@@ -363,11 +352,11 @@
InputDataProvenanceItem inputDataItem = new InputDataProvenanceItem();
inputDataItem.setDataMap(jobEvent.getData());
inputDataItem.setReferenceService(referenceService);
- inputDataItem.setIdentifier(UUID.randomUUID().toString());
+ inputDataItem.setIdentifier(uuid());
inputDataItem.setParentId(iterationProvItem.getIdentifier());
inputDataItem.setProcessId(jobEvent.getOwningProcess());
- List<Object> inputIndexOwnerList = new ArrayList<Object>();
+ List<Object> inputIndexOwnerList = new ArrayList<>();
inputIndexOwnerList.add(jobEvent.getIndex());
inputIndexOwnerList.add(jobEvent.getOwningProcess());
inputDataProvenanceItemMap.put(inputDataItem, inputIndexOwnerList);
@@ -377,17 +366,17 @@
iterationProvItem.setIteration(jobEvent.getIndex());
iterationProvItem.setProcessId(jobEvent.getOwningProcess());
- for (Activity<?> activity : jobEvent.getActivities()) {
+ for (Activity<?> activity : jobEvent.getActivities())
if (activity instanceof AsynchronousActivity) {
ActivityProvenanceItem activityProvItem = new ActivityProvenanceItem();
activityProvItem.setWorkflowId(parentDataflowId);
- activityProvItem.setIdentifier(UUID.randomUUID().toString());
+ activityProvItem.setIdentifier(uuid());
iterationProvItem.setParentId(activityProvItem.getIdentifier());
// getConnector().addProvenanceItem(iterationProvItem);
activityProvItem.setParentId(processorProvItem.getIdentifier());
// processorProvItem.setActivityProvenanceItem(activityProvItem);
activityProvItem.setProcessId(jobEvent.getOwningProcess());
- List<Object> activityIndexOwnerList = new ArrayList<Object>();
+ List<Object> activityIndexOwnerList = new ArrayList<>();
activityIndexOwnerList.add(jobEvent.getOwningProcess());
activityIndexOwnerList.add(jobEvent.getIndex());
activityProvenanceItemMap.put(activityProvItem,
@@ -397,10 +386,9 @@
getReporter().addProvenanceItem(activityProvItem);
break;
}
- }
getIndexesByProcess(jobEvent.getOwningProcess()).put(
indexStr(jobEvent.getIndex()), iterationProvItem);
- iterationProvItem.setEnactmentStarted(new Timestamp(System.currentTimeMillis()));
+ iterationProvItem.setEnactmentStarted(new Timestamp(currentTimeMillis()));
getReporter().addProvenanceItem(iterationProvItem);
} catch (RuntimeException ex) {
logger.error("Could not store provenance for " + jobEvent, ex);
@@ -411,7 +399,6 @@
@Override
public void receiveJobQueue(DispatchJobQueueEvent jobQueueEvent) {
-
super.receiveJobQueue(jobQueueEvent);
}
@@ -425,16 +412,15 @@
try {
// FIXME use the connector from the result event context
IterationProvenanceItem iterationProvItem = getIterationProvItem(resultEvent);
- iterationProvItem.setEnactmentEnded(new Timestamp(System.currentTimeMillis()));
+ iterationProvItem.setEnactmentEnded(new Timestamp(currentTimeMillis()));
ReferenceService referenceService = resultEvent.getContext()
.getReferenceService();
-
-
+
OutputDataProvenanceItem outputDataItem = new OutputDataProvenanceItem();
outputDataItem.setDataMap(resultEvent.getData());
outputDataItem.setReferenceService(referenceService);
- outputDataItem.setIdentifier(UUID.randomUUID().toString());
+ outputDataItem.setIdentifier(uuid());
outputDataItem.setProcessId(resultEvent.getOwningProcess());
outputDataItem.setParentId(iterationProvItem.getIdentifier());
iterationProvItem.setOutputDataItem(outputDataItem);
@@ -446,7 +432,6 @@
// add xencoding of data value here??
// Map<String, T2Reference> inputDataMap = iterationProvItem.getInputDataItem().getDataMap();
// for(Map.Entry<String, T2Reference> entry:inputDataMap.entrySet()) {
- //
// // create a simpler bean that we can serialize?
//
// T2Reference ref = entry.getValue();
@@ -472,11 +457,8 @@
super.receiveResult(resultEvent);
}
-
-
@Override
public void receiveResultCompletion(DispatchCompletionEvent completionEvent) {
- // TODO Auto-generated method stub
super.receiveResultCompletion(completionEvent);
}
@@ -505,21 +487,22 @@
this.workflowItem = workflowItem;
}
-
- public static String SerializeParam(Object ParamValue) {
- ByteArrayOutputStream BStream = new ByteArrayOutputStream();
- XMLEncoder encoder = new XMLEncoder(BStream);
- encoder.writeObject(ParamValue);
- encoder.close();
- return BStream.toString();
- }
-
- public static Object DeserializeParam (String SerializedParam) {
- InputStream IStream = new ByteArrayInputStream(SerializedParam.getBytes());
- XMLDecoder decoder = new XMLDecoder(IStream);
- Object output = decoder.readObject();
- decoder.close();
- return output;
- }
-
+ // TODO is this unused?
+ public static String SerializeParam(Object ParamValue) {
+ ByteArrayOutputStream BStream = new ByteArrayOutputStream();
+ XMLEncoder encoder = new XMLEncoder(BStream);
+ encoder.writeObject(ParamValue);
+ encoder.close();
+ return BStream.toString();
+ }
+
+ // TODO is this unused?
+ public static Object DeserializeParam(String SerializedParam) {
+ InputStream IStream = new ByteArrayInputStream(
+ SerializedParam.getBytes());
+ XMLDecoder decoder = new XMLDecoder(IStream);
+ Object output = decoder.readObject();
+ decoder.close();
+ return output;
+ }
}
diff --git a/workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Invoke.java b/workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Invoke.java
index 5397112..f8403df 100644
--- a/workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Invoke.java
+++ b/workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Invoke.java
@@ -28,7 +28,6 @@
import java.sql.Date;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
@@ -77,28 +76,31 @@
@DispatchLayerJobReaction(emits = { ERROR, RESULT_COMPLETION, RESULT }, relaysUnmodified = false, stateEffects = {})
@ControlBoundary
public class Invoke extends AbstractDispatchLayer<JsonNode> {
-
public static final String URI = "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/Invoke";
-
private static Logger logger = Logger.getLogger(Invoke.class);
-
private static Long invocationCount = 0L;
+ private MonitorManager monMan;
+
private static String getNextProcessID() {
+ long count;
synchronized (invocationCount) {
- invocationCount = invocationCount + 1L;
+ count = ++invocationCount;
}
- return "invocation" + invocationCount;
+ return "invocation" + count;
}
public Invoke() {
super();
+ monMan = MonitorManager.getInstance();
}
+ @Override
public void configure(JsonNode config) {
// No configuration, do nothing
}
+ @Override
public JsonNode getConfiguration() {
return null;
}
@@ -117,100 +119,103 @@
*/
@Override
public void receiveJob(final DispatchJobEvent jobEvent) {
- for (Activity<?> activity : jobEvent.getActivities()) {
-
+ for (Activity<?> activity : jobEvent.getActivities())
if (activity instanceof AsynchronousActivity) {
- // Register with the monitor
- final String invocationProcessIdentifier = jobEvent
- .pushOwningProcess(getNextProcessID())
- .getOwningProcess();
- MonitorManager.getInstance().registerNode(activity,
- invocationProcessIdentifier,
- new HashSet<MonitorableProperty<?>>());
- MonitorManager.getInstance().registerNode(jobEvent,
- invocationProcessIdentifier,
- new HashSet<MonitorableProperty<?>>());
-
- // The activity is an AsynchronousActivity so we invoke it with
- // an AsynchronousActivityCallback object containing appropriate
- // callback methods to push results, completions and failures
- // back to the invocation layer.
- final AsynchronousActivity<?> asyncActivity = (AsynchronousActivity<?>) activity;
-
- // Get the registered DataManager for this process. In most
- // cases this will just be a single DataManager for the entire
- // workflow system but it never hurts to generalize
-
- InvocationContext context = jobEvent.getContext();
- final ReferenceService refService = context
- .getReferenceService();
-
- InvocationStartedProvenanceItem invocationItem = null;
- ProvenanceReporter provenanceReporter = context.getProvenanceReporter();
- if (provenanceReporter != null) {
- IntermediateProvenance intermediateProvenance = findIntermediateProvenance();
- if (intermediateProvenance != null) {
- invocationItem = new InvocationStartedProvenanceItem();
- IterationProvenanceItem parentItem = intermediateProvenance.getIterationProvItem(jobEvent);
- invocationItem.setIdentifier(UUID.randomUUID().toString());
- invocationItem.setActivity(asyncActivity);
- invocationItem.setProcessId(jobEvent.getOwningProcess());
- invocationItem.setInvocationProcessId(invocationProcessIdentifier);
- invocationItem.setParentId(parentItem.getIdentifier());
- invocationItem.setWorkflowId(parentItem.getWorkflowId());
- invocationItem.setInvocationStarted(new Date(System.currentTimeMillis()));
- provenanceReporter.addProvenanceItem(invocationItem);
- }
- }
-
- // Create a Map of EntityIdentifiers named appropriately given
- // the activity mapping
- Map<String, T2Reference> inputData = new HashMap<String, T2Reference>();
- for (String inputName : jobEvent.getData().keySet()) {
- String activityInputName = asyncActivity
- .getInputPortMapping().get(inputName);
- if (activityInputName != null) {
- inputData.put(activityInputName, jobEvent.getData()
- .get(inputName));
- }
- }
-
- // Create a callback object to receive events, completions and
- // failure notifications from the activity
- AsynchronousActivityCallback callback = new InvokeCallBack(
- jobEvent, refService, invocationProcessIdentifier,
- asyncActivity);
-
- if (asyncActivity instanceof MonitorableAsynchronousActivity<?>) {
- // Monitorable activity so get the monitorable properties
- // and push them into the state tree after launching the job
- MonitorableAsynchronousActivity<?> maa = (MonitorableAsynchronousActivity<?>) asyncActivity;
- Set<MonitorableProperty<?>> props = maa
- .executeAsynchWithMonitoring(inputData, callback);
- MonitorManager.getInstance().addPropertiesToNode(
- invocationProcessIdentifier.split(":"), props);
- } else {
- // Run the job, passing in the callback we've just created
- // along with the (possibly renamed) input data map
- asyncActivity.executeAsynch(inputData, callback);
- }
- return;
+ invoke(jobEvent, (AsynchronousActivity<?>) activity);
+ break;
}
+ }
+
+ protected void invoke(final DispatchJobEvent jobEvent, final AsynchronousActivity<?> activity) {
+ // Register with the monitor
+ final String invocationProcessIdentifier = jobEvent.pushOwningProcess(
+ getNextProcessID()).getOwningProcess();
+ monMan.registerNode(activity, invocationProcessIdentifier,
+ new HashSet<MonitorableProperty<?>>());
+ monMan.registerNode(jobEvent, invocationProcessIdentifier,
+ new HashSet<MonitorableProperty<?>>());
+
+ /*
+ * The activity is an AsynchronousActivity so we invoke it with an
+ * AsynchronousActivityCallback object containing appropriate callback
+ * methods to push results, completions and failures back to the
+ * invocation layer.
+ *
+ * Get the registered DataManager for this process. In most cases this
+ * will just be a single DataManager for the entire workflow system but
+ * it never hurts to generalize
+ */
+
+ InvocationContext context = jobEvent.getContext();
+ final ReferenceService refService = context.getReferenceService();
+
+ InvocationStartedProvenanceItem invocationItem = null;
+ ProvenanceReporter provenanceReporter = context.getProvenanceReporter();
+ if (provenanceReporter != null) {
+ IntermediateProvenance intermediateProvenance = findIntermediateProvenance();
+ if (intermediateProvenance != null) {
+ invocationItem = new InvocationStartedProvenanceItem();
+ IterationProvenanceItem parentItem = intermediateProvenance.getIterationProvItem(jobEvent);
+ invocationItem.setIdentifier(UUID.randomUUID().toString());
+ invocationItem.setActivity(activity);
+ invocationItem.setProcessId(jobEvent.getOwningProcess());
+ invocationItem.setInvocationProcessId(invocationProcessIdentifier);
+ invocationItem.setParentId(parentItem.getIdentifier());
+ invocationItem.setWorkflowId(parentItem.getWorkflowId());
+ invocationItem.setInvocationStarted(new Date(System.currentTimeMillis()));
+ provenanceReporter.addProvenanceItem(invocationItem);
+ }
+ }
+
+ /*
+ * Create a Map of EntityIdentifiers named appropriately given the
+ * activity mapping
+ */
+ Map<String, T2Reference> inputData = new HashMap<>();
+ for (String inputName : jobEvent.getData().keySet()) {
+ String activityInputName = activity
+ .getInputPortMapping().get(inputName);
+ if (activityInputName != null)
+ inputData.put(activityInputName, jobEvent.getData()
+ .get(inputName));
+ }
+
+ /*
+ * Create a callback object to receive events, completions and failure
+ * notifications from the activity
+ */
+ AsynchronousActivityCallback callback = new InvokeCallBack(
+ jobEvent, refService, invocationProcessIdentifier,
+ activity);
+
+ if (activity instanceof MonitorableAsynchronousActivity<?>) {
+ /*
+ * Monitorable activity so get the monitorable properties and push
+ * them into the state tree after launching the job
+ */
+ MonitorableAsynchronousActivity<?> maa = (MonitorableAsynchronousActivity<?>) activity;
+ Set<MonitorableProperty<?>> props = maa
+ .executeAsynchWithMonitoring(inputData, callback);
+ monMan.addPropertiesToNode(invocationProcessIdentifier.split(":"), props);
+ } else {
+ /*
+ * Run the job, passing in the callback we've just created along
+ * with the (possibly renamed) input data map
+ */
+ activity.executeAsynch(inputData, callback);
}
}
protected IntermediateProvenance findIntermediateProvenance() {
- List<DispatchLayer<?>> layers = getProcessor().getDispatchStack().getLayers();
- for (DispatchLayer<?> layer : layers) {
- if (layer instanceof IntermediateProvenance) {
+ for (DispatchLayer<?> layer : getProcessor().getDispatchStack()
+ .getLayers())
+ if (layer instanceof IntermediateProvenance)
return (IntermediateProvenance) layer;
- }
- }
return null;
}
protected class InvokeCallBack implements AsynchronousActivityCallback {
- protected final AsynchronousActivity<?> asyncActivity;
+ protected final AsynchronousActivity<?> activity;
protected final String invocationProcessIdentifier;
protected final DispatchJobEvent jobEvent;
protected final ReferenceService refService;
@@ -223,72 +228,74 @@
this.jobEvent = jobEvent;
this.refService = refService;
this.invocationProcessIdentifier = invocationProcessIdentifier;
- this.asyncActivity = asyncActivity;
+ this.activity = asyncActivity;
}
+ @Override
public void fail(String message) {
fail(message, null);
}
+ @Override
public void fail(String message, Throwable t) {
fail(message, t, DispatchErrorType.INVOCATION);
}
+ @Override
public void fail(String message, Throwable t,
DispatchErrorType errorType) {
- logger.warn("Failed (" + errorType + ") invoking " + asyncActivity
+ logger.warn("Failed (" + errorType + ") invoking " + activity
+ " for job " + jobEvent + ": " + message, t);
- MonitorManager.getInstance().deregisterNode(
+ monMan.deregisterNode(
invocationProcessIdentifier);
getAbove().receiveError(
new DispatchErrorEvent(jobEvent.getOwningProcess(),
jobEvent.getIndex(), jobEvent.getContext(),
- message, t, errorType, asyncActivity));
+ message, t, errorType, activity));
}
+ @Override
public InvocationContext getContext() {
return jobEvent.getContext();
}
+ @Override
public String getParentProcessIdentifier() {
return invocationProcessIdentifier;
}
+ @Override
public void receiveCompletion(int[] completionIndex) {
- if (completionIndex.length == 0) {
+ if (completionIndex.length == 0)
// Final result, clean up monitor state
- MonitorManager.getInstance().deregisterNode(
- invocationProcessIdentifier);
- }
+ monMan.deregisterNode(invocationProcessIdentifier);
if (sentJob) {
int[] newIndex;
- if (completionIndex.length == 0) {
+ if (completionIndex.length == 0)
newIndex = jobEvent.getIndex();
- } else {
+ else {
newIndex = new int[jobEvent.getIndex().length
+ completionIndex.length];
int i = 0;
- for (int indexValue : jobEvent.getIndex()) {
+ for (int indexValue : jobEvent.getIndex())
newIndex[i++] = indexValue;
- }
- for (int indexValue : completionIndex) {
+ for (int indexValue : completionIndex)
newIndex[i++] = indexValue;
- }
}
DispatchCompletionEvent c = new DispatchCompletionEvent(
jobEvent.getOwningProcess(), newIndex, jobEvent
.getContext());
getAbove().receiveResultCompletion(c);
} else {
- // We haven't sent any 'real' data prior to
- // completing a stream. This in effect means we're
- // sending an empty top level collection so we need
- // to register empty collections for each output
- // port with appropriate depth (by definition if
- // we're streaming all outputs are collection types
- // of some kind)
- Map<String, T2Reference> emptyListMap = new HashMap<String, T2Reference>();
- for (OutputPort op : asyncActivity.getOutputPorts()) {
+ /*
+ * We haven't sent any 'real' data prior to completing a stream.
+ * This in effect means we're sending an empty top level
+ * collection so we need to register empty collections for each
+ * output port with appropriate depth (by definition if we're
+ * streaming all outputs are collection types of some kind)
+ */
+ Map<String, T2Reference> emptyListMap = new HashMap<>();
+ for (OutputPort op : activity.getOutputPorts()) {
String portName = op.getName();
int portDepth = op.getDepth();
emptyListMap.put(portName, refService.getListService()
@@ -296,67 +303,67 @@
}
receiveResult(emptyListMap, new int[0]);
}
-
}
+ @Override
public void receiveResult(Map<String, T2Reference> data, int[] index) {
-
- // Construct a new result map using the activity mapping
- // (activity output name to processor output name)
- Map<String, T2Reference> resultMap = new HashMap<String, T2Reference>();
+ /*
+ * Construct a new result map using the activity mapping (activity
+ * output name to processor output name)
+ */
+ Map<String, T2Reference> resultMap = new HashMap<>();
for (String outputName : data.keySet()) {
- String processorOutputName = asyncActivity
+ String processorOutputName = activity
.getOutputPortMapping().get(outputName);
- if (processorOutputName != null) {
+ if (processorOutputName != null)
resultMap.put(processorOutputName, data.get(outputName));
- }
}
- // Construct a new index array if the specified index is
- // non zero length, otherwise just use the original
- // job's index array (means we're not streaming)
+ /*
+ * Construct a new index array if the specified index is non zero
+ * length, otherwise just use the original job's index array (means
+ * we're not streaming)
+ */
int[] newIndex;
boolean streaming = false;
- if (index.length == 0) {
+ if (index.length == 0)
newIndex = jobEvent.getIndex();
- } else {
+ else {
streaming = true;
newIndex = new int[jobEvent.getIndex().length + index.length];
int i = 0;
- for (int indexValue : jobEvent.getIndex()) {
+ for (int indexValue : jobEvent.getIndex())
newIndex[i++] = indexValue;
- }
- for (int indexValue : index) {
+ for (int indexValue : index)
newIndex[i++] = indexValue;
- }
}
DispatchResultEvent resultEvent = new DispatchResultEvent(jobEvent
.getOwningProcess(), newIndex, jobEvent.getContext(),
resultMap, streaming);
if (!streaming) {
- MonitorManager.getInstance().registerNode(resultEvent,
- invocationProcessIdentifier,
+ monMan.registerNode(resultEvent, invocationProcessIdentifier,
new HashSet<MonitorableProperty<?>>());
// Final result, clean up monitor state
- MonitorManager.getInstance().deregisterNode(
- invocationProcessIdentifier);
+ monMan.deregisterNode(invocationProcessIdentifier);
}
- // Push the modified data to the layer above in the
- // dispatch stack
+ // Push the modified data to the layer above in the dispatch stack
getAbove().receiveResult(resultEvent);
sentJob = true;
}
+ @Override
public void requestRun(Runnable runMe) {
String newThreadName = jobEvent.toString();
Thread thread = new Thread(runMe, newThreadName);
- thread.setContextClassLoader(asyncActivity.getClass().getClassLoader() );
- thread.setUncaughtExceptionHandler(new UncaughtExceptionHandler(){
+ thread.setContextClassLoader(activity.getClass()
+ .getClassLoader());
+ thread.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+ @Override
public void uncaughtException(Thread t, Throwable e) {
- fail("Uncaught exception while invoking " + asyncActivity, e);
- }});
+ fail("Uncaught exception while invoking " + activity, e);
+ }
+ });
thread.start();
}
}
-
}
diff --git a/workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Loop.java b/workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Loop.java
index 9085268..d5077a4 100644
--- a/workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Loop.java
+++ b/workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Loop.java
@@ -56,7 +56,6 @@
* has been set as the
* {@link LoopConfiguration#setCondition(net.sf.taverna.t2.workflowmodel.processor.activity.Activity)
* condition}.
- * </p>
* <p>
* After a job has been successful further down the dispatch stack, the loop
* layer will invoke the conditional activity to determine if the job will be
@@ -64,47 +63,39 @@
* will be performed even before the first invocation. (The default
* runFirst=true is equivalent to a do..while construct, while runFirst=false is
* equivalent to a while.. construct.)
- * </p>
* <p>
* A job will be resent down the dispatch stack only if the conditional activity
* returns a reference to a string equal to "true" on its output port "loop".
- * </p>
* <p>
* If a job or the conditional activity fails, the while-loop is interrupted and
* the error is sent further up.
- * </p>
* <p>
* Note that the LoopLayer will be invoked for each item in an iteration, if you
* want to do the loop for the whole collection (ie. re-iterating if the
* loop-condition fails after processing the full list) - create a nested
* workflow with the desired depths on it's input ports and insert this
* LoopLayer in the stack of the nested workflow's processor in parent workflow.
- * </p>
* <p>
* It is recommended that the LoopLayer is to be inserted after the
* {@link ErrorBounce} layer, as this layer is needed for registering errors
* produced by the LoopLayer. If the user requires {@link Retry retries} and
* {@link Failover failovers} before checking the while condition, such layers
* should be below LoopLayer.
- * </p>
*
* @author Stian Soiland-Reyes
- *
*/
-
-@SuppressWarnings("unchecked")
+// FIXME Doesn't work
+@SuppressWarnings({"unchecked","rawtypes"})
public class Loop extends AbstractDispatchLayer<JsonNode> {
-
public static final String URI = "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/Loop";
-
private static Logger logger = Logger.getLogger(Loop.class);
private JsonNode config = JsonNodeFactory.instance.objectNode();
- protected Map<String, AbstractDispatchEvent> incomingJobs = new HashMap<String, AbstractDispatchEvent>();
+ protected Map<String, AbstractDispatchEvent> incomingJobs = new HashMap<>();
+ protected Map<String, AbstractDispatchEvent> outgoingJobs = new HashMap<>();
- protected Map<String, AbstractDispatchEvent> outgoingJobs = new HashMap<String, AbstractDispatchEvent>();
-
+ @Override
public void configure(JsonNode config) {
this.config = config;
}
@@ -113,21 +104,18 @@
public void finishedWith(String owningProcess) {
String prefix = owningProcess + "[";
synchronized (outgoingJobs) {
- for (String key : new ArrayList<String>(outgoingJobs.keySet())) {
- if (key.startsWith(prefix)) {
+ for (String key : new ArrayList<>(outgoingJobs.keySet()))
+ if (key.startsWith(prefix))
outgoingJobs.remove(key);
- }
- }
}
synchronized (incomingJobs) {
- for (String key : new ArrayList<String>(incomingJobs.keySet())) {
- if (key.startsWith(prefix)) {
+ for (String key : new ArrayList<>(incomingJobs.keySet()))
+ if (key.startsWith(prefix))
incomingJobs.remove(key);
- }
- }
}
}
+ @Override
public JsonNode getConfiguration() {
return config;
}
@@ -157,10 +145,15 @@
}
checkCondition(jobQueueEvent);
}
+
+ private Activity<?> getCondition() {
+ //return config.getCondition();
+ return null;
+ }
@Override
public void receiveResult(DispatchResultEvent resultEvent) {
- Activity<?> condition = null;//config.getCondition();
+ Activity<?> condition = getCondition();
if (condition == null) {
super.receiveResult(resultEvent);
return;
@@ -173,7 +166,7 @@
@Override
public void receiveResultCompletion(DispatchCompletionEvent completionEvent) {
- Activity<?> condition = null;//config.getCondition();
+ Activity<?> condition = getCondition();
if (condition == null) {
super.receiveResultCompletion(completionEvent);
return;
@@ -185,7 +178,7 @@
}
private void checkCondition(AbstractDispatchEvent event) {
- Activity<?> condition = null;//config.getCondition();
+ Activity<?> condition = getCondition();
if (condition == null) {
super.receiveError(new DispatchErrorEvent(event.getOwningProcess(),
event.getIndex(), event.getContext(),
@@ -216,20 +209,19 @@
private Map<String, T2Reference> prepareInputs(
AbstractAsynchronousActivity asyncCondition, String jobIdentifier) {
- Map<String, T2Reference> inputs = new HashMap<String, T2Reference>();
+ Map<String, T2Reference> inputs = new HashMap<>();
Map<String, T2Reference> inData = getInData(jobIdentifier);
Map<String, T2Reference> outData = getOutData(jobIdentifier);
Set<ActivityInputPort> inputPorts = asyncCondition.getInputPorts();
for (ActivityInputPort conditionIn : inputPorts) {
String conditionPort = conditionIn.getName();
- if (outData.containsKey(conditionPort)) {
+ if (outData.containsKey(conditionPort))
// Copy from previous output
inputs.put(conditionPort, outData.get(conditionPort));
- } else if (inData.containsKey(conditionPort)) {
+ else if (inData.containsKey(conditionPort))
// Copy from original input
inputs.put(conditionPort, inData.get(conditionPort));
- }
}
return inputs;
}
@@ -239,10 +231,9 @@
synchronized (incomingJobs) {
inEvent = incomingJobs.get(jobIdentifier);
}
- Map<String, T2Reference> inData = new HashMap<String, T2Reference>();
- if (inEvent instanceof DispatchJobEvent) {
+ Map<String, T2Reference> inData = new HashMap<>();
+ if (inEvent instanceof DispatchJobEvent)
inData = ((DispatchJobEvent) inEvent).getData();
- }
return inData;
}
@@ -251,10 +242,9 @@
synchronized (outgoingJobs) {
outEvent = outgoingJobs.get(jobIdentifier);
}
- Map<String, T2Reference> outData = new HashMap<String, T2Reference>();
- if (outEvent instanceof DispatchResultEvent) {
+ Map<String, T2Reference> outData = new HashMap<>();
+ if (outEvent instanceof DispatchResultEvent)
outData = ((DispatchResultEvent) outEvent).getData();
- }
return outData;
}
@@ -266,7 +256,6 @@
public static final String LOOP_PORT = "loop";
-
public class ConditionCallBack implements AsynchronousActivityCallback {
private InvocationContext context;
private final String jobIdentifier;
@@ -282,14 +271,17 @@
processId = originalEvent.getOwningProcess() + ":condition";
}
+ @Override
public void fail(String message) {
fail(message, null, DispatchErrorType.INVOCATION);
}
+ @Override
public void fail(String message, Throwable t) {
fail(message, t, DispatchErrorType.INVOCATION);
}
+ @Override
public void fail(String message, Throwable t,
DispatchErrorType errorType) {
logger.warn("Failed (" + errorType + ") invoking condition service "
@@ -306,18 +298,22 @@
DispatchErrorType.INVOCATION, null));
}
+ @Override
public InvocationContext getContext() {
return context;
}
+ @Override
public String getParentProcessIdentifier() {
return processId;
}
+ @Override
public void receiveCompletion(int[] completionIndex) {
// Ignore streaming
}
+ @Override
public void receiveResult(Map<String, T2Reference> data, int[] index) {
if (index.length > 0) {
// Ignore streaming
@@ -395,18 +391,21 @@
.getOwningProcess(), dispatchEvent.getIndex(),
dispatchEvent.getContext(), newInputs,
((DispatchJobEvent) dispatchEvent).getActivities());
- // TODO: Should this be registered as an incomingJobs? If so the
- // conditional
- // could even feed to itself, and we should also keep a list of
- // originalJobs.
+ /*
+ * TODO: Should this be registered as an incomingJobs? If so the
+ * conditional could even feed to itself, and we should also keep a
+ * list of originalJobs.
+ */
return newJobEvent;
}
+ @Override
public void requestRun(Runnable runMe) {
String newThreadName = "Condition service "
+ getParentProcessIdentifier();
Thread thread = new Thread(runMe, newThreadName);
thread.setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
+ @Override
public void uncaughtException(Thread t, Throwable e) {
fail("Uncaught exception while invoking " + jobIdentifier,
e);
@@ -416,11 +415,10 @@
}
}
+ @Override
public Processor getProcessor() {
- if (dispatchStack == null) {
+ if (dispatchStack == null)
return null;
- }
return dispatchStack.getProcessor();
}
-
}
diff --git a/workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/LoopConfiguration.java b/workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/LoopConfiguration.java
index 9fb138a..7cfa2a5 100644
--- a/workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/LoopConfiguration.java
+++ b/workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/LoopConfiguration.java
@@ -6,8 +6,6 @@
import net.sf.taverna.t2.workflowmodel.processor.config.ConfigurationBean;
import net.sf.taverna.t2.workflowmodel.processor.config.ConfigurationProperty;
-import org.apache.log4j.Logger;
-
/**
* Configuration bean for the {@link Loop}.
* <p>
@@ -20,25 +18,20 @@
* before invoking the job for the first time, otherwise the condition will be
* invoked after the job has come back with successful results.
* </p>
- *
+ *
* @author Stian Soiland-Reyes
- *
+ *
*/
@ConfigurationBean(uri = Loop.URI + "#Config")
public class LoopConfiguration implements Cloneable {
-
- transient private static Logger logger = Logger
- .getLogger(LoopConfiguration.class);
-
private Activity<?> condition = null;
private Boolean runFirst;
private Properties properties;
public Properties getProperties() {
synchronized (this) {
- if (properties == null) {
+ if (properties == null)
properties = new Properties();
- }
}
return properties;
}
@@ -65,20 +58,18 @@
}
public boolean isRunFirst() {
- if (runFirst == null) {
+ if (runFirst == null)
return true;
- }
return runFirst;
}
- @ConfigurationProperty(name = "condition", label = "Condition Activity", description = "The condition activity with an output port called \"loop\"", required=false)
+ @ConfigurationProperty(name = "condition", label = "Condition Activity", description = "The condition activity with an output port called \"loop\"", required = false)
public void setCondition(Activity<?> activity) {
this.condition = activity;
}
- @ConfigurationProperty(name = "runFirst", label = "Check Condition On Run First", description = "Whether to check the condition before invoking the job for the first time", required=false)
+ @ConfigurationProperty(name = "runFirst", label = "Check Condition On Run First", description = "Whether to check the condition before invoking the job for the first time", required = false)
public void setRunFirst(boolean runFirst) {
this.runFirst = runFirst;
}
-
}
diff --git a/workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Parallelize.java b/workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Parallelize.java
index abe0df6..bd0e69a 100644
--- a/workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Parallelize.java
+++ b/workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Parallelize.java
@@ -84,17 +84,12 @@
public class Parallelize extends AbstractDispatchLayer<JsonNode>
implements NotifiableLayer,
PropertyContributingDispatchLayer<JsonNode> {
-
public static final String URI = "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/Parallelize";
-
private static Logger logger = Logger.getLogger(Parallelize.class);
- private Map<String, StateModel> stateMap = new HashMap<String, StateModel>();
-
+ private Map<String, StateModel> stateMap = new HashMap<>();
private JsonNode config = JsonNodeFactory.instance.objectNode();
-
int sentJobsCount = 0;
-
int completedJobsCount = 0;
public Parallelize() {
@@ -112,19 +107,19 @@
((ObjectNode)config).put("maxJobs", maxJobs);
}
+ @Override
public void eventAdded(String owningProcess) {
StateModel stateModel;
synchronized (stateMap) {
stateModel = stateMap.get(owningProcess);
}
- if (stateModel == null) {
+ if (stateModel == null)
/*
* Should never see this here, it means we've had duplicate
* completion events from upstream
*/
throw new WorkflowStructureException(
"Unknown owning process " + owningProcess);
- }
synchronized (stateModel) {
stateModel.fillFromQueue();
}
@@ -132,8 +127,9 @@
@Override
public void receiveJobQueue(DispatchJobQueueEvent queueEvent) {
- StateModel model = new StateModel(queueEvent, config.has("maxJobs") ? config.get("maxJobs").intValue() : 1);
- synchronized(stateMap) {
+ StateModel model = new StateModel(queueEvent,
+ config.has("maxJobs") ? config.get("maxJobs").intValue() : 1);
+ synchronized (stateMap) {
stateMap.put(queueEvent.getOwningProcess(), model);
}
model.fillFromQueue();
@@ -144,7 +140,6 @@
"Parallelize layer cannot handle job events");
}
-
@Override
public void receiveError(DispatchErrorEvent errorEvent) {
StateModel model;
@@ -202,8 +197,7 @@
@Override
public void finishedWith(final String owningProcess) {
- // Delay the removal of the state to give the monitor
- // a chance to poll
+ // Delay the removal of the state to give the monitor a chance to poll
cleanupTimer.schedule(new TimerTask() {
@Override
public void run() {
@@ -214,17 +208,18 @@
}, CLEANUP_DELAY_MS);
}
+ @Override
public void configure(JsonNode config) {
this.config = config;
}
+ @Override
public JsonNode getConfiguration() {
return this.config;
}
/**
- * Injects the following properties into its parent processor's property set
- * :
+ * Injects the following properties into its parent processor's property set:
* <ul>
* <li><code>dispatch.parallelize.queuesize [Integer]</code><br/>The current
* size of the incomming job queue, or -1 if the state isn't defined for the
@@ -233,6 +228,7 @@
* some kind.</li>
* </ul>
*/
+ @Override
public void injectPropertiesFor(final String owningProcess) {
/**
* Property for the queue depth, will evaluate to -1 if there isn't a
@@ -240,69 +236,68 @@
* if we haven't created the state yet or the queue has been collected)
*/
MonitorableProperty<Integer> queueSizeProperty = new MonitorableProperty<Integer>() {
-
+ @Override
public Date getLastModified() {
return new Date();
}
+ @Override
public String[] getName() {
return new String[] { "dispatch", "parallelize", "queuesize" };
}
+ @Override
public Integer getValue() throws NoSuchPropertyException {
-
StateModel model;
synchronized(stateMap) {
model = stateMap.get(owningProcess);
}
- if (model != null) {
- return model.queueSize();
- } else {
+ if (model == null)
return -1;
- }
+ return model.queueSize();
}
-
};
dispatchStack.receiveMonitorableProperty(queueSizeProperty,
owningProcess);
MonitorableProperty<Integer> sentJobsProperty = new MonitorableProperty<Integer>() {
-
+ @Override
public Date getLastModified() {
return new Date();
}
+ @Override
public String[] getName() {
return new String[] { "dispatch", "parallelize", "sentjobs" };
}
+ @Override
public Integer getValue() throws NoSuchPropertyException {
return sentJobsCount;
}
-
};
dispatchStack.receiveMonitorableProperty(sentJobsProperty,
owningProcess);
MonitorableProperty<Integer> completedJobsProperty = new MonitorableProperty<Integer>() {
-
+ @Override
public Date getLastModified() {
return new Date();
}
+ @Override
public String[] getName() {
return new String[] { "dispatch", "parallelize",
"completedjobs" };
}
+ @Override
public Integer getValue() throws NoSuchPropertyException {
return completedJobsCount;
}
-
};
dispatchStack.receiveMonitorableProperty(completedJobsProperty,
owningProcess);
-
}
/**
@@ -311,17 +306,13 @@
* @author Tom Oinn
*
*/
+ // suppressed to avoid jdk1.5 error messages caused by the declaration
+ // IterationInternalEvent<? extends IterationInternalEvent<?>> e
+ @SuppressWarnings("rawtypes")
class StateModel {
-
private DispatchJobQueueEvent queueEvent;
-
- @SuppressWarnings("unchecked")
- // suppressed to avoid jdk1.5 error messages caused by the declaration
- // IterationInternalEvent<? extends IterationInternalEvent<?>> e
- private BlockingQueue<IterationInternalEvent> pendingEvents = new LinkedBlockingQueue<IterationInternalEvent>();
-
+ private BlockingQueue<IterationInternalEvent> pendingEvents = new LinkedBlockingQueue<>();
private int activeJobs = 0;
-
private int maximumJobs;
/**
@@ -362,9 +353,6 @@
* jobs list and return
* </ul>
*/
- @SuppressWarnings("unchecked")
- // suppressed to avoid jdk1.5 error messages caused by the declaration
- // IterationInternalEvent<? extends IterationInternalEvent<?>> e
protected void fillFromQueue() {
synchronized (this) {
while (queueEvent.getQueue().peek() != null
@@ -374,6 +362,7 @@
if (e instanceof Completion && pendingEvents.peek() == null) {
new Thread(new Runnable() {
+ @Override
public void run() {
getAbove().receiveResultCompletion(
new DispatchCompletionEvent(e
@@ -415,63 +404,60 @@
* @param index
* @return
*/
- @SuppressWarnings("unchecked")
- // suppressed to avoid jdk1.5 error messages caused by the declaration
- // IterationInternalEvent<? extends IterationInternalEvent<?>> e
protected boolean finishWith(int[] index) {
synchronized (this) {
+ for (IterationInternalEvent e : new ArrayList<>(pendingEvents)) {
+ if (!(e instanceof Job))
+ continue;
+ Job j = (Job) e;
+ if (!arrayEquals(j.getIndex(), index))
+ continue;
- for (IterationInternalEvent e : new ArrayList<IterationInternalEvent>(
- pendingEvents)) {
- if (e instanceof Job) {
- Job j = (Job) e;
- if (arrayEquals(j.getIndex(), index)) {
- // Found a job in the pending events list which has
- // the same index, remove it and decrement the
- // current count of active jobs
- pendingEvents.remove(e);
- activeJobs--;
- completedJobsCount++;
- // Now pull any completion events that have reached
- // the head of the queue - this indicates that all
- // the job events which came in before them have
- // been processed and we can emit the completions
- while (pendingEvents.peek() != null
- && pendingEvents.peek() instanceof Completion) {
- Completion c = (Completion) pendingEvents
- .remove();
- getAbove().receiveResultCompletion(
- new DispatchCompletionEvent(c
- .getOwningProcess(), c
- .getIndex(), c.getContext()));
-
- }
- // Refresh from the queue; as we've just decremented
- // the active job count there should be a worker
- // available
- fillFromQueue();
- // Return true to indicate that we removed a job
- // event from the queue, that is to say that the
- // index wasn't that of a partial completion.
- return true;
- }
+ /*
+ * Found a job in the pending events list which has the
+ * same index, remove it and decrement the current count
+ * of active jobs
+ */
+ pendingEvents.remove(e);
+ activeJobs--;
+ completedJobsCount++;
+ /*
+ * Now pull any completion events that have reached the head
+ * of the queue - this indicates that all the job events
+ * which came in before them have been processed and we can
+ * emit the completions
+ */
+ while (pendingEvents.peek() != null
+ && pendingEvents.peek() instanceof Completion) {
+ Completion c = (Completion) pendingEvents.remove();
+ getAbove().receiveResultCompletion(
+ new DispatchCompletionEvent(c
+ .getOwningProcess(), c.getIndex(), c
+ .getContext()));
}
+ /*
+ * Refresh from the queue; as we've just decremented the
+ * active job count there should be a worker available
+ */
+ fillFromQueue();
+ /*
+ * Return true to indicate that we removed a job event from
+ * the queue, that is to say that the index wasn't that of a
+ * partial completion.
+ */
+ return true;
}
}
return false;
}
private boolean arrayEquals(int[] a, int[] b) {
- if (a.length != b.length) {
+ if (a.length != b.length)
return false;
- }
- for (int i = 0; i < a.length; i++) {
- if (a[i] != b[i]) {
+ for (int i = 0; i < a.length; i++)
+ if (a[i] != b[i])
return false;
- }
- }
return true;
}
}
-
}
diff --git a/workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/ParallelizeConfig.java b/workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/ParallelizeConfig.java
index 1cb7594..29d69d6 100644
--- a/workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/ParallelizeConfig.java
+++ b/workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/ParallelizeConfig.java
@@ -27,13 +27,11 @@
* Bean to hold the configuration for the parallelize layer, specifically a
* single int property defining the number of concurrent jobs in that processor
* instance per owning process ID.
- *
+ *
* @author Tom Oinn
- *
*/
@ConfigurationBean(uri = Parallelize.URI + "#Config")
public class ParallelizeConfig {
-
private int maxJobs;
public ParallelizeConfig() {
@@ -41,7 +39,7 @@
this.maxJobs = 1;
}
- @ConfigurationProperty(name = "maxJobs", label = "Maximum Parallel Jobs", description = "The maximum number of jobs that can run in parallel", required=false)
+ @ConfigurationProperty(name = "maxJobs", label = "Maximum Parallel Jobs", description = "The maximum number of jobs that can run in parallel", required = false)
public void setMaximumJobs(int maxJobs) {
this.maxJobs = maxJobs;
}
@@ -49,5 +47,4 @@
public int getMaximumJobs() {
return this.maxJobs;
}
-
}
diff --git a/workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Retry.java b/workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Retry.java
index fad7185..f2054d9 100644
--- a/workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Retry.java
+++ b/workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Retry.java
@@ -53,32 +53,22 @@
* @author Tom Oinn
* @author David Withers
* @author Stian Soiland-Reyes
- *
*/
@DispatchLayerErrorReaction(emits = { JOB }, relaysUnmodified = true, stateEffects = {
UPDATE_LOCAL_STATE, REMOVE_LOCAL_STATE })
@DispatchLayerJobReaction(emits = {}, relaysUnmodified = true, stateEffects = { CREATE_LOCAL_STATE })
@DispatchLayerResultReaction(emits = {}, relaysUnmodified = true, stateEffects = { REMOVE_LOCAL_STATE })
public class Retry extends AbstractErrorHandlerLayer<JsonNode> {
-
private static final String BACKOFF_FACTOR = "backoffFactor";
-
private static final String MAX_DELAY = "maxDelay";
-
private static final String MAX_RETRIES = "maxRetries";
-
private static final String INITIAL_DELAY = "initialDelay";
-
public static final String URI = "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/Retry";
private ObjectNode config;
-
private int maxRetries;
-
private int initialDelay;
-
private int maxDelay;
-
private double backoffFactor;
private static Timer retryTimer = new Timer("Retry timer", true);
@@ -100,7 +90,6 @@
}
class RetryState extends JobState {
-
int currentRetryCount = 0;
public RetryState(DispatchJobEvent jobEvent) {
@@ -116,10 +105,9 @@
*/
@Override
public boolean handleError() {
- if (currentRetryCount >= maxRetries) {
+ if (currentRetryCount >= maxRetries)
return false;
- }
- int delay = (int) (initialDelay * (Math.pow(backoffFactor, currentRetryCount)));
+ int delay = (int) (initialDelay * Math.pow(backoffFactor, currentRetryCount));
delay = Math.min(delay, maxDelay);
TimerTask task = new TimerTask() {
@Override
@@ -127,12 +115,10 @@
currentRetryCount++;
getBelow().receiveJob(jobEvent);
}
-
};
retryTimer.schedule(task, delay);
return true;
}
-
}
@Override
@@ -140,6 +126,7 @@
return new RetryState(jobEvent);
}
+ @Override
public void configure(JsonNode config) {
ObjectNode defaultConfig = defaultConfig();
setAllMissingFields((ObjectNode) config, defaultConfig);
@@ -152,11 +139,9 @@
}
private void setAllMissingFields(ObjectNode config, ObjectNode defaults) {
- for (String fieldName : forEach(defaults.fieldNames())) {
- if (! config.has(fieldName) || config.get(fieldName).isNull()) {
+ for (String fieldName : forEach(defaults.fieldNames()))
+ if (! config.has(fieldName) || config.get(fieldName).isNull())
config.put(fieldName, defaults.get(fieldName));
- }
- }
}
private <T> Iterable<T> forEach(final Iterator<T> iterator) {
@@ -169,18 +154,14 @@
}
private void checkConfig(ObjectNode conf) {
- if (conf.get(MAX_RETRIES).intValue() < 0) {
+ if (conf.get(MAX_RETRIES).intValue() < 0)
throw new IllegalArgumentException("maxRetries < 0");
- }
- if (conf.get(INITIAL_DELAY).intValue() < 0) {
+ if (conf.get(INITIAL_DELAY).intValue() < 0)
throw new IllegalArgumentException("initialDelay < 0");
- }
- if (conf.get(MAX_DELAY).intValue() < conf.get(INITIAL_DELAY).intValue()) {
+ if (conf.get(MAX_DELAY).intValue() < conf.get(INITIAL_DELAY).intValue())
throw new IllegalArgumentException("maxDelay < initialDelay");
- }
- if (conf.get(BACKOFF_FACTOR).doubleValue() < 0.0) {
+ if (conf.get(BACKOFF_FACTOR).doubleValue() < 0.0)
throw new IllegalArgumentException("backoffFactor < 0.0");
- }
}
public static ObjectNode defaultConfig() {
@@ -192,7 +173,8 @@
return conf;
}
- public JsonNode getConfiguration() {
+ @Override
+ public JsonNode getConfiguration() {
return this.config;
}
}
diff --git a/workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/RetryConfig.java b/workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/RetryConfig.java
index b9fd21b..39dedbf 100644
--- a/workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/RetryConfig.java
+++ b/workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/RetryConfig.java
@@ -35,7 +35,6 @@
private int maxDelay = MAX_DELAY;
private int maxRetries = MAX_RETRIES;
-
/**
* Factor by which the initial delay is multiplied for each retry after the
* first, this allows for exponential backoff of retry times up to a certain
@@ -95,5 +94,4 @@
public void setMaxRetries(int max) {
this.maxRetries = max;
}
-
}
diff --git a/workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Stop.java b/workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Stop.java
index afea2be..3169f8c 100644
--- a/workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Stop.java
+++ b/workflowmodel-core-extensions/src/main/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/Stop.java
@@ -22,46 +22,34 @@
* runs. It does so by intercepting jobs sent to the layer.
*
* @author alanrw
- *
*/
public class Stop extends AbstractDispatchLayer<JsonNode> {
-
public static final String URI = "http://ns.taverna.org.uk/2010/scufl2/taverna/dispatchlayer/Stop";
-
/**
* The set of ids of workflow runs that have been cancelled.
*/
- private static Set<String> cancelledWorkflowRuns = new HashSet<String>();
-
+ private static Set<String> cancelledWorkflowRuns = new HashSet<>();
/**
* A map from workflow run ids to the set of Stop layers where jobs have
* been intercepted for that run.
*/
- private static Map<String, Set<Stop>> pausedLayerMap = new HashMap<String, Set<Stop>>();
-
+ private static Map<String, Set<Stop>> pausedLayerMap = new HashMap<>();
/**
* A map for a given Stop from ids of suspended workflow runs to the jobs
* that have been intercepted.
*/
- private Map<String, Set<DispatchJobEvent>> suspendedJobEventMap = new HashMap<String, Set<DispatchJobEvent>>();
+ private Map<String, Set<DispatchJobEvent>> suspendedJobEventMap = new HashMap<>();
+ @Override
public void configure(JsonNode conf) throws ConfigurationException {
// nothing
}
+ @Override
public JsonNode getConfiguration() {
return null;
}
- /*
- * (non-Javadoc)
- *
- * @see
- * net.sf.taverna.t2.workflowmodel.processor.dispatch.AbstractDispatchLayer
- * #receiveJob
- * (net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobEvent
- * )
- */
@Override
public void receiveJob(final DispatchJobEvent jobEvent) {
List<WorkflowRunIdEntity> entities = jobEvent.getContext().getEntities(
@@ -70,11 +58,10 @@
final String wfRunId = entities.get(0).getWorkflowRunId();
// If the workflow run is cancelled then simply "eat" the jobEvent.
// This does a hard-cancel.
- if (cancelledWorkflowRuns.contains(wfRunId)) {
+ if (cancelledWorkflowRuns.contains(wfRunId))
return;
- }
// If the workflow run is paused
- if (pausedLayerMap.containsKey(wfRunId)) {
+ if (pausedLayerMap.containsKey(wfRunId))
synchronized (Stop.class) {
// double check as pausedLayerMap may have been changed
// waiting for the lock
@@ -82,30 +69,20 @@
// Remember that this Stop layer was affected by the
// workflow pause
pausedLayerMap.get(wfRunId).add(this);
- if (!suspendedJobEventMap.containsKey(wfRunId)) {
+ if (!suspendedJobEventMap.containsKey(wfRunId))
suspendedJobEventMap.put(wfRunId,
new HashSet<DispatchJobEvent>());
- }
// Remember the suspended jobEvent
suspendedJobEventMap.get(wfRunId).add(jobEvent);
return;
}
}
- }
}
// By default pass the jobEvent down to the next layer
super.receiveJob(jobEvent);
}
- /*
- * (non-Javadoc)
- *
- * @see
- * net.sf.taverna.t2.workflowmodel.processor.dispatch.AbstractDispatchLayer
- * #receiveJobQueue
- * (net.sf.taverna.t2.workflowmodel.processor.dispatch.events
- * .DispatchJobQueueEvent)
- */
+ @Override
public void receiveJobQueue(DispatchJobQueueEvent jobQueueEvent) {
super.receiveJobQueue(jobQueueEvent);
}
@@ -119,17 +96,12 @@
* cancelled then false.
*/
public static synchronized boolean cancelWorkflow(String workflowRunId) {
-
- if (cancelledWorkflowRuns.contains(workflowRunId)) {
+ if (cancelledWorkflowRuns.contains(workflowRunId))
return false;
- }
- Set<String> cancelledWorkflowRunsCopy = new HashSet<String>(
+ Set<String> cancelledWorkflowRunsCopy = new HashSet<>(
cancelledWorkflowRuns);
-
cancelledWorkflowRunsCopy.add(workflowRunId);
-
cancelledWorkflowRuns = cancelledWorkflowRunsCopy;
-
return true;
}
@@ -142,20 +114,14 @@
* paused or cancelled then false.
*/
public static synchronized boolean pauseWorkflow(String workflowRunId) {
-
- if (cancelledWorkflowRuns.contains(workflowRunId)) {
+ if (cancelledWorkflowRuns.contains(workflowRunId))
return false;
- }
- if (!pausedLayerMap.containsKey(workflowRunId)) {
- Map<String, Set<Stop>> pausedLayerMapCopy = new HashMap<String, Set<Stop>>();
- pausedLayerMapCopy.putAll(pausedLayerMap);
- pausedLayerMapCopy.put(workflowRunId, new HashSet<Stop>());
- pausedLayerMap = pausedLayerMapCopy;
- return true;
- } else {
+ if (pausedLayerMap.containsKey(workflowRunId))
return false;
- }
-
+ Map<String, Set<Stop>> pausedLayerMapCopy = new HashMap<>(pausedLayerMap);
+ pausedLayerMapCopy.put(workflowRunId, new HashSet<Stop>());
+ pausedLayerMap = pausedLayerMapCopy;
+ return true;
}
/**
@@ -167,23 +133,17 @@
* was not paused or it was cancelled, then false.
*/
public static synchronized boolean resumeWorkflow(String workflowRunId) {
-
- if (cancelledWorkflowRuns.contains(workflowRunId)) {
+ if (cancelledWorkflowRuns.contains(workflowRunId))
return false;
- }
- if (pausedLayerMap.containsKey(workflowRunId)) {
- Map<String, Set<Stop>> pausedLayerMapCopy = new HashMap<String, Set<Stop>>();
- pausedLayerMapCopy.putAll(pausedLayerMap);
- Set<Stop> stops = pausedLayerMapCopy.remove(workflowRunId);
- pausedLayerMap = pausedLayerMapCopy;
- for (Stop s : stops) {
- s.resumeLayerWorkflow(workflowRunId);
- }
- return true;
- } else {
+ if (!pausedLayerMap.containsKey(workflowRunId))
return false;
- }
-
+ Map<String, Set<Stop>> pausedLayerMapCopy = new HashMap<>();
+ pausedLayerMapCopy.putAll(pausedLayerMap);
+ Set<Stop> stops = pausedLayerMapCopy.remove(workflowRunId);
+ pausedLayerMap = pausedLayerMapCopy;
+ for (Stop s : stops)
+ s.resumeLayerWorkflow(workflowRunId);
+ return true;
}
/**
@@ -196,11 +156,8 @@
private void resumeLayerWorkflow(String workflowRunId) {
synchronized (Stop.class) {
for (DispatchJobEvent dje : suspendedJobEventMap
- .remove(workflowRunId)) {
-
- this.receiveJob(dje);
- }
+ .remove(workflowRunId))
+ receiveJob(dje);
}
}
-
}
diff --git a/workflowmodel-core-extensions/src/test/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/TestRetry.java b/workflowmodel-core-extensions/src/test/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/TestRetry.java
index 0d5aea9..e202df1 100644
--- a/workflowmodel-core-extensions/src/test/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/TestRetry.java
+++ b/workflowmodel-core-extensions/src/test/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/layers/TestRetry.java
@@ -1,7 +1,5 @@
package net.sf.taverna.t2.workflowmodel.processor.dispatch.layers;
-import net.sf.taverna.t2.workflowmodel.processor.dispatch.events.DispatchJobEvent;
-
import org.junit.Test;
import com.fasterxml.jackson.databind.JsonNode;
diff --git a/workflowmodel-impl/pom.xml b/workflowmodel-impl/pom.xml
index c2c6851..c72abea 100644
--- a/workflowmodel-impl/pom.xml
+++ b/workflowmodel-impl/pom.xml
@@ -46,7 +46,6 @@
<dependency>
<groupId>com.thoughtworks.xstream</groupId>
<artifactId>com.springsource.com.thoughtworks.xstream</artifactId>
- <version>${xstream.version}</version>
</dependency>
<dependency>
<groupId>net.sf.taverna.t2.core</groupId>
@@ -54,11 +53,9 @@
<version>${project.version}</version>
<!--<scope>test</scope> -->
</dependency>
-
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
- <version>${log4j.version}</version>
</dependency>
<dependency>
diff --git a/workflowmodel-impl/src/test/java/net/sf/taverna/t2/annotation/TestAnnotations.java b/workflowmodel-impl/src/test/java/net/sf/taverna/t2/annotation/TestAnnotations.java
index cd3b902..90af49d 100644
--- a/workflowmodel-impl/src/test/java/net/sf/taverna/t2/annotation/TestAnnotations.java
+++ b/workflowmodel-impl/src/test/java/net/sf/taverna/t2/annotation/TestAnnotations.java
@@ -48,7 +48,7 @@
public class TestAnnotations {
- @SuppressWarnings("unchecked")
+ @SuppressWarnings({ "unchecked", "rawtypes", "deprecation" })
@Test
@Ignore("utterly broken")
public void getAnnotationsForADataFlow() {
diff --git a/workflowmodel-impl/src/test/java/net/sf/taverna/t2/workflowmodel/impl/ConnectMergedDatalinkEditTest.java b/workflowmodel-impl/src/test/java/net/sf/taverna/t2/workflowmodel/impl/ConnectMergedDatalinkEditTest.java
index bfb84bf..c5ca43f 100644
--- a/workflowmodel-impl/src/test/java/net/sf/taverna/t2/workflowmodel/impl/ConnectMergedDatalinkEditTest.java
+++ b/workflowmodel-impl/src/test/java/net/sf/taverna/t2/workflowmodel/impl/ConnectMergedDatalinkEditTest.java
@@ -22,7 +22,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
+//import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import net.sf.taverna.t2.workflowmodel.Datalink;
diff --git a/workflowmodel-impl/src/test/java/net/sf/taverna/t2/workflowmodel/impl/DummyDataflow.java b/workflowmodel-impl/src/test/java/net/sf/taverna/t2/workflowmodel/impl/DummyDataflow.java
index b88f8d1..cffcd18 100644
--- a/workflowmodel-impl/src/test/java/net/sf/taverna/t2/workflowmodel/impl/DummyDataflow.java
+++ b/workflowmodel-impl/src/test/java/net/sf/taverna/t2/workflowmodel/impl/DummyDataflow.java
@@ -44,41 +44,50 @@
public List<Processor> processors = new ArrayList<Processor>();
public List<Merge> merges = new ArrayList<Merge>();
+ @Override
public DataflowValidationReport checkValidity() {
return new DummyValidationReport(true);
}
+ @Override
public <T extends NamedWorkflowEntity> List<? extends T> getEntities(
Class<T> entityType) {
// TODO Auto-generated method stub
return null;
}
+ @Override
public List<? extends DataflowInputPort> getInputPorts() {
return inputPorts;
}
+ @Override
public List<? extends Datalink> getLinks() {
// TODO Auto-generated method stub
return null;
}
+ @Override
public List<? extends DataflowOutputPort> getOutputPorts() {
return outputPorts;
}
+ @Override
public List<? extends Processor> getProcessors() {
return processors;
}
+ @Override
public List<? extends Merge> getMerges() {
return merges;
}
+ @Override
public String getLocalName() {
return "test_dataflow";
}
+ @Override
public void fire(String owningProcess, InvocationContext context) {
String newOwningProcess = owningProcess + ":" + getLocalName();
for (Processor p : processors) {
@@ -88,16 +97,19 @@
}
}
+ @Override
public FailureTransmitter getFailureTransmitter() {
// TODO Auto-generated method stub
return null;
}
+ @Override
public boolean doTypeCheck() throws IterationTypeMismatchException {
throw new UnsupportedOperationException(
"Not implemented for this class");
}
+ @Override
public String getIdentifier() {
return "an id";
}
@@ -112,10 +124,12 @@
}
+ @Override
public boolean isInputPortConnected(DataflowInputPort inputPort) {
return false;
}
+ @Override
public String recordIdentifier() {
return getIdentifier();
}
diff --git a/workflowmodel-impl/src/test/java/net/sf/taverna/t2/workflowmodel/impl/DummyProcessor.java b/workflowmodel-impl/src/test/java/net/sf/taverna/t2/workflowmodel/impl/DummyProcessor.java
index 8f8cab5..4ad81cd 100644
--- a/workflowmodel-impl/src/test/java/net/sf/taverna/t2/workflowmodel/impl/DummyProcessor.java
+++ b/workflowmodel-impl/src/test/java/net/sf/taverna/t2/workflowmodel/impl/DummyProcessor.java
@@ -45,86 +45,103 @@
public List<Condition> preConditionList = new ArrayList<Condition>();
public List<ProcessorInputPort> inputPorts = new ArrayList<ProcessorInputPort>();
+ @Override
public boolean doTypeCheck() throws IterationTypeMismatchException {
// TODO Auto-generated method stub
return false;
}
+ @Override
public void fire(String owningProcess, InvocationContext context) {
firedOwningProcess=owningProcess;
}
+ @Override
public List<? extends Activity<?>> getActivityList() {
// TODO Auto-generated method stub
return null;
}
+ @Override
public List<? extends Condition> getControlledPreconditionList() {
// TODO Auto-generated method stub
return null;
}
+ @Override
public DispatchStack getDispatchStack() {
// TODO Auto-generated method stub
return null;
}
+ @Override
public List<? extends ProcessorInputPort> getInputPorts() {
return inputPorts;
}
+ @Override
public IterationStrategyStack getIterationStrategy() {
// TODO Auto-generated method stub
return null;
}
+ @Override
public List<? extends ProcessorOutputPort> getOutputPorts() {
// TODO Auto-generated method stub
return null;
}
+ @Override
public List<? extends Condition> getPreconditionList() {
return preConditionList;
}
+ @Override
public String getLocalName() {
// TODO Auto-generated method stub
return null;
}
+ @Override
public Edit<? extends Processor> getAddAnnotationEdit(
AnnotationChain newAnnotation) {
// TODO Auto-generated method stub
return null;
}
+ @Override
public Set<? extends AnnotationChain> getAnnotations() {
// TODO Auto-generated method stub
return null;
}
+ @Override
public Edit<? extends Processor> getRemoveAnnotationEdit(
AnnotationChain annotationToRemove) {
// TODO Auto-generated method stub
return null;
}
+ @Override
public void setAnnotations(Set<AnnotationChain> annotations) {
// TODO Auto-generated method stub
}
+ @Override
public void addObserver(Observer<ProcessorFinishedEvent> observer) {
// TODO Auto-generated method stub
}
+ @Override
public List<Observer<ProcessorFinishedEvent>> getObservers() {
// TODO Auto-generated method stub
return null;
}
+ @Override
public void removeObserver(Observer<ProcessorFinishedEvent> observer) {
// TODO Auto-generated method stub
diff --git a/workflowmodel-impl/src/test/java/net/sf/taverna/t2/workflowmodel/impl/DummyValidationReport.java b/workflowmodel-impl/src/test/java/net/sf/taverna/t2/workflowmodel/impl/DummyValidationReport.java
index aca6300..87c7b2a 100644
--- a/workflowmodel-impl/src/test/java/net/sf/taverna/t2/workflowmodel/impl/DummyValidationReport.java
+++ b/workflowmodel-impl/src/test/java/net/sf/taverna/t2/workflowmodel/impl/DummyValidationReport.java
@@ -37,26 +37,32 @@
this.valid = valid;
}
+ @Override
public boolean isValid() {
return valid;
}
+ @Override
public List<? extends TokenProcessingEntity> getUnsatisfiedEntities() {
return null;
}
+ @Override
public List<? extends DataflowOutputPort> getUnresolvedOutputs() {
return null;
}
+ @Override
public List<? extends TokenProcessingEntity> getFailedEntities() {
return null;
}
+ @Override
public Map<TokenProcessingEntity, DataflowValidationReport> getInvalidDataflows() {
return null;
}
+ @Override
public boolean isWorkflowIncomplete() {
return false;
}
diff --git a/workflowmodel-impl/src/test/java/net/sf/taverna/t2/workflowmodel/impl/ProcessorHealthReportTest.java b/workflowmodel-impl/src/test/java/net/sf/taverna/t2/workflowmodel/impl/ProcessorHealthReportTest.java
index 97e635f..d4a0e22 100644
--- a/workflowmodel-impl/src/test/java/net/sf/taverna/t2/workflowmodel/impl/ProcessorHealthReportTest.java
+++ b/workflowmodel-impl/src/test/java/net/sf/taverna/t2/workflowmodel/impl/ProcessorHealthReportTest.java
@@ -21,7 +21,7 @@
package net.sf.taverna.t2.workflowmodel.impl;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertSame;
+//import static org.junit.Assert.assertSame;
import static org.junit.Assert.fail;
import java.util.ArrayList;
diff --git a/workflowmodel-impl/src/test/java/net/sf/taverna/t2/workflowmodel/impl/UpdateDataflowInternalIdentifierEditTest.java b/workflowmodel-impl/src/test/java/net/sf/taverna/t2/workflowmodel/impl/UpdateDataflowInternalIdentifierEditTest.java
index 6623bc1..f56dc9a 100644
--- a/workflowmodel-impl/src/test/java/net/sf/taverna/t2/workflowmodel/impl/UpdateDataflowInternalIdentifierEditTest.java
+++ b/workflowmodel-impl/src/test/java/net/sf/taverna/t2/workflowmodel/impl/UpdateDataflowInternalIdentifierEditTest.java
@@ -22,7 +22,7 @@
import static org.junit.Assert.assertEquals;
import net.sf.taverna.t2.workflowmodel.Dataflow;
-import net.sf.taverna.t2.workflowmodel.Edit;
+//import net.sf.taverna.t2.workflowmodel.Edit;
import net.sf.taverna.t2.workflowmodel.Edits;
import org.junit.Test;
diff --git a/workflowmodel-impl/src/test/java/net/sf/taverna/t2/workflowmodel/processor/DiagnosticEventHandler.java b/workflowmodel-impl/src/test/java/net/sf/taverna/t2/workflowmodel/processor/DiagnosticEventHandler.java
index ac14154..ccd440e 100644
--- a/workflowmodel-impl/src/test/java/net/sf/taverna/t2/workflowmodel/processor/DiagnosticEventHandler.java
+++ b/workflowmodel-impl/src/test/java/net/sf/taverna/t2/workflowmodel/processor/DiagnosticEventHandler.java
@@ -37,6 +37,7 @@
protected int eventCount = 0;
+ @Override
public synchronized void receiveEvent(WorkflowDataToken token) {
eventCount++;
logger.debug(token);
@@ -50,14 +51,17 @@
this.eventCount = 0;
}
+ @Override
public int getDepth() {
return 0;
}
+ @Override
public String getName() {
return "Test port";
}
+ @Override
public Datalink getIncomingLink() {
// TODO Auto-generated method stub
return null;
diff --git a/workflowmodel-impl/src/test/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/DiagnosticLayer.java b/workflowmodel-impl/src/test/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/DiagnosticLayer.java
index f6a73aa..f8c752d 100644
--- a/workflowmodel-impl/src/test/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/DiagnosticLayer.java
+++ b/workflowmodel-impl/src/test/java/net/sf/taverna/t2/workflowmodel/processor/dispatch/DiagnosticLayer.java
@@ -62,10 +62,12 @@
System.out.println(" Purging caches for " + process);
}
+ @Override
public void configure(Object config) {
// Do nothing
}
+ @Override
public Object getConfiguration() {
return null;
}
diff --git a/workflowmodel-impl/src/test/java/net/sf/taverna/t2/workflowmodel/processor/iteration/DiagnosticIterationStrategyNode.java b/workflowmodel-impl/src/test/java/net/sf/taverna/t2/workflowmodel/processor/iteration/DiagnosticIterationStrategyNode.java
index 3a9d276..6a44143 100644
--- a/workflowmodel-impl/src/test/java/net/sf/taverna/t2/workflowmodel/processor/iteration/DiagnosticIterationStrategyNode.java
+++ b/workflowmodel-impl/src/test/java/net/sf/taverna/t2/workflowmodel/processor/iteration/DiagnosticIterationStrategyNode.java
@@ -36,7 +36,7 @@
* @author Tom Oinn
*
*/
-@SuppressWarnings("unchecked") //suppressed to avoid jdk1.5 compilation errors caused by the declaration IterationInternalEvent<? extends IterationInternalEvent<?>> e
+@SuppressWarnings({ "rawtypes", "serial" }) //suppressed to avoid jdk1.5 compilation errors caused by the declaration IterationInternalEvent<? extends IterationInternalEvent<?>> e
public class DiagnosticIterationStrategyNode extends
AbstractIterationStrategyNode {
@@ -101,6 +101,7 @@
return true;
}
+ @Override
public synchronized void receiveCompletion(int inputIndex,
Completion completion) {
String owningProcess = completion.getOwningProcess();
@@ -112,6 +113,7 @@
jobs.add(completion);
}
+ @Override
public synchronized void receiveJob(int inputIndex, Job newJob) {
List<IterationInternalEvent> jobs = ownerToJobList.get(newJob.getOwningProcess());
if (jobs == null) {
@@ -121,6 +123,7 @@
jobs.add(newJob);
}
+ @Override
public int getIterationDepth(Map<String, Integer> inputDepths) throws IterationTypeMismatchException {
// TODO Auto-generated method stub
return 0;