blob: 2b8aef07ed5525da1d952d6e7d3055bb0788dac4 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.taverna.gis;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.geotools.ows.ServiceException;
import org.n52.wps.client.ExecuteRequestBuilder;
import org.n52.wps.client.ExecuteResponseAnalyser;
import org.n52.wps.client.WPSClientException;
import org.n52.wps.client.WPSClientSession;
import org.n52.wps.io.data.IData;
import org.n52.wps.io.data.binding.complex.GTVectorDataBinding;
import net.opengis.wps.x100.DataType;
import net.opengis.wps.x100.ExecuteDocument;
import net.opengis.wps.x100.ExecuteResponseDocument;
import net.opengis.wps.x100.OutputDataType;
import net.opengis.wps.x100.ProcessDescriptionType;
import org.apache.taverna.invocation.InvocationContext;
import org.apache.taverna.reference.ReferenceService;
import org.apache.taverna.reference.T2Reference;
import org.apache.taverna.workflowmodel.processor.activity.AbstractAsynchronousActivity;
import org.apache.taverna.workflowmodel.processor.activity.ActivityConfigurationException;
import org.apache.taverna.workflowmodel.processor.activity.AsynchronousActivity;
import org.apache.taverna.workflowmodel.processor.activity.AsynchronousActivityCallback;
import org.apache.taverna.workflowmodel.processor.activity.config.ActivityInputPortDefinitionBean;
import org.apache.taverna.workflowmodel.processor.activity.config.ActivityOutputPortDefinitionBean;
public class GisActivity extends AbstractAsynchronousActivity<GisActivityConfigurationBean>
implements AsynchronousActivity<GisActivityConfigurationBean> {
private GisActivityConfigurationBean configBean;
@Override
public void configure(GisActivityConfigurationBean configBean) throws ActivityConfigurationException {
// TODO: Should I call HealthChecker here??
// Any pre-config sanity checks
if (configBean.getOgcServiceUri().equals("")) {
throw new ActivityConfigurationException("Geospatial web service URI can't be empty");
}
// Store for getConfiguration()
this.configBean = configBean;
// REQUIRED: (Re)create input/output ports depending on configuration
configurePorts();
}
protected void configurePorts() {
// In case we are being reconfigured - remove existing ports first
// to avoid duplicates
removeInputs();
removeOutputs();
// Add input ports
for(ActivityInputPortDefinitionBean inputPort : configBean.getInputPortDefinitions())
{
addInput(inputPort.getName(),inputPort.getDepth(),inputPort.getAllowsLiteralValues(),inputPort.getHandledReferenceSchemes(), inputPort.getTranslatedElementType());
}
// Add output ports
for(ActivityOutputPortDefinitionBean outputPort : configBean.getOutputPortDefinitions())
{
addOutput(outputPort.getName(),outputPort.getDepth());
}
}
@SuppressWarnings("unchecked")
@Override
public void executeAsynch(final Map<String, T2Reference> inputs, final AsynchronousActivityCallback callback) {
// Execute service asynchronously
callback.requestRun(new Runnable() {
public void run() {
InvocationContext context = callback.getContext();
ReferenceService referenceService = context.getReferenceService();
// Declare outputs variable
Map<String, T2Reference> outputs = null;
try {
// prepare the execute object
WPSClientSession wpsClient = WPSClientSession.getInstance();
ProcessDescriptionType processDescription = wpsClient.getProcessDescription(configBean.getOgcServiceUri().toString(), configBean.getProcessIdentifier());
ExecuteRequestBuilder executeBuilder = new ExecuteRequestBuilder(processDescription);
for (ActivityInputPortDefinitionBean activityInputPort : configBean.getInputPortDefinitions()) {
String portValue = (String) referenceService.renderIdentifier(inputs.get(activityInputPort.getName()), String.class, context);
executeBuilder.addLiteralData(activityInputPort.getName(), portValue);
}
ExecuteDocument execute = executeBuilder.getExecute();
execute.getExecute().setService("WPS");
Object responseObject = null;
try {
// execute service
responseObject = wpsClient.execute(configBean.getOgcServiceUri().toString(), execute);
} catch (WPSClientException e) {
// if the an error return from service
callback.fail(e.getServerException().xmlText());
}
// Register outputs
outputs = new HashMap<String, T2Reference>();
T2Reference simpleRef = null;
if (responseObject instanceof ExecuteResponseDocument) {
ExecuteResponseDocument response = (ExecuteResponseDocument) responseObject;
// analyser is used to get complex data
ExecuteResponseAnalyser analyser = new ExecuteResponseAnalyser(
execute, response, processDescription);
for(OutputDataType output : response.getExecuteResponse().getProcessOutputs().getOutputArray())
{
DataType data = output.getData();
if (data.isSetLiteralData())
{
simpleRef = referenceService.register(data.getLiteralData().getStringValue(), 0, true, context);
outputs.put(output.getIdentifier().getStringValue(), simpleRef);
}
}
}
} catch (WPSClientException e) {
callback.fail(e.getMessage());
} catch (IOException e) {
callback.fail(e.getMessage());
}
callback.receiveResult(outputs, new int[0]);
}
});
}
@Override
public GisActivityConfigurationBean getConfiguration() {
return this.configBean;
}
}