| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.ode.bpel.runtime; |
| |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.Set; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.apache.ode.bpel.common.FaultException; |
| import org.apache.ode.bpel.evar.ExternalVariableModuleException; |
| import org.apache.ode.bpel.evt.VariableModificationEvent; |
| import org.apache.ode.bpel.explang.EvaluationException; |
| import org.apache.ode.bpel.obj.OExpression; |
| import org.apache.ode.bpel.obj.OForEach; |
| import org.apache.ode.bpel.obj.OScope; |
| import org.apache.ode.bpel.runtime.channels.FaultData; |
| import org.apache.ode.bpel.runtime.channels.ParentScope; |
| import org.apache.ode.bpel.runtime.channels.Termination; |
| import org.apache.ode.jacob.CompositeProcess; |
| import org.apache.ode.jacob.ProcessUtil; |
| import org.apache.ode.jacob.ReceiveProcess; |
| import org.apache.ode.jacob.Synch; |
| import org.apache.ode.utils.DOMUtils; |
| import org.apache.ode.utils.stl.FilterIterator; |
| import org.apache.ode.utils.stl.MemberOfFunction; |
| import org.w3c.dom.Document; |
| import org.w3c.dom.Element; |
| import org.w3c.dom.Node; |
| |
| public class FOREACH extends ACTIVITY { |
| |
| private static final long serialVersionUID = 1L; |
| private static final Logger __log = LoggerFactory.getLogger(FOREACH.class); |
| |
| private OForEach _oforEach; |
| private Set<ChildInfo> _children = new HashSet<ChildInfo>(); |
| private Set<CompensationHandler> _compHandlers = new HashSet<CompensationHandler>(); |
| private int _startCounter = -1; |
| private int _finalCounter = -1; |
| private int _currentCounter = -1; |
| private int _completedCounter = 0; |
| private int _completionCounter = -1; |
| |
| public FOREACH(ActivityInfo self, ScopeFrame frame, LinkFrame linkFrame) { |
| super(self,frame, linkFrame); |
| _oforEach = (OForEach) self.o; |
| } |
| |
| public void run() { |
| try { |
| _startCounter = evaluateCondition(_oforEach.getStartCounterValue()); |
| _finalCounter = evaluateCondition(_oforEach.getFinalCounterValue()); |
| if (_oforEach.getCompletionCondition() != null) { |
| _completionCounter = evaluateCondition(_oforEach.getCompletionCondition().getBranchCount()); |
| } |
| _currentCounter = _startCounter; |
| } catch (FaultException fe) { |
| __log.error("",fe); |
| _self.parent.completed(createFault(fe.getQName(), _self.o), _compHandlers); |
| return; |
| } |
| |
| // Checking for bpws:invalidBranchCondition when the counter limit is superior |
| // to the maximum number of children |
| if (_completionCounter > 0 && _completionCounter > _finalCounter - _startCounter) { |
| _self.parent.completed( |
| createFault(_oforEach.getOwner().getConstants().getQnInvalidBranchCondition(), _self.o), _compHandlers); |
| return; |
| } |
| |
| // There's really nothing to do |
| if (_finalCounter < _startCounter || _completionCounter == 0) { |
| _self.parent.completed(null, _compHandlers); |
| } else { |
| // If we're parrallel, starting all our child copies, otherwise one will suffice. |
| if (_oforEach.isParallel()) { |
| for (int m = _startCounter; m <= _finalCounter; m++) { |
| newChild(); |
| } |
| } else newChild(); |
| instance(new ACTIVE()); |
| } |
| } |
| |
| private class ACTIVE extends BpelJacobRunnable { |
| private static final long serialVersionUID = -5642862698981385732L; |
| |
| private FaultData _fault; |
| private boolean _terminateRequested = false; |
| |
| public void run() { |
| Iterator<ChildInfo> active = active(); |
| // Continuing as long as a child is active |
| if (active().hasNext()) { |
| CompositeProcess mlSet = ProcessUtil.compose(new ReceiveProcess() { |
| private static final long serialVersionUID = 2554750257484084466L; |
| }.setChannel(_self.self).setReceiver(new Termination() { |
| public void terminate() { |
| // Terminating all children before sepuku |
| for (Iterator<ChildInfo> i = active(); i.hasNext(); ) |
| replication(i.next().activity.self).terminate(); |
| _terminateRequested = true; |
| instance(ACTIVE.this); |
| } |
| })); |
| for (;active.hasNext();) { |
| // Checking out our children |
| final ChildInfo child = active.next(); |
| mlSet.or(new ReceiveProcess() { |
| private static final long serialVersionUID = -8027205709961438172L; |
| }.setChannel(child.activity.parent).setReceiver(new ParentScope() { |
| public void compensate(OScope scope, Synch ret) { |
| // Forward compensation to parent |
| _self.parent.compensate(scope, ret); |
| instance(ACTIVE.this); |
| } |
| |
| public void completed(FaultData faultData, Set<CompensationHandler> compensations) { |
| child.completed = true; |
| // |
| if (_completionCounter > 0 && _oforEach.getCompletionCondition().isSuccessfulBranchesOnly()) { |
| if (faultData != null) _completedCounter++; |
| } else _completedCounter++; |
| |
| _compHandlers.addAll(compensations); |
| |
| // Keeping the fault to let everybody know |
| if (faultData != null && _fault == null) { |
| _fault = faultData; |
| } |
| if (shouldContinue() && _fault == null && !_terminateRequested) { |
| // Everything fine. If parrallel, just let our children be, otherwise making a new child |
| if (!_oforEach.isParallel()) newChild(); |
| } else { |
| // Work is done or something wrong happened, children shouldn't continue |
| for (Iterator<ChildInfo> i = active(); i.hasNext(); ) |
| replication(i.next().activity.self).terminate(); |
| } |
| instance(ACTIVE.this); |
| } |
| |
| public void cancelled() { completed(null, CompensationHandler.emptySet()); } |
| public void failure(String reason, Element data) { completed(null, CompensationHandler.emptySet()); } |
| })); |
| } |
| object(false, mlSet); |
| } else { |
| // No children left, either because they've all been executed or because we |
| // had to make them stop. |
| _self.parent.completed(_fault, _compHandlers); |
| } |
| } |
| } |
| |
| private boolean shouldContinue() { |
| boolean stop = false; |
| if (_completionCounter > 0) { |
| stop = (_completedCounter >= _completionCounter) || stop; |
| } |
| stop = (_startCounter + _completedCounter > _finalCounter) || stop; |
| return !stop; |
| } |
| |
| private int evaluateCondition(OExpression condition) |
| throws FaultException { |
| try { |
| int cond = getBpelRuntimeContext().getExpLangRuntime(). |
| evaluateAsNumber(condition, getEvaluationContext()).intValue(); |
| |
| if (cond < 0) { |
| String msg = "ForEach counter was negative."; |
| __log.error(msg); |
| throw new FaultException(_oforEach.getOwner().getConstants().getQnInvalidExpressionValue(),msg); |
| } |
| |
| if (cond > Integer.MAX_VALUE) { |
| // FIXME: this is not entirely correct. ODE uses an signed integer but the |
| // value range for the counters are xs:unsignedInt, which do not fit in |
| // an integer. To keep byte code compatibility, we can't easily change the type |
| // of the fields. But we can probably live with the limitation to only |
| // support Integer.MAX_VALUE as a maximum instead of Integer.MAX_VALUE * 2 - 1. |
| String msg = "ForEach counter was too large."; |
| __log.error(msg); |
| throw new FaultException(_oforEach.getOwner().getConstants().getQnInvalidExpressionValue(),msg); |
| } |
| |
| return cond; |
| } catch (EvaluationException e) { |
| String msg = "ForEach counter value couldn't be evaluated as xs:unsignedInt."; |
| __log.error(msg, e); |
| throw new FaultException(_oforEach.getOwner().getConstants().getQnInvalidExpressionValue(),msg); |
| } |
| } |
| |
| private void newChild() { |
| ChildInfo child = new ChildInfo(new ActivityInfo(genMonotonic(), _oforEach.getInnerScope(), |
| newChannel(Termination.class), newChannel(ParentScope.class))); |
| _children.add(child); |
| |
| // Creating the current counter value node |
| Document doc = DOMUtils.newDocument(); |
| Node counterNode = doc.createTextNode(""+_currentCounter++); |
| |
| // Instantiating the scope directly to keep control of its scope frame, allows |
| // the introduction of the counter variable in there (monkey business that is). |
| ScopeFrame newFrame = new ScopeFrame( |
| _oforEach.getInnerScope(), getBpelRuntimeContext().createScopeInstance(_scopeFrame.scopeInstanceId, |
| _oforEach.getInnerScope()), _scopeFrame, null); |
| VariableInstance vinst = newFrame.resolve(_oforEach.getCounterVariable()); |
| |
| try { |
| initializeVariable(vinst, counterNode); |
| } catch (ExternalVariableModuleException e) { |
| __log.error("Exception while initializing external variable", e); |
| _self.parent.failure(e.toString(), null); |
| return; |
| } |
| |
| // Generating event |
| VariableModificationEvent se = new VariableModificationEvent(vinst.declaration.getName()); |
| se.setNewValue(counterNode); |
| if (_oforEach.getDebugInfo() != null) |
| se.setLineNo(_oforEach.getDebugInfo().getStartLine()); |
| sendEvent(se); |
| |
| instance(new SCOPE(child.activity, newFrame, _linkFrame)); |
| } |
| |
| public String toString() { |
| return "<T:Act:Flow:" + _oforEach.getName() + ">"; |
| } |
| |
| private Iterator<ChildInfo> active() { |
| return new FilterIterator<ChildInfo>(_children.iterator(), new MemberOfFunction<ChildInfo>() { |
| public boolean isMember(ChildInfo childInfo) { |
| return !childInfo.completed; |
| } |
| }); |
| } |
| |
| } |