blob: 4f5c86cd2cd34e92c040277edf44b65c69987e89 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.ode.bpel.runtime;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.ode.bpel.obj.OLink;
import org.apache.ode.bpel.obj.OScope;
import org.apache.ode.bpel.obj.OScope.Variable;
import org.apache.ode.bpel.runtime.channels.FaultData;
import org.apache.ode.bpel.runtime.channels.LinkStatus;
import org.apache.ode.bpel.runtime.channels.ParentScope;
import org.apache.ode.bpel.runtime.channels.ReadWriteLock;
import org.apache.ode.jacob.CompositeProcess;
import org.apache.ode.jacob.ProcessUtil;
import org.apache.ode.jacob.ReceiveProcess;
import org.apache.ode.jacob.Synch;
import org.apache.ode.jacob.Val;
import org.w3c.dom.Element;
/**
* A scope activity. The scope activity creates a new scope frame and proceeeds using the {@link SCOPE} template.
*/
public class SCOPEACT extends ACTIVITY {
private static final Logger __log = LoggerFactory.getLogger(SCOPEACT.class);
private static final long serialVersionUID = -4593029783757994939L;
public SCOPEACT(ActivityInfo self, ScopeFrame scopeFrame, LinkFrame linkFrame) {
super(self, scopeFrame, linkFrame);
}
public void run() {
if (((OScope) _self.o).isIsolatedScope()) {
__log.debug("found ISOLATED scope, instance ISOLATEDGUARD");
instance(new ISOLATEDGUARD(createLockList(), newChannel(Synch.class)));
} else {
ScopeFrame newFrame = new ScopeFrame((OScope) _self.o, getBpelRuntimeContext().createScopeInstance(
_scopeFrame.scopeInstanceId, (OScope) _self.o), _scopeFrame, null);
// Depending on whether we are ATOMIC or not, we'll need to create outgoing link status interceptors
LinkFrame linkframe;
if (((OScope) _self.o).isAtomicScope() && !_self.o.getOutgoingLinks().isEmpty()) {
Val linkInterceptorControl = newChannel(Val.class);
ParentScope psc = newChannel(ParentScope.class);
linkframe = createInterceptorLinkFrame();
instance(new LINKSTATUSINTERCEPTOR(linkInterceptorControl,linkframe));
instance(new UNLOCKER(psc, _self.parent, null, Collections.<IsolationLock>emptyList(), linkInterceptorControl));
_self.parent = psc;
} else
linkframe = _linkFrame;
instance(new SCOPE(_self, newFrame, linkframe));
}
}
/**
* Create an ordered list of required locks that need to be acquired before the activity can execute. The list is ordered to
* prevent dead-lock. The method of ordering is not especially important, so long as the same method is always used.
*
* @return
*/
private List<IsolationLock> createLockList() {
LinkedList<IsolationLock> requiredLocks = new LinkedList<IsolationLock>();
OScope o = ((OScope) _self.o);
Set<Variable> vrs = new HashSet<Variable>(o.getVariableRd());
vrs.addAll(o.getVariableWr());
for (Variable v : vrs)
requiredLocks.add(new IsolationLock(v, o.getVariableWr().contains(v), _scopeFrame.globals._varLocks.get(v)));
// Very important, we must sort the locks to prevent deadlocks.
Collections.sort(requiredLocks);
return requiredLocks;
}
/**
* Create outgoing link interceptors. Necessary for ISOLATED and ATOMIC (non-standard ext) scopes. I.e we need to prevent the
* links from coming out until the scope completes successfully.
*
*/
private LinkFrame createInterceptorLinkFrame() {
LinkFrame newframe = new LinkFrame(_linkFrame);
for (OLink outlink : _self.o.getOutgoingLinks()) {
LinkInfo original = _linkFrame.resolve(outlink);
LinkStatus newchannel = newChannel(LinkStatus.class);
newframe.links.put(original.olink, new LinkInfo(original.olink, newchannel, newchannel));
}
return newframe;
}
/**
* Link Status interceptor. Used in ISOLATED and ATOMIC scopes to prevent links from getting out until its time.
*
* @author Maciej Szefler <mszefler at gmail dot com>
*
*/
private class LINKSTATUSINTERCEPTOR extends BpelJacobRunnable {
private static final long serialVersionUID = 3104008741240676253L;
/** We'll listen here for notification that its ok to send links status out. */
private final Val _self;
private final LinkFrame _interceptedChannels;
/** The statuses that have been received */
private final Map<OLink, Boolean> _statuses = new HashMap<OLink, Boolean>();
/** NULL means defer links, TRUE means passthrough, FALSE means send FALSE */
private Boolean _status;
LINKSTATUSINTERCEPTOR(Val self, LinkFrame interceptedChannels) {
_self = self;
_interceptedChannels = interceptedChannels;
}
@Override
public void run() {
__log.debug("LINKSTATUSINTERCEPTOR: running ");
CompositeProcess mlset = ProcessUtil.compose(null);
if (_status == null)
mlset.or(new ReceiveProcess() {
private static final long serialVersionUID = 5029554538593371750L;
}.setChannel(_self).setReceiver(new Val() {
/** Our owner will notify us when it becomes clear what to do with the links. */
public void val(Object retVal) {
if (__log.isDebugEnabled()) {
__log.debug("LINKSTATUSINTERCEPTOR: status received " + retVal);
}
_status = (Boolean) retVal;
for (OLink available : _statuses.keySet())
_linkFrame.resolve(available).pub.linkStatus(_statuses.get(available) && _status);
// Check if we still need to wait around for more links.
if (!isDone()) {
instance(LINKSTATUSINTERCEPTOR.this);
}
}
}));
for (final Map.Entry<OLink, LinkInfo> m : _interceptedChannels.links.entrySet()) {
if (_statuses.containsKey(m.getKey()))
continue;
mlset.or(new ReceiveProcess() {
private static final long serialVersionUID = 1568144473514091593L;
}.setChannel(m.getValue().pub).setReceiver(new LinkStatus() {
public void linkStatus(boolean value) {
_statuses.put(m.getKey(), value);
if (_status != null) {
_linkFrame.resolve(m.getKey()).pub.linkStatus(value && _status);
}
if (!isDone()) {
instance(LINKSTATUSINTERCEPTOR.this);
}
}
}));
}
object(false, mlset);
}
/**
* Did we get all the links we need?
* @return
*/
private boolean isDone() {
return (_statuses.keySet().size() < SCOPEACT.this._self.o.getOutgoingLinks().size());
}
}
/**
* Guard for ISOLATED scopes to prevent start until all locks are acquired.
*
* @author Maciej Szefler <mszefler at gmail dot com>
*
*/
private class ISOLATEDGUARD extends BpelJacobRunnable {
private static final long serialVersionUID = -5017579415744600900L;
final List<IsolationLock> _locksNeeded;
final LinkedList<IsolationLock> _locksAcquired = new LinkedList<IsolationLock>();
final Synch _synchChannel;
ISOLATEDGUARD(List<IsolationLock> locks, Synch synchChannel) {
_locksNeeded = locks;
_synchChannel = synchChannel;
}
@Override
public void run() {
if (_locksNeeded.isEmpty()) {
// acquired all locks.
if (__log.isDebugEnabled()) {
__log.debug("ISOLATIONGUARD: got all required locks: " + _locksAcquired);
}
ScopeFrame newFrame = new ScopeFrame((OScope) _self.o, getBpelRuntimeContext().createScopeInstance(
_scopeFrame.scopeInstanceId, (OScope) _self.o), _scopeFrame, null);
final ParentScope parent = _self.parent;
_self.parent = newChannel(ParentScope.class);
Val lsi = newChannel(Val.class);
instance(new UNLOCKER(_self.parent, parent, _synchChannel, _locksAcquired, lsi));
LinkFrame linkframe = createInterceptorLinkFrame();
instance(new LINKSTATUSINTERCEPTOR(lsi,linkframe));
instance(new SCOPE(_self, newFrame, linkframe));
return;
} else {
if (__log.isDebugEnabled()) {
__log.debug("ISOLATIONGUARD: don't have all locks still need: " + _locksNeeded);
}
// try to acquire the locks in sequence (IMPORTANT) not all at once.
IsolationLock il = _locksNeeded.get(0);
if (il.writeLock)
il.lockChannel.writeLock(_synchChannel);
else
il.lockChannel.readLock(_synchChannel);
object(new ReceiveProcess() {
private static final long serialVersionUID = 2857261074409098274L;
}.setChannel(_synchChannel).setReceiver(new Synch() {
public void ret() {
__log.debug("ISOLATIONGUARD: got lock: " + _locksNeeded.get(0));
_locksAcquired.add(_locksNeeded.remove(0));
instance(ISOLATEDGUARD.this);
}
}));
}
}
}
/**
* Interceptor that waits for the scope to finish and unlock the acquired locks.
*
* @author Maciej Szefler <mszefler at gmail dot com>
*
*/
private class UNLOCKER extends BpelJacobRunnable {
private static final long serialVersionUID = -476393080609348172L;
private final ParentScope _self;
private final ParentScope _parent;
private final Synch _synchChannel;
private final List<IsolationLock> _locks;
private final Val _linkStatusInterceptor;
public UNLOCKER(ParentScope self, ParentScope parent, Synch synchChannel,
List<IsolationLock> locksAcquired, Val linkStatusInterceptor) {
_self = self;
_parent = parent;
_synchChannel = synchChannel;
_locks = locksAcquired;
_linkStatusInterceptor = linkStatusInterceptor;
}
@Override
public void run() {
__log.debug("running UNLOCKER");
object(new ReceiveProcess() {
private static final long serialVersionUID = 1L;
}.setChannel(_self).setReceiver(new ParentScope() {
public void cancelled() {
_parent.cancelled();
unlockAll();
_linkStatusInterceptor.val(false);
// no more listening.
}
public void compensate(OScope scope, Synch ret) {
_parent.compensate(scope, ret);
// keep listening
instance(UNLOCKER.this);
}
public void completed(FaultData faultData, Set<CompensationHandler> compensations) {
_parent.completed(faultData, compensations);
_linkStatusInterceptor.val(faultData == null);
unlockAll();
// no more listening
}
public void failure(String reason, Element data) {
_parent.failure(reason, data);
_linkStatusInterceptor.val(false);
unlockAll();
// no more listening
}
}));
}
/**
* Unlock all the acquired locks.
*
*/
private void unlockAll() {
if (__log.isDebugEnabled()) {
__log.debug("UNLOCKER: unlockAll: " + _locks);
}
if (((OScope)SCOPEACT.this._self.o).isAtomicScope())
getBpelRuntimeContext().forceFlush();
for (IsolationLock il : _locks)
il.lockChannel.unlock(_synchChannel);
_locks.clear();
}
}
/**
* Representation of a lock needed by an isolated scope.
*
* @author Maciej Szefler <mszefler at gmail dot com>
*
*/
private static class IsolationLock implements Comparable<IsolationLock>, Serializable {
private static final long serialVersionUID = 4214864393241172705L;
OScope.Variable guardedObject;
boolean writeLock;
ReadWriteLock lockChannel;
public IsolationLock(OScope.Variable go, boolean writeLock, ReadWriteLock channel) {
this.guardedObject = go;
this.writeLock = writeLock;
this.lockChannel = channel;
}
public int compareTo(IsolationLock o) {
return guardedObject.getId() - o.guardedObject.getId();
}
}
}