blob: 3e751ff4f6546904d84d9143bad4070141a7ef0c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.ode.bpel.runtime;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import javax.xml.namespace.QName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.ode.bpel.evt.ScopeCompletionEvent;
import org.apache.ode.bpel.evt.ScopeFaultEvent;
import org.apache.ode.bpel.evt.ScopeStartEvent;
import org.apache.ode.bpel.evt.VariableModificationEvent;
import org.apache.ode.bpel.obj.OBase;
import org.apache.ode.bpel.obj.OCatch;
import org.apache.ode.bpel.obj.OElementVarType;
import org.apache.ode.bpel.obj.OEventHandler;
import org.apache.ode.bpel.obj.OFailureHandling;
import org.apache.ode.bpel.obj.OFaultHandler;
import org.apache.ode.bpel.obj.OLink;
import org.apache.ode.bpel.obj.OMessageVarType;
import org.apache.ode.bpel.obj.OScope;
import org.apache.ode.bpel.obj.OVarType;
import org.apache.ode.bpel.runtime.channels.Compensation;
import org.apache.ode.bpel.runtime.channels.EventHandlerControl;
import org.apache.ode.bpel.runtime.channels.FaultData;
import org.apache.ode.bpel.runtime.channels.ParentScope;
import org.apache.ode.bpel.runtime.channels.Termination;
import org.apache.ode.jacob.CompositeProcess;
import org.apache.ode.jacob.ProcessUtil;
import org.apache.ode.jacob.ReceiveProcess;
import org.apache.ode.jacob.Synch;
import org.w3c.dom.Element;
/**
* An active scope.
*/
class SCOPE extends ACTIVITY {
private static final long serialVersionUID = 6111903798996023525L;
private static final Logger __log = LoggerFactory.getLogger(SCOPE.class);
private OScope _oscope;
private ActivityInfo _child;
private Set<EventHandlerInfo> _eventHandlers = new HashSet<EventHandlerInfo>();
/** Constructor. */
public SCOPE(ActivityInfo self, ScopeFrame frame, LinkFrame linkFrame) {
super(self, frame, linkFrame);
_oscope = (OScope) self.o;
assert _oscope.getActivity() != null;
}
public void run() {
// Start the child activity.
_child = new ActivityInfo(genMonotonic(),
_oscope.getActivity(),
newChannel(Termination.class), newChannel(ParentScope.class));
instance(createChild(_child, _scopeFrame, _linkFrame));
if (_oscope.getEventHandler() != null) {
for (Iterator<OEventHandler.OAlarm> i = _oscope.getEventHandler().getOnAlarms().iterator(); i.hasNext(); ) {
OEventHandler.OAlarm alarm = i.next();
EventHandlerInfo ehi = new EventHandlerInfo(alarm,
newChannel(EventHandlerControl.class),
newChannel(ParentScope.class),
newChannel(Termination.class));
_eventHandlers.add(ehi);
instance(new EH_ALARM(ehi.psc,ehi.tc, ehi.cc, alarm, _scopeFrame));
}
for (Iterator<OEventHandler.OEvent> i = _oscope.getEventHandler().getOnMessages().iterator(); i.hasNext(); ) {
OEventHandler.OEvent event = i.next();
EventHandlerInfo ehi = new EventHandlerInfo(event,
newChannel(EventHandlerControl.class),
newChannel(ParentScope.class),
newChannel(Termination.class));
_eventHandlers.add(ehi);
instance(new EH_EVENT(ehi.psc,ehi.tc, ehi.cc, event, _scopeFrame));
}
}
getBpelRuntimeContext().initializePartnerLinks(_scopeFrame.scopeInstanceId,
_oscope.getPartnerLinks().values());
sendEvent(new ScopeStartEvent());
instance(new ACTIVE());
}
private List<CompensationHandler> findCompensationData(OScope scope) {
List<CompensationHandler> out = new ArrayList<CompensationHandler>();
for (Iterator<CompensationHandler> i = _scopeFrame.availableCompensations.iterator(); i.hasNext(); ) {
CompensationHandler ch = i.next();
if (null == scope || ch.compensated.oscope.equals(scope))
out.add(ch);
}
// sort out in terms of completion order
Collections.sort(out);
return out;
}
class ACTIVE extends ACTIVITY {
private static final long serialVersionUID = -5876892592071965346L;
/** Links collected. */
private boolean _terminated;
private FaultData _fault;
private long _startTime;
private final HashSet<CompensationHandler> _compensations = new HashSet<CompensationHandler>();
private boolean _childTermRequested;
ACTIVE() {
super(SCOPE.this._self, SCOPE.this._scopeFrame, SCOPE.this._linkFrame);
_startTime = System.currentTimeMillis();
}
public void run() {
if (_child != null || !_eventHandlers.isEmpty()) {
CompositeProcess mlSet = ProcessUtil.compose(new ReceiveProcess() {
private static final long serialVersionUID = 1913414844895865116L;
}.setChannel(_self.self).setReceiver(new Termination() {
public void terminate() {
_terminated = true;
// Forward the termination request to the nested activity.
if (_child != null && !_childTermRequested) {
replication(_child.self).terminate();
_childTermRequested = true;
}
// Forward the termination request to our event handlers.
terminateEventHandlers();
instance(ACTIVE.this);
}
}));
// Handle messages from the child if it is still alive
if (_child != null) {
mlSet.or(new ReceiveProcess() {
private static final long serialVersionUID = -6934246487304813033L;
}.setChannel(_child.parent).setReceiver(new ParentScope() {
public void compensate(OScope scope, Synch ret) {
// If this scope does not have available compensations, defer to
// parent scope, otherwise do compensation.
if (_scopeFrame.availableCompensations == null)
_self.parent.compensate(scope, ret);
else {
// TODO: Check if we are doing duplicate compensation
List<CompensationHandler> compensations = findCompensationData(scope);
_scopeFrame.availableCompensations.removeAll(compensations);
instance(new ORDEREDCOMPENSATOR(compensations, ret));
}
instance(ACTIVE.this);
}
public void completed(FaultData flt, Set<CompensationHandler> compensations) {
// Set the fault to the activity's choice, if and only if no previous fault
// has been detected (first fault wins).
if (flt != null && _fault == null)
_fault = flt;
_child = null;
_compensations.addAll(compensations);
if (flt == null)
stopEventHandlers();
else
terminateEventHandlers();
instance(ACTIVE.this);
}
public void cancelled() {
// Implicit scope holds links of the enclosed activity,
// they only get cancelled when we propagate upwards.
if (_oscope.isImplicitScope())
_self.parent.cancelled();
else
completed(null, CompensationHandler.emptySet());
}
public void failure(String reason, Element data) {
completed(createFault(OFailureHandling.FAILURE_FAULT_NAME, _self.o, null),
CompensationHandler.emptySet());
}
}) );
}
// Similarly, handle messages from the event handler, if one exists
// and if it has not completed.
for (Iterator<EventHandlerInfo> i = _eventHandlers.iterator();i.hasNext();) {
final EventHandlerInfo ehi = i.next();
mlSet.or(new ReceiveProcess() {
private static final long serialVersionUID = -4694721357537858221L;
}.setChannel(ehi.psc).setReceiver(new ParentScope() {
public void compensate(OScope scope, Synch ret) {
// ACTIVE scopes do not compensate, send request up to parent.
_self.parent.compensate(scope, ret);
instance(ACTIVE.this);
}
public void completed(FaultData flt, Set<CompensationHandler> compenstations) {
// Set the fault to the activity's choice, if and only if no previous fault
// has been detected (first fault wins).
if (flt != null && _fault == null)
_fault = flt;
_eventHandlers.remove(ehi);
_compensations.addAll(compenstations);
if (flt != null) {
// Terminate child if we get a fault from the event handler.
if (_child != null && !_childTermRequested) {
replication(_child.self).terminate();
_childTermRequested = true;
}
terminateEventHandlers();
} else
stopEventHandlers();
instance(ACTIVE.this);
}
public void cancelled() { completed(null, CompensationHandler.emptySet()); }
public void failure(String reason, Element data) { completed(null, CompensationHandler.emptySet()); }
}));
}
object(false, mlSet);
} else /* nothing to wait for... */ {
// Any compensation handlers that were available but not activated will be forgotten.
Set<CompensationHandler> unreachableCompensationHandlers = _scopeFrame.availableCompensations;
if (unreachableCompensationHandlers != null)
for (Iterator<CompensationHandler> i = unreachableCompensationHandlers.iterator(); i.hasNext(); ) {
CompensationHandler ch = i.next();
ch.compChannel.forget();
}
_scopeFrame.availableCompensations = null;
// Maintain a set of links needing dead-path elimination.
Set<OLink> linksNeedingDPE = new HashSet<OLink>();
if (_oscope.getFaultHandler() != null)
for (Iterator<OCatch> i = _oscope.getFaultHandler().getCatchBlocks().iterator(); i.hasNext(); )
linksNeedingDPE.addAll(i.next().getOutgoingLinks());
// We're done with the main work, if we were terminated, we will
// need to load the termination handler:
if (_terminated) {
if (__log.isDebugEnabled()) {
__log.debug("Scope: " + _oscope + " was terminated. (fault="+_fault+")");
}
// ??? Should we forward
// If termination handler defined, and the scope has not faulted
if (_oscope.getTerminationHandler() != null && _fault == null) {
// We have to create a scope for the catch block.
BpelRuntimeContext ntive = getBpelRuntimeContext();
ActivityInfo terminationHandlerActivity = new ActivityInfo(genMonotonic(), _oscope.getTerminationHandler(),
newChannel(Termination.class,"TH"), newChannel(ParentScope.class,"TH"));
ScopeFrame terminationHandlerScopeFrame = new ScopeFrame(_oscope.getTerminationHandler(),
ntive.createScopeInstance(_scopeFrame.scopeInstanceId, _oscope.getTerminationHandler()),
_scopeFrame, CompensationHandler.emptySet(), (FaultData)null);
// Create the temination handler scope.
instance(new SCOPE(terminationHandlerActivity,terminationHandlerScopeFrame, SCOPE.this._linkFrame));
object(new ReceiveProcess() {
private static final long serialVersionUID = -6009078124717125270L;
}.setChannel(terminationHandlerActivity.parent).setReceiver(new ParentScope() {
public void compensate(OScope scope, Synch ret) {
// This should never happen.
throw new AssertionError("received compensate request!");
}
public void completed(FaultData fault, Set<CompensationHandler> compensations) {
// The compensations that have been registered here, will never be activated,
// so we'll forget them as soon as possible.
for (CompensationHandler compensation : compensations)
compensation.compChannel.forget();
// When a fault occurs within a termination handler, it is not propagated
_self.parent.completed(null, CompensationHandler.emptySet());
}
public void cancelled() { completed(null, CompensationHandler.emptySet()); }
public void failure(String reason, Element data) { completed(null, CompensationHandler.emptySet()); }
}));
} else {
_self.parent.completed(null,_compensations);
}
} else if (_fault != null) {
sendEvent(new ScopeFaultEvent(_fault.getFaultName(), _fault.getFaultLineNo(),_fault.getExplanation()));
// Find a fault handler for our fault.
OCatch catchBlock = _oscope.getFaultHandler() == null ? null : findCatch(_oscope.getFaultHandler(), _fault.getFaultName(), _fault.getFaultType());
// Collect all the compensation data for completed child scopes.
assert !!_eventHandlers.isEmpty();
assert _child == null;
if (catchBlock == null) {
// If we cannot find a catch block for this fault, then we simply propagate the fault
// to the parent. NOTE: the "default" fault handler as described in the BPEL spec
// must be generated by the compiler.
__log.warn(_self + ": has no fault handler for "
+ _fault.getFaultName() + "; scope will propagate FAULT! , " + _fault.toString());
_self.parent.completed(_fault, _compensations);
} else /* catchBlock != null */ {
__log.warn(_self + ": has a fault handler for "
+ _fault.getFaultName() + ": "+ catchBlock + " , " + _fault.toString());
linksNeedingDPE.removeAll(catchBlock.getOutgoingLinks());
// We have to create a scope for the catch block.
BpelRuntimeContext ntive = getBpelRuntimeContext();
ActivityInfo faultHandlerActivity = new ActivityInfo(genMonotonic(), catchBlock,
newChannel(Termination.class,"FH"), newChannel(ParentScope.class,"FH"));
ScopeFrame faultHandlerScopeFrame = new ScopeFrame(catchBlock,
ntive.createScopeInstance(_scopeFrame.scopeInstanceId, catchBlock),
_scopeFrame, _compensations, _fault);
if (catchBlock.getFaultVariable() != null) {
try {
VariableInstance vinst = faultHandlerScopeFrame.resolve(catchBlock.getFaultVariable());
initializeVariable(vinst, _fault.getFaultMessage().cloneNode(true));
// Generating event
VariableModificationEvent se = new VariableModificationEvent(vinst.declaration.getName());
se.setNewValue(_fault.getFaultMessage());
if (_oscope.getDebugInfo() != null)
se.setLineNo(_oscope.getDebugInfo().getStartLine());
sendEvent(se);
} catch (Exception ex) {
__log.error("",ex);
throw new InvalidProcessException(ex);
}
}
// Create the fault handler scope.
instance(new SCOPE(faultHandlerActivity,faultHandlerScopeFrame, SCOPE.this._linkFrame));
object(new ReceiveProcess() {
private static final long serialVersionUID = -6009078124717125270L;
}.setChannel(faultHandlerActivity.parent).setReceiver(new ParentScope() {
public void compensate(OScope scope, Synch ret) {
// This should never happen.
throw new AssertionError("received compensate request!");
}
public void completed(FaultData fault, Set<CompensationHandler> compensations) {
// The compensations that have been registered here, will never be activated,
// so we'll forget them as soon as possible.
for (CompensationHandler compensation : compensations)
compensation.compChannel.forget();
_self.parent.completed(fault, CompensationHandler.emptySet());
}
public void cancelled() { completed(null, CompensationHandler.emptySet()); }
public void failure(String reason, Element data) { completed(null, CompensationHandler.emptySet()); }
}));
}
} else /* completed ok */ {
sendEvent(new ScopeCompletionEvent());
if (_oscope.getCompensationHandler() != null) {
CompensationHandler compensationHandler = new CompensationHandler(
_scopeFrame,
newChannel(Compensation.class),
_startTime,
System.currentTimeMillis());
_self.parent.completed(null, Collections.singleton(compensationHandler));
instance(new COMPENSATIONHANDLER_(compensationHandler, _compensations));
} else /* no compensation handler */ {
_self.parent.completed(null, _compensations);
}
}
// DPE links needing DPE (i.e. the unselected catch blocks).
dpe(linksNeedingDPE);
}
}
private void terminateEventHandlers() {
for (Iterator<EventHandlerInfo> i = _eventHandlers.iterator();i.hasNext(); ) {
EventHandlerInfo ehi = i.next();
if (!ehi.terminateRequested && !ehi.stopRequested) {
replication(ehi.tc).terminate();
ehi.terminateRequested = true;
}
}
}
private void stopEventHandlers() {
for (Iterator<EventHandlerInfo> i = _eventHandlers.iterator();i.hasNext();) {
EventHandlerInfo ehi = i.next();
if (!ehi.stopRequested && !ehi.terminateRequested) {
ehi.cc.stop();
ehi.stopRequested = true;
}
}
}
}
private static OCatch findCatch(OFaultHandler fh, QName faultName, OVarType faultType) {
OCatch bestMatch = null;
for (OCatch c : fh.getCatchBlocks()) {
// First we try to eliminate this catch block based on fault-name mismatches:
if (c.getFaultName() != null) {
if (faultName == null)
continue;
if (!faultName.equals(c.getFaultName()))
continue;
}
// Then we try to eliminate this catch based on type incompatibility:
if (c.getFaultVariable() != null) {
if (faultType == null)
continue;
else if (c.getFaultVariable().getType() instanceof OMessageVarType) {
if (faultType instanceof OMessageVarType
&& ((OMessageVarType) faultType).equals(c.getFaultVariable().getType())) {
// Don't eliminate.
} else if (faultType instanceof OElementVarType
&& ((OMessageVarType) c.getFaultVariable().getType()).getDocLitType() != null
&& !((OMessageVarType) c.getFaultVariable().getType()).getDocLitType().equals(faultType)) {
// Don't eliminate.
} else {
continue; // Eliminate.
}
} else if (c.getFaultVariable().getType() instanceof OElementVarType) {
if (faultType instanceof OElementVarType && faultType.equals(c.getFaultVariable().getType())) {
// Don't eliminate
} else if (faultType instanceof OMessageVarType
&& ((OMessageVarType) faultType).getDocLitType() != null
&& ((OMessageVarType) faultType).getDocLitType().equals(c.getFaultVariable().getType())) {
// Don't eliminate
} else {
continue; // eliminate
}
} else {
continue; // Eliminate
}
}
// If we got to this point we did not eliminate this catch block. However, we don't just
// use the first non-eliminated catch, we instead try to find the best match.
if (bestMatch == null) {
// Obviously something is better then nothing.
bestMatch = c;
} else {
// Otherwise we prefer name and variable matches but prefer name-only matches to
// variable-only matches.
int existingScore = (bestMatch.getFaultName() == null ? 0 : 2) + (bestMatch.getFaultVariable() == null ? 0 : 1);
int currentScore = (c.getFaultName() == null ? 0 : 2) + (c.getFaultVariable() == null ? 0 : 1);
if (currentScore > existingScore) {
bestMatch = c;
}
}
}
return bestMatch;
}
static final class EventHandlerInfo implements Serializable {
private static final long serialVersionUID = -9046603073542446478L;
final OBase o;
final EventHandlerControl cc;
final ParentScope psc;
final Termination tc;
boolean terminateRequested;
boolean stopRequested;
EventHandlerInfo(OBase o, EventHandlerControl cc, ParentScope psc, Termination tc) {
this.o = o;
this.cc = cc;
this.psc = psc;
this.tc = tc;
}
}
}