// blob: 78f3230df25e233b7533d18e3ab00b560702d75d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.ode.bpel.runtime;
import java.util.Collection;
import java.util.Date;
import javax.xml.namespace.QName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.ode.bpel.common.FaultException;
import org.apache.ode.bpel.evar.ExternalVariableModuleException;
import org.apache.ode.bpel.evt.VariableModificationEvent;
import org.apache.ode.bpel.obj.OInvoke;
import org.apache.ode.bpel.obj.OScope;
import org.apache.ode.bpel.runtime.channels.FaultData;
import org.apache.ode.bpel.runtime.channels.InvokeResponse;
import org.apache.ode.bpel.runtime.channels.Termination;
import org.apache.ode.jacob.ReceiveProcess;
import org.apache.ode.utils.DOMUtils;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import static org.apache.ode.jacob.ProcessUtil.compose;
/**
 * JacobRunnable that performs the work of the <code>invoke</code> activity.
 *
 * <p>{@link #run()} builds the outbound message from the activity's input variable,
 * passes it to the runtime context, and then either completes immediately (one-way
 * invoke, i.e. no output variable) or registers receivers for the partner's reply and
 * for termination of this activity (two-way invoke).</p>
 */
public class INVOKE extends ACTIVITY {
    private static final long serialVersionUID = 992248281026821783L;
    private static final Logger __log = LoggerFactory.getLogger(INVOKE.class);

    private OInvoke _oinvoke;
    // Records number of invocations on the activity.
    // Incremented in run(); never read inside this class.
    private int _invoked;
    // Date/time of last failure. (Not referenced inside this class.)
    private Date _lastFailure;
    // Reason for last failure. (Not referenced inside this class.)
    private String _failureReason;
    // Data associated with failure. (Not referenced inside this class.)
    private Element _failureData;

    /**
     * Creates the runnable for one <code>invoke</code> activity instance.
     *
     * @param self activity information; its <code>o</code> member is cast to {@link OInvoke}
     * @param scopeFrame scope frame handed to the superclass
     * @param linkFrame link frame handed to the superclass
     */
    public INVOKE(ActivityInfo self, ScopeFrame scopeFrame, LinkFrame linkFrame) {
        super(self, scopeFrame, linkFrame);
        _oinvoke = (OInvoke) _self.o;
        _invoked = 0;
    }

