/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.ode.bpel.runtime;

import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.ode.bpel.common.FaultException;
import org.apache.ode.bpel.evar.ExternalVariableModuleException;
import org.apache.ode.bpel.evt.VariableModificationEvent;
import org.apache.ode.bpel.explang.EvaluationException;
import org.apache.ode.bpel.obj.OExpression;
import org.apache.ode.bpel.obj.OForEach;
import org.apache.ode.bpel.obj.OScope;
import org.apache.ode.bpel.runtime.channels.FaultData;
import org.apache.ode.bpel.runtime.channels.ParentScope;
import org.apache.ode.bpel.runtime.channels.Termination;
import org.apache.ode.jacob.CompositeProcess;
import org.apache.ode.jacob.ProcessUtil;
import org.apache.ode.jacob.ReceiveProcess;
import org.apache.ode.jacob.Synch;
import org.apache.ode.utils.DOMUtils;
import org.apache.ode.utils.stl.FilterIterator;
import org.apache.ode.utils.stl.MemberOfFunction;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;

public class FOREACH extends ACTIVITY {

    private static final long serialVersionUID = 1L;
    private static final Logger __log = LoggerFactory.getLogger(FOREACH.class);

    private OForEach _oforEach;
    private Set<ChildInfo> _children = new HashSet<ChildInfo>();
    private Set<CompensationHandler> _compHandlers = new HashSet<CompensationHandler>();
    private int _startCounter = -1;
    private int _finalCounter = -1;
    private int _currentCounter = -1;
    private int _completedCounter = 0;
    private int _completionCounter = -1;

    public FOREACH(ActivityInfo self, ScopeFrame frame, LinkFrame linkFrame) {
        super(self,frame, linkFrame);
        _oforEach = (OForEach) self.o;
    }

    public void run() {
        try {
            _startCounter = evaluateCondition(_oforEach.getStartCounterValue());
            _finalCounter = evaluateCondition(_oforEach.getFinalCounterValue());
            if (_oforEach.getCompletionCondition() != null) {
                _completionCounter = evaluateCondition(_oforEach.getCompletionCondition().getBranchCount());
            }
            _currentCounter = _startCounter;
        } catch (FaultException fe) {
            __log.error("",fe);
            _self.parent.completed(createFault(fe.getQName(), _self.o), _compHandlers);
            return;
        }

        // Checking for bpws:invalidBranchCondition when the counter limit is superior
        // to the maximum number of children
        if (_completionCounter > 0 && _completionCounter > _finalCounter - _startCounter) {
            _self.parent.completed(
                    createFault(_oforEach.getOwner().getConstants().getQnInvalidBranchCondition(), _self.o), _compHandlers);
            return;
        }

        // There's really nothing to do
        if (_finalCounter < _startCounter || _completionCounter == 0) {
            _self.parent.completed(null, _compHandlers);
        } else {
            // If we're parrallel, starting all our child copies, otherwise one will suffice.
            if (_oforEach.isParallel()) {
                for (int m = _startCounter; m <= _finalCounter; m++) {
                    newChild();
                }
            } else newChild();
            instance(new ACTIVE());
        }
    }

    private class ACTIVE extends BpelJacobRunnable {
        private static final long serialVersionUID = -5642862698981385732L;

        private FaultData _fault;
        private boolean _terminateRequested = false;

        public void run() {
            Iterator<ChildInfo> active = active();
            // Continuing as long as a child is active
            if (active().hasNext()) {
                CompositeProcess mlSet = ProcessUtil.compose(new ReceiveProcess() {
                    private static final long serialVersionUID = 2554750257484084466L;
                }.setChannel(_self.self).setReceiver(new Termination() {
                    public void terminate() {
                        // Terminating all children before sepuku
                        for (Iterator<ChildInfo> i = active(); i.hasNext(); )
                            replication(i.next().activity.self).terminate();
                        _terminateRequested = true;
                        instance(ACTIVE.this);
                    }
                }));
                for (;active.hasNext();) {
                    // Checking out our children
                    final ChildInfo child = active.next();
                    mlSet.or(new ReceiveProcess() {
                        private static final long serialVersionUID = -8027205709961438172L;
                    }.setChannel(child.activity.parent).setReceiver(new ParentScope() {
                        public void compensate(OScope scope, Synch ret) {
                            // Forward compensation to parent
                            _self.parent.compensate(scope, ret);
                            instance(ACTIVE.this);
                        }

                        public void completed(FaultData faultData, Set<CompensationHandler> compensations) {
                            child.completed = true;
                            //
                            if (_completionCounter > 0 && _oforEach.getCompletionCondition().isSuccessfulBranchesOnly()) {
                                if (faultData != null) _completedCounter++;
                            } else _completedCounter++;

                            _compHandlers.addAll(compensations);

                            // Keeping the fault to let everybody know
                            if (faultData != null && _fault == null) {
                                _fault = faultData;
                            }
                            if (shouldContinue() && _fault == null && !_terminateRequested) {
                                // Everything fine. If parrallel, just let our children be, otherwise making a new child
                                if (!_oforEach.isParallel()) newChild();
                            } else {
                                // Work is done or something wrong happened, children shouldn't continue
                                for (Iterator<ChildInfo> i = active(); i.hasNext(); )
                                    replication(i.next().activity.self).terminate();
                            }
                            instance(ACTIVE.this);
                        }

                        public void cancelled() { completed(null, CompensationHandler.emptySet()); }
                        public void failure(String reason, Element data) { completed(null, CompensationHandler.emptySet()); }
                    }));
                }
                object(false, mlSet);
            } else {
                // No children left, either because they've all been executed or because we
                // had to make them stop.
                _self.parent.completed(_fault, _compHandlers);
            }
        }
    }

