blob: f1487f71a011a1894ef277e27a905e3f614929c4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.ode.bpel.runtime;
import java.io.Serializable;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.ode.bpel.common.FaultException;
import org.apache.ode.bpel.evt.ActivityEnabledEvent;
import org.apache.ode.bpel.evt.ActivityExecEndEvent;
import org.apache.ode.bpel.evt.ActivityExecStartEvent;
import org.apache.ode.bpel.evt.ActivityFailureEvent;
import org.apache.ode.bpel.evt.ActivityRecoveryEvent;
import org.apache.ode.bpel.explang.EvaluationException;
import org.apache.ode.bpel.obj.OActivity;
import org.apache.ode.bpel.obj.OExpression;
import org.apache.ode.bpel.obj.OFailureHandling;
import org.apache.ode.bpel.obj.OInvoke;
import org.apache.ode.bpel.obj.OLink;
import org.apache.ode.bpel.obj.OScope;
import org.apache.ode.bpel.runtime.channels.ActivityRecovery;
import org.apache.ode.bpel.runtime.channels.FaultData;
import org.apache.ode.bpel.runtime.channels.LinkStatus;
import org.apache.ode.bpel.runtime.channels.ParentScope;
import org.apache.ode.bpel.runtime.channels.Termination;
import org.apache.ode.bpel.runtime.channels.TimerResponse;
import org.apache.ode.jacob.CompositeProcess;
import org.apache.ode.jacob.ReceiveProcess;
import org.apache.ode.jacob.Synch;
import org.w3c.dom.Element;
import static org.apache.ode.jacob.ProcessUtil.compose;
class ACTIVITYGUARD extends ACTIVITY {
private static final long serialVersionUID = 1L;
private static final Logger __log = LoggerFactory.getLogger(ACTIVITYGUARD.class);
private static final ActivityTemplateFactory __activityTemplateFactory = new ActivityTemplateFactory();
private OActivity _oactivity;
/** Link values. */
private Map<OLink, Boolean> _linkVals = new HashMap<OLink, Boolean>();
/** Flag to prevent duplicate ActivityEnabledEvents */
private boolean _firstTime = true;
private ActivityFailure _failure;
public ACTIVITYGUARD(ActivityInfo self, ScopeFrame scopeFrame, LinkFrame linkFrame) {
super(self, scopeFrame, linkFrame);
_oactivity = self.o;
}
public void run() {
// Send a notification of the activity being enabled,
if (_firstTime) {
sendEvent(new ActivityEnabledEvent());
_firstTime = false;
}
if (_linkVals.keySet().containsAll(_oactivity.getTargetLinks())) {
if (evaluateJoinCondition()) {
ActivityExecStartEvent aese = new ActivityExecStartEvent();
sendEvent(aese);
// intercept completion channel in order to execute transition conditions.
ActivityInfo activity = new ActivityInfo(genMonotonic(),_self.o,_self.self, newChannel(ParentScope.class));
instance(createActivity(activity));
instance(new TCONDINTERCEPT(activity.parent));
} else {
if (_oactivity.isSuppressJoinFailure()) {
_self.parent.completed(null, CompensationHandler.emptySet());
if (__log.isDebugEnabled())
__log.debug("Join condition false, suppress join failure on activity " + _self.aId);
} else {
FaultData fault = null;
fault = createFault(_oactivity.getOwner().getConstants().getQnJoinFailure(),_oactivity);
_self.parent.completed(fault, CompensationHandler.emptySet());
}
// Dead path activity.
dpe(_oactivity);
}
} else /* don't know all our links statuses */ {
CompositeProcess mlset = compose(new ReceiveProcess() {
private static final long serialVersionUID = 5094153128476008961L;
}.setChannel(_self.self).setReceiver(new Termination() {
public void terminate() {
// Complete immediately, without faulting or registering any comps.
_self.parent.completed(null, CompensationHandler.emptySet());
// Dead-path activity
dpe(_oactivity);
}
}));
for (final OLink link : _oactivity.getTargetLinks()) {
mlset.or(new ReceiveProcess() {
private static final long serialVersionUID = 1024137371118887935L;
}.setChannel(_linkFrame.resolve(link).sub).setReceiver(new LinkStatus() {
public void linkStatus(boolean value) {
_linkVals.put(link, Boolean.valueOf(value));
instance(ACTIVITYGUARD.this);
}
}));
}
object(false, mlset);
}
}
private boolean evaluateTransitionCondition(OExpression transitionCondition)
throws FaultException {
if (transitionCondition == null)
return true;
try {
return getBpelRuntimeContext().getExpLangRuntime().evaluateAsBoolean(transitionCondition,
new ExprEvaluationContextImpl(_scopeFrame, getBpelRuntimeContext()));
} catch (EvaluationException e) {
String msg = "Error in transition condition detected at runtime; condition=" + transitionCondition;
__log.error(msg,e);
throw new InvalidProcessException(msg, e);
}
}
/**
* Evaluate an activity's join condition.
* @return <code>true</code> if join condition evaluates to true.
*/
private boolean evaluateJoinCondition() {
// For activities with no link targets, the join condition is always satisfied.
if (_oactivity.getTargetLinks().size() == 0)
return true;
// For activities with no join condition, an OR condition is assumed.
if (_oactivity.getJoinCondition() == null)
return _linkVals.values().contains(Boolean.TRUE);
try {
return getBpelRuntimeContext().getExpLangRuntime().evaluateAsBoolean(_oactivity.getJoinCondition(),
new ExprEvaluationContextImpl(null, null,_linkVals));
} catch (Exception e) {
String msg = "Unexpected error evaluating a join condition: " + _oactivity.getJoinCondition();
__log.error(msg,e);
throw new InvalidProcessException(msg,e);
}
}
private static ACTIVITY createActivity(ActivityInfo activity, ScopeFrame scopeFrame, LinkFrame linkFrame) {
return __activityTemplateFactory.createInstance(activity.o,activity, scopeFrame, linkFrame);
}
private ACTIVITY createActivity(ActivityInfo activity) {
return createActivity(activity,_scopeFrame, _linkFrame);
}
private void startGuardedActivity() {
ActivityInfo activity = new ActivityInfo(genMonotonic(),_self.o,_self.self, newChannel(ParentScope.class));
instance(createActivity(activity));
instance(new TCONDINTERCEPT(activity.parent));
}
/**
* Intercepts the
* {@link ParentScope#completed(org.apache.ode.bpel.runtime.channels.FaultData, java.util.Set<org.apache.ode.bpel.runtime.CompensationHandler>)}
* call, to evaluate transition conditions before returning to the parent.
*/
private class TCONDINTERCEPT extends BpelJacobRunnable {
private static final long serialVersionUID = 4014873396828400441L;
ParentScope _in;
public TCONDINTERCEPT(ParentScope in) {
_in = in;
}
public void run() {
object(new ReceiveProcess() {
private static final long serialVersionUID = 2667359535900385952L;
}.setChannel(_in).setReceiver(new ParentScope() {
public void compensate(OScope scope, Synch ret) {
_self.parent.compensate(scope,ret);
instance(TCONDINTERCEPT.this);
}
public void completed(FaultData faultData, Set<CompensationHandler> compensations) {
sendEvent(new ActivityExecEndEvent());
if (faultData != null) {
dpe(_oactivity.getSourceLinks());
_self.parent.completed(faultData, compensations);
} else {
FaultData fault = null;
for (Iterator<OLink> i = _oactivity.getSourceLinks().iterator();i.hasNext();) {
OLink olink = i.next();
LinkInfo linfo = _linkFrame.resolve(olink);
try {
boolean val = evaluateTransitionCondition(olink.getTransitionCondition());
linfo.pub.linkStatus(val);
} catch (FaultException e) {
linfo.pub.linkStatus(false);
__log.error("",e);
if (fault == null)
fault = createFault(e.getQName(),olink.getTransitionCondition());
}
}
_self.parent.completed(fault, compensations);
}
}
public void cancelled() {
sendEvent(new ActivityExecEndEvent());
dpe(_oactivity.getOutgoingLinks());
dpe(_oactivity.getSourceLinks());
// Implicit scope can tell the difference between cancelled and completed.
_self.parent.cancelled();
}
private OFailureHandling getFailureHandling() {
if (_oactivity instanceof OInvoke) {
OInvoke _oinvoke = (OInvoke) _oactivity;
OFailureHandling f = getBpelRuntimeContext().getConfigForPartnerLink(_oinvoke.getPartnerLink()).failureHandling;
if (f != null) return f;
}
return _oactivity.getFailureHandling();
}
public void failure(String reason, Element data) {
if (_failure == null)
_failure = new ActivityFailure();
_failure.dateTime = new Date();
_failure.reason = reason;
_failure.data = data;
OFailureHandling failureHandling = getFailureHandling();
if (failureHandling != null && failureHandling.isFaultOnFailure() && _failure.retryCount >= failureHandling.getRetryFor()) {
//Fault after retries (may be 0)
if (__log.isDebugEnabled())
__log.debug("ActivityRecovery: Activity " + _self.aId + " faulting on failure");
FaultData faultData = createFault(OFailureHandling.FAILURE_FAULT_NAME, _oactivity, reason);
completed(faultData, CompensationHandler.emptySet());
return;
}
if (failureHandling == null || _failure.retryCount >= failureHandling.getRetryFor()) {
requireRecovery();
return;
}
if (__log.isDebugEnabled())
__log.debug("ActivityRecovery: Retrying activity " + _self.aId);
Date future = new Date(new Date().getTime() +
(failureHandling == null ? 0L : failureHandling.getRetryDelay() * 1000));
final TimerResponse timerChannel = newChannel(TimerResponse.class);
getBpelRuntimeContext().registerTimer(timerChannel, future);
object(false, new ReceiveProcess() {
private static final long serialVersionUID = -261911108068231376L;
}.setChannel(timerChannel).setReceiver(new TimerResponse() {
public void onTimeout() {
++_failure.retryCount;
startGuardedActivity();
}
public void onCancel() {
requireRecovery();
}
}));
}
private void requireRecovery() {
if (__log.isDebugEnabled())
__log.debug("ActivityRecovery: Activity " + _self.aId + " requires recovery");
sendEvent(new ActivityFailureEvent(_failure.reason));
final ActivityRecovery recoveryChannel = newChannel(ActivityRecovery.class);
getBpelRuntimeContext().registerActivityForRecovery(
recoveryChannel, _self.aId, _failure.reason, _failure.dateTime, _failure.data,
new String[] { "retry", "cancel", "fault" }, _failure.retryCount);
object(false, compose(new ReceiveProcess() {
private static final long serialVersionUID = 8397883882810521685L;
}.setChannel(recoveryChannel).setReceiver(new ActivityRecovery() {
public void retry() {
if (__log.isDebugEnabled())
__log.debug("ActivityRecovery: Retrying activity " + _self.aId + " (user initiated)");
sendEvent(new ActivityRecoveryEvent("retry"));
getBpelRuntimeContext().unregisterActivityForRecovery(recoveryChannel);
++_failure.retryCount;
startGuardedActivity();
}
public void cancel() {
if (__log.isDebugEnabled())
__log.debug("ActivityRecovery: Cancelling activity " + _self.aId + " (user initiated)");
sendEvent(new ActivityRecoveryEvent("cancel"));
getBpelRuntimeContext().unregisterActivityForRecovery(recoveryChannel);
cancelled();
}
public void fault(FaultData faultData) {
if (__log.isDebugEnabled())
__log.debug("ActivityRecovery: Faulting activity " + _self.aId + " (user initiated)");
sendEvent(new ActivityRecoveryEvent("fault"));
getBpelRuntimeContext().unregisterActivityForRecovery(recoveryChannel);
if (faultData == null)
faultData = createFault(OFailureHandling.FAILURE_FAULT_NAME, _self.o, _failure.reason);
completed(faultData, CompensationHandler.emptySet());
}
})).or(new ReceiveProcess() {
private static final long serialVersionUID = 2148587381204858397L;
}.setChannel(_self.self).setReceiver(new Termination() {
public void terminate() {
if (__log.isDebugEnabled())
__log.debug("ActivityRecovery: Cancelling activity " + _self.aId + " (terminated by scope)");
getBpelRuntimeContext().unregisterActivityForRecovery(recoveryChannel);
cancelled();
}
})));
}
}));
}
}
static class ActivityFailure implements Serializable {
private static final long serialVersionUID = 1L;
Date dateTime;
String reason;
Element data;
int retryCount;
}
}