    /**
     * Executes the invoke.
     *
     * <p>A {@link FaultException} raised while preparing or sending the message completes
     * the activity with a fault that carries the exception's QName and message. An
     * {@link ExternalVariableModuleException} is reported to the parent as a failure.</p>
     */
    public final void run() {
        Element outboundMsg;
        try {
            outboundMsg = setupOutbound(_oinvoke, _oinvoke.getInitCorrelationsInput(), _oinvoke.getJoinCorrelationsInput());
        } catch (FaultException e) {
            __log.error("Fault while preparing the outbound message for invoke", e);
            // Pass the exception message along, as the FaultException handler at the end of run() does.
            FaultData fault = createFault(e.getQName(), _oinvoke, e.getMessage());
            _self.parent.completed(fault, CompensationHandler.emptySet());
            return;
        } catch (ExternalVariableModuleException e) {
            __log.error("External variable failure while preparing the outbound message for invoke", e);
            _self.parent.failure(e.toString(), null);
            return;
        }

        ++_invoked;

        // if there is no output variable, then this is a one-way invoke
        boolean isTwoWay = _oinvoke.getOutputVar() != null;

        try {
            if (!isTwoWay) {
                // One-way: no response channel is supplied and the activity completes without a fault.
                getBpelRuntimeContext().invoke(_oinvoke.getId(),
                        _scopeFrame.resolve(_oinvoke.getPartnerLink()),
                        _oinvoke.getOperation(), outboundMsg, null);
                _self.parent.completed(null, CompensationHandler.emptySet());
            } else /* two-way */ {
                final VariableInstance outputVar = _scopeFrame.resolve(_oinvoke.getOutputVar());
                InvokeResponse invokeResponseChannel = newChannel(InvokeResponse.class);

                final String mexId = getBpelRuntimeContext().invoke(_oinvoke.getId(),
                        _scopeFrame.resolve(_oinvoke.getPartnerLink()), _oinvoke.getOperation(),
                        outboundMsg, invokeResponseChannel);

                // Wait for either the partner's reply or termination of this activity.
                object(false, compose(new ReceiveProcess() {
                    private static final long serialVersionUID = 4496880438819196765L;
                }.setChannel(invokeResponseChannel).setReceiver(new InvokeResponse() {
                    public void onResponse() {
                        // NOTE(review): the comment originally placed here stated that variable
                        // data had already been written by the native API implementation, yet
                        // the code below initializes the output variable from the response.
                        // Confirm which of the two is accurate.
                        FaultData fault = null;

                        Element response;
                        try {
                            response = getBpelRuntimeContext().getPartnerResponse(mexId);
                        } catch (Exception e) {
                            __log.error("Exception while processing invoke response", e);
                            throw new RuntimeException(e);
                        }

                        try {
                            initializeVariable(outputVar, response);
                        } catch (ExternalVariableModuleException e) {
                            __log.error("Exception while initializing external variable", e);
                            _self.parent.failure(e.toString(), null);
                            // NOTE(review): unlike the other exits of this receiver, this path
                            // does not call releasePartnerMex(mexId, ...) - confirm that is intended.
                            return;
                        }

                        // Generating event
                        VariableModificationEvent se = new VariableModificationEvent(outputVar.declaration.getName());
                        se.setNewValue(response);
                        if (_oinvoke.getDebugInfo() != null) {
                            se.setLineNo(_oinvoke.getDebugInfo().getStartLine());
                        }
                        sendEvent(se);

                        try {
                            for (OScope.CorrelationSet anInitCorrelationsOutput : _oinvoke.getInitCorrelationsOutput()) {
                                initializeCorrelation(_scopeFrame.resolve(anInitCorrelationsOutput), outputVar);
                            }
                            for (OScope.CorrelationSet aJoinCorrelationsOutput : _oinvoke.getJoinCorrelationsOutput()) {
                                // will be ignored if already initialized
                                initializeCorrelation(_scopeFrame.resolve(aJoinCorrelationsOutput), outputVar);
                            }
                            if (_oinvoke.getPartnerLink().hasPartnerRole()) {
                                // Trying to initialize partner epr based on a message-provided epr/session.
                                // The EPR is written when the partner role endpoint is not yet
                                // initialized, or when the partner link is not flagged
                                // initializePartnerRole.
                                if (!getBpelRuntimeContext().isPartnerRoleEndpointInitialized(_scopeFrame
                                        .resolve(_oinvoke.getPartnerLink())) || !_oinvoke.getPartnerLink().isInitializePartnerRole()) {
                                    Node fromEpr = getBpelRuntimeContext().getSourceEPR(mexId);
                                    if (fromEpr != null) {
                                        getBpelRuntimeContext().writeEndpointReference(
                                                _scopeFrame.resolve(_oinvoke.getPartnerLink()), (Element) fromEpr);
                                    }
                                }

                                String partnersSessionId = getBpelRuntimeContext().getSourceSessionId(mexId);
                                if (partnersSessionId != null) {
                                    getBpelRuntimeContext().initializePartnersSessionId(_scopeFrame.resolve(_oinvoke.getPartnerLink()),
                                            partnersSessionId);
                                }
                            }
                        } catch (FaultException e) {
                            // Keep the exception message as the fault explanation.
                            fault = createFault(e.getQName(), _oinvoke, e.getMessage());
                        }

                        // TODO update output variable with data from non-initiate correlation sets

                        _self.parent.completed(fault, CompensationHandler.emptySet());
                        getBpelRuntimeContext().releasePartnerMex(mexId, fault == null);
                    }

                    public void onFault() {
                        // The partner answered with a fault: complete the activity with fault
                        // data built from the fault name, the response message and its type.
                        QName faultName = getBpelRuntimeContext().getPartnerFault(mexId);
                        Element msg = getBpelRuntimeContext().getPartnerResponse(mexId);
                        QName msgType = getBpelRuntimeContext().getPartnerResponseType(mexId);
                        FaultData fault = createFault(faultName, msg,
                                _oinvoke.getOwner().getMessageTypes().get(msgType), _self.o);
                        _self.parent.completed(fault, CompensationHandler.emptySet());
                        getBpelRuntimeContext().releasePartnerMex(mexId, false);
                    }

                    public void onFailure() {
                        // This indicates a communication failure. No fault is thrown, because
                        // there is no fault; the failure is reported to the parent instead.
                        // Per the original author's note, the invoke is then either retried or
                        // left in a failure condition for an administrator to resume.
                        String reason = getBpelRuntimeContext().getPartnerFaultExplanation(mexId);
                        __log.error("Failure during invoke: {}", reason);
                        try {
                            // A literal "]]>" in the reason would end the CDATA section early and
                            // make the document malformed, so split it across two CDATA sections.
                            // String.valueOf keeps the "null" text that plain concatenation gave.
                            String cdataSafeReason = String.valueOf(reason).replace("]]>", "]]]]><![CDATA[>");
                            Element el = DOMUtils.stringToDOM("<invokeFailure><![CDATA[" + cdataSafeReason + "]]></invokeFailure>");
                            _self.parent.failure(reason, el);
                        } catch (Exception e) {
                            // Best effort: still report the failure, only without the data element.
                            __log.debug("Could not attach failure data to invoke failure", e);
                            _self.parent.failure(reason, null);
                        }
                        // Resuming the process creates a new invoke
                        getBpelRuntimeContext().releasePartnerMex(mexId, false);
                    }
                })).or(new ReceiveProcess() {
                    private static final long serialVersionUID = 4219496341785922396L;
                }.setChannel(_self.self).setReceiver(new Termination() {
                    public void terminate() {
                        // Terminated while waiting for the reply: complete without a fault.
                        // NOTE(review): the partner message exchange is not released here - confirm.
                        _self.parent.completed(null, CompensationHandler.emptySet());
                    }
                })));
            }
        } catch (FaultException fault) {
            __log.error("Fault while invoking partner operation", fault);
            FaultData faultData = createFault(fault.getQName(), _oinvoke, fault.getMessage());
            _self.parent.completed(faultData, CompensationHandler.emptySet());
        }
    }