    private boolean shouldContinue() {
        boolean stop = false;
        if (_completionCounter > 0) {
            stop = (_completedCounter >= _completionCounter) || stop;
        }
        stop = (_startCounter + _completedCounter > _finalCounter) || stop;
        return !stop;
    }

    private int evaluateCondition(OExpression condition)
            throws FaultException {
        try {
            int cond = getBpelRuntimeContext().getExpLangRuntime().
                    evaluateAsNumber(condition, getEvaluationContext()).intValue();
            
            if (cond < 0) {
                String msg = "ForEach counter was negative.";
                __log.error(msg);
                throw new FaultException(_oforEach.getOwner().getConstants().getQnInvalidExpressionValue(),msg);
            }
            
            if (cond > Integer.MAX_VALUE) {
                // FIXME: this is not entirely correct. ODE uses an signed integer but the
                // value range for the counters are xs:unsignedInt, which do not fit in
                // an integer. To keep byte code compatibility, we can't easily change the type
                // of the fields. But we can probably live with the limitation to only
                // support Integer.MAX_VALUE as a maximum instead of Integer.MAX_VALUE * 2 - 1.
                String msg = "ForEach counter was too large.";
                __log.error(msg);
                throw new FaultException(_oforEach.getOwner().getConstants().getQnInvalidExpressionValue(),msg);
            }

            return cond;
        } catch (EvaluationException e) {
            String msg = "ForEach counter value couldn't be evaluated as xs:unsignedInt.";
            __log.error(msg, e);
            throw new FaultException(_oforEach.getOwner().getConstants().getQnInvalidExpressionValue(),msg);
        }
    }

    private void newChild() {
        ChildInfo child = new ChildInfo(new ActivityInfo(genMonotonic(), _oforEach.getInnerScope(),
                newChannel(Termination.class), newChannel(ParentScope.class)));
        _children.add(child);

        // Creating the current counter value node
        Document doc = DOMUtils.newDocument();
        Node counterNode = doc.createTextNode(""+_currentCounter++);

        // Instantiating the scope directly to keep control of its scope frame, allows
        // the introduction of the counter variable in there (monkey business that is).
        ScopeFrame newFrame = new ScopeFrame(
                _oforEach.getInnerScope(), getBpelRuntimeContext().createScopeInstance(_scopeFrame.scopeInstanceId,
                _oforEach.getInnerScope()), _scopeFrame, null);
        VariableInstance vinst = newFrame.resolve(_oforEach.getCounterVariable());

        try {
        initializeVariable(vinst, counterNode);
        } catch (ExternalVariableModuleException e) {
            __log.error("Exception while initializing external variable", e);
            _self.parent.failure(e.toString(), null);
            return;
        }

        // Generating event
        VariableModificationEvent se = new VariableModificationEvent(vinst.declaration.getName());
        se.setNewValue(counterNode);
        if (_oforEach.getDebugInfo() != null)
            se.setLineNo(_oforEach.getDebugInfo().getStartLine());
        sendEvent(se);

        instance(new SCOPE(child.activity, newFrame, _linkFrame));
    }

    public String toString() {
        return "<T:Act:Flow:" + _oforEach.getName() + ">";
    }

    private Iterator<ChildInfo> active() {
        return new FilterIterator<ChildInfo>(_children.iterator(), new MemberOfFunction<ChildInfo>() {
            public boolean isMember(ChildInfo childInfo) {
                return !childInfo.completed;
            }
        });
    }

}
