blob: 0fe023211e14e0df0c09726b145e72c7f8869aff [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.ode.bpel.runtime;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.ode.bpel.common.FaultException;
import org.apache.ode.bpel.evar.ExternalVariableModuleException;
import org.apache.ode.bpel.evt.VariableModificationEvent;
import org.apache.ode.bpel.explang.EvaluationException;
import org.apache.ode.bpel.obj.OExpression;
import org.apache.ode.bpel.obj.OForEach;
import org.apache.ode.bpel.obj.OScope;
import org.apache.ode.bpel.runtime.channels.FaultData;
import org.apache.ode.bpel.runtime.channels.ParentScope;
import org.apache.ode.bpel.runtime.channels.Termination;
import org.apache.ode.jacob.CompositeProcess;
import org.apache.ode.jacob.ProcessUtil;
import org.apache.ode.jacob.ReceiveProcess;
import org.apache.ode.jacob.Synch;
import org.apache.ode.utils.DOMUtils;
import org.apache.ode.utils.stl.FilterIterator;
import org.apache.ode.utils.stl.MemberOfFunction;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
public class FOREACH extends ACTIVITY {
private static final long serialVersionUID = 1L;
private static final Logger __log = LoggerFactory.getLogger(FOREACH.class);
private OForEach _oforEach;
private Set<ChildInfo> _children = new HashSet<ChildInfo>();
private Set<CompensationHandler> _compHandlers = new HashSet<CompensationHandler>();
private int _startCounter = -1;
private int _finalCounter = -1;
private int _currentCounter = -1;
private int _completedCounter = 0;
private int _completionCounter = -1;
public FOREACH(ActivityInfo self, ScopeFrame frame, LinkFrame linkFrame) {
super(self,frame, linkFrame);
_oforEach = (OForEach) self.o;
}
public void run() {
try {
_startCounter = evaluateCondition(_oforEach.getStartCounterValue());
_finalCounter = evaluateCondition(_oforEach.getFinalCounterValue());
if (_oforEach.getCompletionCondition() != null) {
_completionCounter = evaluateCondition(_oforEach.getCompletionCondition().getBranchCount());
}
_currentCounter = _startCounter;
} catch (FaultException fe) {
__log.error("",fe);
_self.parent.completed(createFault(fe.getQName(), _self.o), _compHandlers);
return;
}
// Checking for bpws:invalidBranchCondition when the counter limit is superior
// to the maximum number of children
if (_completionCounter > 0 && _completionCounter > _finalCounter - _startCounter) {
_self.parent.completed(
createFault(_oforEach.getOwner().getConstants().getQnInvalidBranchCondition(), _self.o), _compHandlers);
return;
}
// There's really nothing to do
if (_finalCounter < _startCounter || _completionCounter == 0) {
_self.parent.completed(null, _compHandlers);
} else {
// If we're parrallel, starting all our child copies, otherwise one will suffice.
if (_oforEach.isParallel()) {
for (int m = _startCounter; m <= _finalCounter; m++) {
newChild();
}
} else newChild();
instance(new ACTIVE());
}
}
private class ACTIVE extends BpelJacobRunnable {
private static final long serialVersionUID = -5642862698981385732L;
private FaultData _fault;
private boolean _terminateRequested = false;
public void run() {
Iterator<ChildInfo> active = active();
// Continuing as long as a child is active
if (active().hasNext()) {
CompositeProcess mlSet = ProcessUtil.compose(new ReceiveProcess() {
private static final long serialVersionUID = 2554750257484084466L;
}.setChannel(_self.self).setReceiver(new Termination() {
public void terminate() {
// Terminating all children before sepuku
for (Iterator<ChildInfo> i = active(); i.hasNext(); )
replication(i.next().activity.self).terminate();
_terminateRequested = true;
instance(ACTIVE.this);
}
}));
for (;active.hasNext();) {
// Checking out our children
final ChildInfo child = active.next();
mlSet.or(new ReceiveProcess() {
private static final long serialVersionUID = -8027205709961438172L;
}.setChannel(child.activity.parent).setReceiver(new ParentScope() {
public void compensate(OScope scope, Synch ret) {
// Forward compensation to parent
_self.parent.compensate(scope, ret);
instance(ACTIVE.this);
}
public void completed(FaultData faultData, Set<CompensationHandler> compensations) {
child.completed = true;
//
if (_completionCounter > 0 && _oforEach.getCompletionCondition().isSuccessfulBranchesOnly()) {
if (faultData != null) _completedCounter++;
} else _completedCounter++;
_compHandlers.addAll(compensations);
// Keeping the fault to let everybody know
if (faultData != null && _fault == null) {
_fault = faultData;
}
if (shouldContinue() && _fault == null && !_terminateRequested) {
// Everything fine. If parrallel, just let our children be, otherwise making a new child
if (!_oforEach.isParallel()) newChild();
} else {
// Work is done or something wrong happened, children shouldn't continue
for (Iterator<ChildInfo> i = active(); i.hasNext(); )
replication(i.next().activity.self).terminate();
}
instance(ACTIVE.this);
}
public void cancelled() { completed(null, CompensationHandler.emptySet()); }
public void failure(String reason, Element data) { completed(null, CompensationHandler.emptySet()); }
}));
}
object(false, mlSet);
} else {
// No children left, either because they've all been executed or because we
// had to make them stop.
_self.parent.completed(_fault, _compHandlers);
}
}
}
private boolean shouldContinue() {
boolean stop = false;
if (_completionCounter > 0) {
stop = (_completedCounter >= _completionCounter) || stop;
}
stop = (_startCounter + _completedCounter > _finalCounter) || stop;
return !stop;
}
private int evaluateCondition(OExpression condition)
throws FaultException {
try {
int cond = getBpelRuntimeContext().getExpLangRuntime().
evaluateAsNumber(condition, getEvaluationContext()).intValue();
if (cond < 0) {
String msg = "ForEach counter was negative.";
__log.error(msg);
throw new FaultException(_oforEach.getOwner().getConstants().getQnInvalidExpressionValue(),msg);
}
if (cond > Integer.MAX_VALUE) {
// FIXME: this is not entirely correct. ODE uses an signed integer but the
// value range for the counters are xs:unsignedInt, which do not fit in
// an integer. To keep byte code compatibility, we can't easily change the type
// of the fields. But we can probably live with the limitation to only
// support Integer.MAX_VALUE as a maximum instead of Integer.MAX_VALUE * 2 - 1.
String msg = "ForEach counter was too large.";
__log.error(msg);
throw new FaultException(_oforEach.getOwner().getConstants().getQnInvalidExpressionValue(),msg);
}
return cond;
} catch (EvaluationException e) {
String msg = "ForEach counter value couldn't be evaluated as xs:unsignedInt.";
__log.error(msg, e);
throw new FaultException(_oforEach.getOwner().getConstants().getQnInvalidExpressionValue(),msg);
}
}
private void newChild() {
ChildInfo child = new ChildInfo(new ActivityInfo(genMonotonic(), _oforEach.getInnerScope(),
newChannel(Termination.class), newChannel(ParentScope.class)));
_children.add(child);
// Creating the current counter value node
Document doc = DOMUtils.newDocument();
Node counterNode = doc.createTextNode(""+_currentCounter++);
// Instantiating the scope directly to keep control of its scope frame, allows
// the introduction of the counter variable in there (monkey business that is).
ScopeFrame newFrame = new ScopeFrame(
_oforEach.getInnerScope(), getBpelRuntimeContext().createScopeInstance(_scopeFrame.scopeInstanceId,
_oforEach.getInnerScope()), _scopeFrame, null);
VariableInstance vinst = newFrame.resolve(_oforEach.getCounterVariable());
try {
initializeVariable(vinst, counterNode);
} catch (ExternalVariableModuleException e) {
__log.error("Exception while initializing external variable", e);
_self.parent.failure(e.toString(), null);
return;
}
// Generating event
VariableModificationEvent se = new VariableModificationEvent(vinst.declaration.getName());
se.setNewValue(counterNode);
if (_oforEach.getDebugInfo() != null)
se.setLineNo(_oforEach.getDebugInfo().getStartLine());
sendEvent(se);
instance(new SCOPE(child.activity, newFrame, _linkFrame));
}
public String toString() {
return "<T:Act:Flow:" + _oforEach.getName() + ">";
}
private Iterator<ChildInfo> active() {
return new FilterIterator<ChildInfo>(_children.iterator(), new MemberOfFunction<ChildInfo>() {
public boolean isMember(ChildInfo childInfo) {
return !childInfo.completed;
}
});
}
}