    /**
     * Initializes the outbound correlation sets from the input variable and fetches the
     * message to send.
     *
     * @param oinvoke the invoke definition supplying the input variable and operation
     * @param outboundInitiations correlation sets to initialize from the input variable
     * @param outboundJoins correlation sets to initialize from the input variable; per the
     *        inline comment, already-initialized sets are ignored
     * @return the input variable's data, or <code>null</code> when the operation has no
     *         input or its input message declares no parts
     * @throws FaultException propagated from the correlation/variable helper calls
     * @throws ExternalVariableModuleException propagated from the correlation/variable helper calls
     */
    private Element setupOutbound(OInvoke oinvoke, Collection<OScope.CorrelationSet> outboundInitiations, Collection<OScope.CorrelationSet> outboundJoins)
            throws FaultException, ExternalVariableModuleException {
        // The input variable is resolved only where needed, so that an invoke with no
        // correlations and no message parts never resolves it.
        for (OScope.CorrelationSet c : outboundInitiations) {
            initializeCorrelation(_scopeFrame.resolve(c), _scopeFrame.resolve(oinvoke.getInputVar()));
        }
        for (OScope.CorrelationSet c : outboundJoins) {
            // will be ignored if already initialized
            initializeCorrelation(_scopeFrame.resolve(c), _scopeFrame.resolve(oinvoke.getInputVar()));
        }

        if ((oinvoke.getOperation().getInput() != null) && (oinvoke.getOperation().getInput().getMessage().getParts().size() > 0)) {
            sendVariableReadEvent(_scopeFrame.resolve(oinvoke.getInputVar()));
            Node outboundMsg = fetchVariableData(_scopeFrame.resolve(oinvoke.getInputVar()), false);
            // TODO outbound message should be updated with non-initiate correlation sets
            assert outboundMsg instanceof Element;
            return (Element) outboundMsg;
        } else {
            return null;
        }
    }
}