| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.ode.bpel.runtime; |
| |
| import java.io.Serializable; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.apache.ode.bpel.obj.OLink; |
| import org.apache.ode.bpel.obj.OScope; |
| import org.apache.ode.bpel.obj.OScope.Variable; |
| import org.apache.ode.bpel.runtime.channels.FaultData; |
| import org.apache.ode.bpel.runtime.channels.LinkStatus; |
| import org.apache.ode.bpel.runtime.channels.ParentScope; |
| import org.apache.ode.bpel.runtime.channels.ReadWriteLock; |
| import org.apache.ode.jacob.CompositeProcess; |
| import org.apache.ode.jacob.ProcessUtil; |
| import org.apache.ode.jacob.ReceiveProcess; |
| import org.apache.ode.jacob.Synch; |
| import org.apache.ode.jacob.Val; |
| import org.w3c.dom.Element; |
| |
| /** |
| * A scope activity. The scope activity creates a new scope frame and proceeds using the {@link SCOPE} template. |
| */ |
| public class SCOPEACT extends ACTIVITY { |
/** Serialization version identifier of this activity. */
private static final long serialVersionUID = -4593029783757994939L;

/** Logger used by this activity and by the helper processes nested inside it. */
private static final Logger __log = LoggerFactory.getLogger(SCOPEACT.class);

/**
 * Creates the scope activity; all three arguments are handed unchanged to the {@link ACTIVITY} constructor.
 *
 * @param self       descriptor of this activity (its {@code o} member is treated as an {@link OScope} by {@link #run()})
 * @param scopeFrame scope frame the activity is started in
 * @param linkFrame  link frame used to resolve the links of this activity
 */
public SCOPEACT(ActivityInfo self, ScopeFrame scopeFrame, LinkFrame linkFrame) {
    super(self, scopeFrame, linkFrame);
}
| |
| public void run() { |
| if (((OScope) _self.o).isIsolatedScope()) { |
| __log.debug("found ISOLATED scope, instance ISOLATEDGUARD"); |
| instance(new ISOLATEDGUARD(createLockList(), newChannel(Synch.class))); |
| |
| } else { |
| ScopeFrame newFrame = new ScopeFrame((OScope) _self.o, getBpelRuntimeContext().createScopeInstance( |
| _scopeFrame.scopeInstanceId, (OScope) _self.o), _scopeFrame, null); |
| |
| // Depending on whether we are ATOMIC or not, we'll need to create outgoing link status interceptors |
| LinkFrame linkframe; |
| if (((OScope) _self.o).isAtomicScope() && !_self.o.getOutgoingLinks().isEmpty()) { |
| Val linkInterceptorControl = newChannel(Val.class); |
| ParentScope psc = newChannel(ParentScope.class); |
| linkframe = createInterceptorLinkFrame(); |
| instance(new LINKSTATUSINTERCEPTOR(linkInterceptorControl,linkframe)); |
| instance(new UNLOCKER(psc, _self.parent, null, Collections.<IsolationLock>emptyList(), linkInterceptorControl)); |
| _self.parent = psc; |
| } else |
| linkframe = _linkFrame; |
| |
| instance(new SCOPE(_self, newFrame, linkframe)); |
| } |
| |
| } |
| |
| /** |
| * Create an ordered list of required locks that need to be acquired before the activity can execute. The list is ordered to |
| * prevent dead-lock. The method of ordering is not especially important, so long as the same method is always used. |
| * |
| * @return |
| */ |
| private List<IsolationLock> createLockList() { |
| LinkedList<IsolationLock> requiredLocks = new LinkedList<IsolationLock>(); |
| OScope o = ((OScope) _self.o); |
| |
| Set<Variable> vrs = new HashSet<Variable>(o.getVariableRd()); |
| vrs.addAll(o.getVariableWr()); |
| |
| for (Variable v : vrs) |
| requiredLocks.add(new IsolationLock(v, o.getVariableWr().contains(v), _scopeFrame.globals._varLocks.get(v))); |
| |
| // Very important, we must sort the locks to prevent deadlocks. |
| Collections.sort(requiredLocks); |
| |
| return requiredLocks; |
| } |
| |
| /** |
| * Create outgoing link interceptors. Necessary for ISOLATED and ATOMIC (non-standard ext) scopes. I.e we need to prevent the |
| * links from coming out until the scope completes successfully. |
| * |
| */ |
| private LinkFrame createInterceptorLinkFrame() { |
| LinkFrame newframe = new LinkFrame(_linkFrame); |
| for (OLink outlink : _self.o.getOutgoingLinks()) { |
| LinkInfo original = _linkFrame.resolve(outlink); |
| LinkStatus newchannel = newChannel(LinkStatus.class); |
| newframe.links.put(original.olink, new LinkInfo(original.olink, newchannel, newchannel)); |
| } |
| return newframe; |
| } |
| |
| /** |
| * Link Status interceptor. Used in ISOLATED and ATOMIC scopes to prevent links from getting out until its time. |
| * |
| * @author Maciej Szefler <mszefler at gmail dot com> |
| * |
| */ |
| private class LINKSTATUSINTERCEPTOR extends BpelJacobRunnable { |
| private static final long serialVersionUID = 3104008741240676253L; |
| |
| /** We'll listen here for notification that its ok to send links status out. */ |
| private final Val _self; |
| |
| private final LinkFrame _interceptedChannels; |
| |
| /** The statuses that have been received */ |
| private final Map<OLink, Boolean> _statuses = new HashMap<OLink, Boolean>(); |
| |
| /** NULL means defer links, TRUE means passthrough, FALSE means send FALSE */ |
| private Boolean _status; |
| |
| LINKSTATUSINTERCEPTOR(Val self, LinkFrame interceptedChannels) { |
| _self = self; |
| _interceptedChannels = interceptedChannels; |
| } |
| |
| @Override |
| public void run() { |
| |
| __log.debug("LINKSTATUSINTERCEPTOR: running "); |
| |
| CompositeProcess mlset = ProcessUtil.compose(null); |
| if (_status == null) |
| mlset.or(new ReceiveProcess() { |
| private static final long serialVersionUID = 5029554538593371750L; |
| }.setChannel(_self).setReceiver(new Val() { |
| /** Our owner will notify us when it becomes clear what to do with the links. */ |
| public void val(Object retVal) { |
| if (__log.isDebugEnabled()) { |
| __log.debug("LINKSTATUSINTERCEPTOR: status received " + retVal); |
| } |
| |
| _status = (Boolean) retVal; |
| for (OLink available : _statuses.keySet()) |
| _linkFrame.resolve(available).pub.linkStatus(_statuses.get(available) && _status); |
| |
| // Check if we still need to wait around for more links. |
| if (!isDone()) { |
| instance(LINKSTATUSINTERCEPTOR.this); |
| } |
| } |
| })); |
| |
| for (final Map.Entry<OLink, LinkInfo> m : _interceptedChannels.links.entrySet()) { |
| if (_statuses.containsKey(m.getKey())) |
| continue; |
| |
| mlset.or(new ReceiveProcess() { |
| private static final long serialVersionUID = 1568144473514091593L; |
| }.setChannel(m.getValue().pub).setReceiver(new LinkStatus() { |
| public void linkStatus(boolean value) { |
| _statuses.put(m.getKey(), value); |
| if (_status != null) { |
| _linkFrame.resolve(m.getKey()).pub.linkStatus(value && _status); |
| } |
| if (!isDone()) { |
| instance(LINKSTATUSINTERCEPTOR.this); |
| } |
| } |
| })); |
| } |
| |
| object(false, mlset); |
| } |
| |
| /** |
| * Did we get all the links we need? |
| * @return |
| */ |
| private boolean isDone() { |
| return (_statuses.keySet().size() < SCOPEACT.this._self.o.getOutgoingLinks().size()); |
| } |
| } |
| |
| |
| /** |
| * Guard for ISOLATED scopes to prevent start until all locks are acquired. |
| * |
| * @author Maciej Szefler <mszefler at gmail dot com> |
| * |
| */ |
| private class ISOLATEDGUARD extends BpelJacobRunnable { |
| |
| private static final long serialVersionUID = -5017579415744600900L; |
| |
| final List<IsolationLock> _locksNeeded; |
| |
| final LinkedList<IsolationLock> _locksAcquired = new LinkedList<IsolationLock>(); |
| |
| final Synch _synchChannel; |
| |
| ISOLATEDGUARD(List<IsolationLock> locks, Synch synchChannel) { |
| _locksNeeded = locks; |
| _synchChannel = synchChannel; |
| } |
| |
| @Override |
| public void run() { |
| if (_locksNeeded.isEmpty()) { |
| // acquired all locks. |
| if (__log.isDebugEnabled()) { |
| __log.debug("ISOLATIONGUARD: got all required locks: " + _locksAcquired); |
| } |
| |
| ScopeFrame newFrame = new ScopeFrame((OScope) _self.o, getBpelRuntimeContext().createScopeInstance( |
| _scopeFrame.scopeInstanceId, (OScope) _self.o), _scopeFrame, null); |
| |
| |
| final ParentScope parent = _self.parent; |
| _self.parent = newChannel(ParentScope.class); |
| Val lsi = newChannel(Val.class); |
| instance(new UNLOCKER(_self.parent, parent, _synchChannel, _locksAcquired, lsi)); |
| LinkFrame linkframe = createInterceptorLinkFrame(); |
| instance(new LINKSTATUSINTERCEPTOR(lsi,linkframe)); |
| instance(new SCOPE(_self, newFrame, linkframe)); |
| return; |
| } else { |
| if (__log.isDebugEnabled()) { |
| __log.debug("ISOLATIONGUARD: don't have all locks still need: " + _locksNeeded); |
| } |
| |
| // try to acquire the locks in sequence (IMPORTANT) not all at once. |
| IsolationLock il = _locksNeeded.get(0); |
| |
| if (il.writeLock) |
| il.lockChannel.writeLock(_synchChannel); |
| else |
| il.lockChannel.readLock(_synchChannel); |
| |
| object(new ReceiveProcess() { |
| private static final long serialVersionUID = 2857261074409098274L; |
| }.setChannel(_synchChannel).setReceiver(new Synch() { |
| public void ret() { |
| __log.debug("ISOLATIONGUARD: got lock: " + _locksNeeded.get(0)); |
| _locksAcquired.add(_locksNeeded.remove(0)); |
| instance(ISOLATEDGUARD.this); |
| } |
| })); |
| } |
| } |
| |
| } |
| |
| /** |
| * Interceptor that waits for the scope to finish and unlock the acquired locks. |
| * |
| * @author Maciej Szefler <mszefler at gmail dot com> |
| * |
| */ |
| private class UNLOCKER extends BpelJacobRunnable { |
| |
| private static final long serialVersionUID = -476393080609348172L; |
| |
| private final ParentScope _self; |
| |
| private final ParentScope _parent; |
| |
| private final Synch _synchChannel; |
| |
| private final List<IsolationLock> _locks; |
| |
| private final Val _linkStatusInterceptor; |
| |
| public UNLOCKER(ParentScope self, ParentScope parent, Synch synchChannel, |
| List<IsolationLock> locksAcquired, Val linkStatusInterceptor) { |
| _self = self; |
| _parent = parent; |
| _synchChannel = synchChannel; |
| _locks = locksAcquired; |
| _linkStatusInterceptor = linkStatusInterceptor; |
| } |
| |
| @Override |
| public void run() { |
| |
| __log.debug("running UNLOCKER"); |
| object(new ReceiveProcess() { |
| private static final long serialVersionUID = 1L; |
| }.setChannel(_self).setReceiver(new ParentScope() { |
| public void cancelled() { |
| _parent.cancelled(); |
| unlockAll(); |
| _linkStatusInterceptor.val(false); |
| // no more listening. |
| } |
| |
| public void compensate(OScope scope, Synch ret) { |
| _parent.compensate(scope, ret); |
| // keep listening |
| instance(UNLOCKER.this); |
| } |
| |
| public void completed(FaultData faultData, Set<CompensationHandler> compensations) { |
| _parent.completed(faultData, compensations); |
| _linkStatusInterceptor.val(faultData == null); |
| unlockAll(); |
| // no more listening |
| } |
| |
| public void failure(String reason, Element data) { |
| _parent.failure(reason, data); |
| _linkStatusInterceptor.val(false); |
| unlockAll(); |
| // no more listening |
| } |
| })); |
| } |
| |
| /** |
| * Unlock all the acquired locks. |
| * |
| */ |
| private void unlockAll() { |
| if (__log.isDebugEnabled()) { |
| __log.debug("UNLOCKER: unlockAll: " + _locks); |
| } |
| |
| if (((OScope)SCOPEACT.this._self.o).isAtomicScope()) |
| getBpelRuntimeContext().forceFlush(); |
| |
| for (IsolationLock il : _locks) |
| il.lockChannel.unlock(_synchChannel); |
| _locks.clear(); |
| } |
| |
| } |
| |
| /** |
| * Representation of a lock needed by an isolated scope. |
| * |
| * @author Maciej Szefler <mszefler at gmail dot com> |
| * |
| */ |
| private static class IsolationLock implements Comparable<IsolationLock>, Serializable { |
| private static final long serialVersionUID = 4214864393241172705L; |
| |
| OScope.Variable guardedObject; |
| |
| boolean writeLock; |
| |
| ReadWriteLock lockChannel; |
| |
| public IsolationLock(OScope.Variable go, boolean writeLock, ReadWriteLock channel) { |
| this.guardedObject = go; |
| this.writeLock = writeLock; |
| this.lockChannel = channel; |
| } |
| |
| public int compareTo(IsolationLock o) { |
| return guardedObject.getId() - o.guardedObject.getId(); |
| } |
| } |
| } |