blob: f14be5496f6fb31407a50406cd936a921fc35913 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.taverna.platform.execution.impl.local;
import static java.util.logging.Level.SEVERE;
import static java.util.logging.Level.WARNING;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.UUID;
import java.util.logging.Logger;
import org.apache.taverna.facade.ResultListener;
import org.apache.taverna.invocation.InvocationContext;
import org.apache.taverna.invocation.WorkflowDataToken;
import org.apache.taverna.lang.observer.Observable;
import org.apache.taverna.lang.observer.Observer;
import org.apache.taverna.monitor.MonitorManager.AddPropertiesMessage;
import org.apache.taverna.monitor.MonitorManager.DeregisterNodeMessage;
import org.apache.taverna.monitor.MonitorManager.MonitorMessage;
import org.apache.taverna.monitor.MonitorManager.RegisterNodeMessage;
import org.apache.taverna.monitor.MonitorableProperty;
import org.apache.taverna.reference.ErrorDocument;
import org.apache.taverna.reference.ExternalReferenceSPI;
import org.apache.taverna.reference.IdentifiedList;
import org.apache.taverna.reference.ReferenceService;
import org.apache.taverna.reference.ReferenceServiceException;
import org.apache.taverna.reference.ReferenceSet;
import org.apache.taverna.reference.StackTraceElementBean;
import org.apache.taverna.reference.T2Reference;
import org.apache.taverna.reference.T2ReferenceType;
import org.apache.taverna.reference.impl.external.file.FileReference;
import org.apache.taverna.reference.impl.external.http.HttpReference;
import org.apache.taverna.workflowmodel.Dataflow;
import org.apache.taverna.workflowmodel.DataflowOutputPort;
import org.apache.taverna.workflowmodel.Processor;
import org.apache.taverna.workflowmodel.processor.activity.Activity;
import org.apache.taverna.workflowmodel.processor.dispatch.events.DispatchJobEvent;
import org.apache.taverna.workflowmodel.processor.dispatch.events.DispatchResultEvent;
import org.apache.taverna.robundle.Bundle;
import org.apache.taverna.databundle.DataBundles;
import org.apache.taverna.platform.execution.api.InvalidWorkflowException;
import org.apache.taverna.platform.report.ActivityReport;
import org.apache.taverna.platform.report.Invocation;
import org.apache.taverna.platform.report.ProcessorReport;
import org.apache.taverna.platform.report.StatusReport;
import org.apache.taverna.platform.report.WorkflowReport;
/**
* A workflow monitor for local executions.
*
* @author David Withers
*/
public class LocalExecutionMonitor implements Observer<MonitorMessage> {
private static final Logger logger = Logger
.getLogger(LocalExecutionMonitor.class.getName());
private static final String ID_SEPARATOR = "/";
private Map<String, StatusReport<?, ?>> reports;
private Map<String, Invocation> invocations;
private Map<String, String> invocationToActivity;
private Map<T2Reference, Path> referenceToPath;
private final String facadeId;
private final Bundle dataBundle;
public LocalExecutionMonitor(WorkflowReport workflowReport,
Bundle dataBundle, WorkflowToDataflowMapper mapping, String facadeId)
throws InvalidWorkflowException {
this.dataBundle = dataBundle;
this.facadeId = facadeId;
reports = new HashMap<>();
invocations = new HashMap<>();
invocationToActivity = new HashMap<>();
referenceToPath = new HashMap<>();
mapReports("", workflowReport, mapping);
}
private void mapReports(String id, WorkflowReport workflowReport,
WorkflowToDataflowMapper mapping) throws InvalidWorkflowException {
Dataflow dataflow = mapping.getDataflow(workflowReport.getSubject());
String dataflowId = null;
if (id.isEmpty()) {
dataflowId = dataflow.getLocalName();
} else {
dataflowId = id + ID_SEPARATOR + dataflow.getLocalName();
}
reports.put(dataflowId, workflowReport);
for (ProcessorReport processorReport : workflowReport
.getProcessorReports()) {
Processor processor = mapping.getDataflowProcessor(processorReport
.getSubject());
String processorId = dataflowId + ID_SEPARATOR
+ processor.getLocalName();
reports.put(processorId, (LocalProcessorReport) processorReport);
for (ActivityReport activityReport : processorReport
.getActivityReports()) {
Activity<?> activity = mapping
.getDataflowActivity(activityReport.getSubject());
String activityId = processorId + ID_SEPARATOR
+ activity.hashCode();
reports.put(activityId, activityReport);
WorkflowReport nestedWorkflowReport = activityReport
.getNestedWorkflowReport();
if (nestedWorkflowReport != null)
mapReports(activityId, nestedWorkflowReport, mapping);
}
}
}
@Override
public void notify(Observable<MonitorMessage> sender, MonitorMessage message)
throws Exception {
String[] owningProcess = message.getOwningProcess();
if (owningProcess.length > 0 && owningProcess[0].equals(facadeId)) {
if (message instanceof RegisterNodeMessage) {
RegisterNodeMessage regMessage = (RegisterNodeMessage) message;
registerNode(regMessage.getWorkflowObject(), owningProcess,
regMessage.getProperties());
} else if (message instanceof DeregisterNodeMessage) {
deregisterNode(owningProcess);
} else if (message instanceof AddPropertiesMessage) {
AddPropertiesMessage addMessage = (AddPropertiesMessage) message;
addPropertiesToNode(owningProcess,
addMessage.getNewProperties());
} else {
logger.warning("Unknown message " + message + " from " + sender);
}
}
}
public void registerNode(Object dataflowObject, String[] owningProcess,
Set<MonitorableProperty<?>> properties) {
if (dataflowObject instanceof Dataflow) {
Dataflow dataflow = (Dataflow) dataflowObject;
Invocation parentInvocation = invocations
.get(getParentInvocationId(owningProcess));
WorkflowReport report = (WorkflowReport) reports
.get(getReportId(owningProcess));
report.setStartedDate(new Date());
Invocation invocation = new Invocation(
getInvocationName(owningProcess), parentInvocation, report);
if (parentInvocation == null) {
if (DataBundles.hasInputs(dataBundle)) {
try {
invocation.setInputs(DataBundles.getPorts(DataBundles
.getInputs(dataBundle)));
} catch (IOException e) {
logger.log(WARNING, "Error setting input ports", e);
}
}
try {
Path outputs = DataBundles.getOutputs(dataBundle);
DataflowResultListener dataflowResultListener = new DataflowResultListener(
outputs);
for (DataflowOutputPort dataflowOutputPort : dataflow
.getOutputPorts()) {
String portName = dataflowOutputPort.getName();
Path portPath = DataBundles.getPort(outputs, portName);
invocation.setOutput(portName, portPath);
dataflowOutputPort
.addResultListener(dataflowResultListener);
}
} catch (IOException e) {
logger.log(WARNING, "Error setting output ports", e);
}
invocations.put(getInvocationId(owningProcess), invocation);
} else {
invocation.setInputs(parentInvocation.getInputs());
NestedDataflowResultListener resultListener = new NestedDataflowResultListener(
invocation);
for (DataflowOutputPort dataflowOutputPort : dataflow
.getOutputPorts()) {
dataflowOutputPort.addResultListener(resultListener);
}
invocations.put(getInvocationId(owningProcess), invocation);
}
} else if (dataflowObject instanceof Processor) {
StatusReport<?, ?> report = reports.get(getReportId(owningProcess));
report.setStartedDate(new Date());
if (report instanceof LocalProcessorReport)
((LocalProcessorReport) report).addProperties(properties);
} else if (dataflowObject instanceof Activity) {
Activity<?> activity = (Activity<?>) dataflowObject;
invocationToActivity.put(owningProcess[owningProcess.length - 1],
String.valueOf(activity.hashCode()));
} else if (dataflowObject instanceof DispatchJobEvent) {
DispatchJobEvent jobEvent = (DispatchJobEvent) dataflowObject;
StatusReport<?, ?> report = reports.get(getReportId(owningProcess));
// create a new invocation
Invocation parentInvocation;
Invocation invocation;
if (report instanceof ActivityReport) {
parentInvocation = invocations
.get(getParentInvocationId(owningProcess)
+ indexToString(jobEvent.getIndex()));
invocation = new Invocation(getInvocationName(owningProcess),
jobEvent.getIndex(), parentInvocation, report);
invocations.put(getInvocationId(owningProcess), invocation);
} else {
parentInvocation = invocations
.get(getParentInvocationId(owningProcess));
invocation = new Invocation(getInvocationName(owningProcess)
+ indexToString(jobEvent.getIndex()),
jobEvent.getIndex(), parentInvocation, report);
invocations.put(getInvocationId(owningProcess)
+ indexToString(jobEvent.getIndex()), invocation);
}
// set the invocation inputs
try {
for (Entry<String, T2Reference> inputInfo : jobEvent.getData()
.entrySet()) {
invocation.setInput(
inputInfo.getKey(),
getIntermediate(inputInfo.getValue(),
jobEvent.getContext()));
}
} catch (IOException | URISyntaxException e) {
logger.log(WARNING, "Error saving intermediate inputs for "
+ jobEvent.getOwningProcess(), e);
}
} else if (dataflowObject instanceof DispatchResultEvent) {
DispatchResultEvent resultEvent = (DispatchResultEvent) dataflowObject;
StatusReport<?, ?> report = reports.get(getReportId(owningProcess));
// find the invocation
Invocation invocation;
if (report instanceof ActivityReport)
invocation = invocations.remove(getInvocationId(owningProcess));
else
invocation = invocations.remove(getInvocationId(owningProcess)
+ indexToString(resultEvent.getIndex()));
if (invocation == null) {
logger.log(SEVERE, "Can't find invocation for owning process "
+ owningProcess);
return;
}
// set the invocation outputs
try {
for (Entry<String, T2Reference> outputInfo : resultEvent.getData()
.entrySet()) {
invocation.setOutput(
outputInfo.getKey(),
getIntermediate(outputInfo.getValue(),
resultEvent.getContext()));
}
} catch (IOException | URISyntaxException e) {
logger.log(WARNING, "Error saving intermediate outputs for "
+ resultEvent.getOwningProcess(), e);
}
invocation.setCompletedDate(new Date());
}
}
public void deregisterNode(String[] owningProcess) {
StatusReport<?, ?> report = reports.get(getReportId(owningProcess));
if (report == null) {
return;
} else if (report instanceof WorkflowReport) {
Invocation invocation = invocations
.remove(getInvocationId(owningProcess));
invocation.setCompletedDate(new Date());
report.setCompletedDate(new Date());
} else if (report instanceof LocalProcessorReport) {
((LocalProcessorReport) report).saveProperties();
report.setCompletedDate(new Date());
} else if (report instanceof ActivityReport) {
// Invocation may still exist if the activity failed
Invocation invocation = invocations
.remove(getInvocationId(owningProcess));
if (invocation != null) {
invocation.setCompletedDate(new Date());
report.setFailedDate(new Date());
} else
report.setCompletedDate(new Date());
invocationToActivity
.remove(owningProcess[owningProcess.length - 1]);
}
}
public void addPropertiesToNode(String[] owningProcess,
Set<MonitorableProperty<?>> newProperties) {
StatusReport<?, ?> report = reports.get(getReportId(owningProcess));
if (report instanceof LocalProcessorReport) {
LocalProcessorReport processorReport = (LocalProcessorReport) report;
processorReport.addProperties(newProperties);
}
}
private String getParentInvocationId(String[] owningProcess) {
List<String> id = new ArrayList<>();
for (int i = 1; i < owningProcess.length - 1; i++)
if (i % 4 != 0)
id.add(owningProcess[i]);
return toPath(id);
}
private String getInvocationId(String[] owningProcess) {
List<String> id = new ArrayList<>();
for (int i = 1; i < owningProcess.length; i++)
if (i % 4 != 0)
id.add(owningProcess[i]);
return toPath(id);
}
private String getInvocationName(String[] owningProcess) {
return owningProcess[owningProcess.length - 1];
}
private String toPath(List<String> id) {
StringBuilder sb = new StringBuilder();
String sep = "";
for (String string : id) {
sb.append(sep).append(string);
sep = ID_SEPARATOR;
}
return sb.toString();
}
private String getReportId(String[] owningProcess) {
List<String> id = new ArrayList<>();
for (int i = 1, position = 0; i < owningProcess.length; i++) {
if (i % 4 == 0)
continue;
if (position == 2) {
id.add(invocationToActivity.get(owningProcess[i]));
position = 0;
} else {
id.add(owningProcess[i]);
position++;
}
}
return toPath(id);
}
public String getProcessorId(String[] owningProcess) {
StringBuffer sb = new StringBuffer();
for (int i = 1, skip = 0; i < owningProcess.length; i++, skip--)
if (i <= 2 || skip < 0) {
sb.append(owningProcess[i]);
skip = 3;
}
return sb.toString();
}
private String indexToString(int[] index) {
StringBuilder indexString = new StringBuilder();
for (int i = 0; i < index.length; i++) {
if (i != 0)
indexString.append(":");
indexString.append(index[i] + 1);
}
return indexString.toString();
}
private Path getIntermediate(T2Reference t2Reference,
InvocationContext context) throws IOException, URISyntaxException {
if (referenceToPath.containsKey(t2Reference))
return referenceToPath.get(t2Reference);
Path path = referencePath(t2Reference);
convertReferenceToPath(path, t2Reference, context);
referenceToPath.put(t2Reference, path);
return path;
}
private Path referencePath(T2Reference t2Reference) throws IOException {
String local = t2Reference.getLocalPart();
try {
return DataBundles.getIntermediate(dataBundle,
UUID.fromString(local));
} catch (IllegalArgumentException ex) {
return DataBundles.getIntermediates(dataBundle)
.resolve(t2Reference.getNamespacePart())
.resolve(t2Reference.getLocalPart());
}
}
public static String getStackTraceElementString(
StackTraceElementBean stackTraceElement) {
StringBuilder sb = new StringBuilder();
sb.append(stackTraceElement.getClassName()).append('.')
.append(stackTraceElement.getMethodName());
if (stackTraceElement.getFileName() == null) {
sb.append("(unknown file)");
} else {
sb.append('(').append(stackTraceElement.getFileName()).append(':')
.append(stackTraceElement.getLineNumber()).append(')');
}
return sb.toString();
}
public void convertReferenceToPath(Path path, T2Reference reference,
InvocationContext context) throws IOException, URISyntaxException {
ReferenceService referenceService = context.getReferenceService();
if (reference.getReferenceType() == T2ReferenceType.ReferenceSet) {
if (DataBundles.isMissing(path)) {
ReferenceSet rs = referenceService.getReferenceSetService()
.getReferenceSet(reference);
if (rs == null)
throw new ReferenceServiceException(
"Could not find ReferenceSet " + reference);
// Check that there are references in the set
if (rs.getExternalReferences().isEmpty())
throw new ReferenceServiceException("ReferenceSet "
+ reference + " is empty");
for (ExternalReferenceSPI ers : rs.getExternalReferences()) {
if (ers instanceof FileReference) {
URI uri = ((FileReference) ers).getFile().toURI();
DataBundles.setReference(path, uri);
} else if (ers instanceof HttpReference) {
URI uri = ((HttpReference) ers).getHttpUrl().toURI();
DataBundles.setReference(path, uri);
} else {
try (InputStream in = ers.openStream(context)) {
Files.copy(in, path);
}
}
}
}
} else if (reference.getReferenceType() == T2ReferenceType.ErrorDocument) {
if (DataBundles.isMissing(path)) {
ErrorDocument errorDocument = referenceService
.getErrorDocumentService().getError(reference);
String message = errorDocument.getMessage();
StringBuilder trace = new StringBuilder();
if (errorDocument.getExceptionMessage() != null
&& !errorDocument.getExceptionMessage().isEmpty()) {
trace.append(errorDocument.getExceptionMessage());
trace.append("\n");
}
List<StackTraceElementBean> stackTraceStrings = errorDocument
.getStackTraceStrings();
for (StackTraceElementBean stackTraceElement : stackTraceStrings) {
trace.append(getStackTraceElementString(stackTraceElement));
trace.append("\n");
}
List<Path> causes = new ArrayList<>();
for (T2Reference errorReference : errorDocument
.getErrorReferences())
causes.add(getIntermediate(errorReference, context));
DataBundles.setError(path, message, trace.toString(),
causes.toArray(new Path[causes.size()]));
}
} else { // it is an IdentifiedList<T2Reference>
IdentifiedList<T2Reference> identifiedList = referenceService
.getListService().getList(reference);
if (!DataBundles.isList(path))
DataBundles.createList(path);
for (T2Reference ref : identifiedList)
convertReferenceToPath(DataBundles.newListItem(path), ref,
context);
}
}
private class NestedDataflowResultListener implements ResultListener {
private final Invocation invocation;
public NestedDataflowResultListener(Invocation invocation) {
this.invocation = invocation;
}
@Override
public void resultTokenProduced(WorkflowDataToken token, String portName) {
try {
if (token.isFinal())
invocation
.setOutput(
portName,
getIntermediate(token.getData(),
token.getContext()));
} catch (IOException | URISyntaxException e) {
logger.log(SEVERE, "Unable to convert T2Reference", e);
}
}
}
private class DataflowResultListener implements ResultListener {
private Path outputs;
private Map<String, Integer> depthSeen = new HashMap<>();
public DataflowResultListener(Path outputs) {
this.outputs = outputs;
}
@Override
public void resultTokenProduced(WorkflowDataToken token, String portName) {
Integer depth = depthSeen.get(portName);
if (depth == null || depth.equals(token.getIndex().length)) {
if (depth == null)
depthSeen.put(portName, token.getIndex().length);
try {
Path port = DataBundles.getPort(outputs, portName);
Path path = getPath(port, 0, token.getIndex());
convertReferenceToPath(path, token.getData(),
token.getContext());
} catch (IOException | URISyntaxException e) {
logger.log(SEVERE, "Unable to convert T2Reference", e);
}
}
}
private Path getPath(Path path, int depth, int[] index)
throws IOException {
if (depth == index.length)
return path;
if (!DataBundles.isList(path))
DataBundles.createList(path);
return getPath(DataBundles.getListItem(path, index[depth]),
depth + 1, index);
}
}
}