| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.ode.bpel.engine; |
| |
| import org.apache.ode.bpel.runtime.PartnerLinkInstance; |
| import org.apache.ode.bpel.runtime.Selector; |
| import org.apache.ode.utils.ObjectPrinter; |
| |
| import java.io.Serializable; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.Map; |
| import java.util.Set; |
| import org.apache.commons.logging.Log; |
| import org.apache.commons.logging.LogFactory; |
| |
| /** |
| * <p> |
| * Manages receive/pick--reply matching. Keeps track of active pick/receive activities (i.e. those that have been |
| * reached in the script) and their association with a message exchange (for those receive/picks that have received |
| * a message). The purpose of this class is to 1) enable matching a reply activity to the corresponding receive/pick |
| * activity and 2) allow us to fault out message exchanges that have not been replied to when they go out of scope. |
| * </p> |
| * <p> |
| * Note, this class is only used for INBOUND synchronous (request-response) operations. None of this is necessary |
| * for asynchronous messages. |
| * </p> |
| */ |
| @Deprecated |
| public class OutstandingRequestManager implements Serializable { |
| private static final long serialVersionUID = -5556374398943757951L; |
| |
| private static final Log __log = LogFactory.getLog(OutstandingRequestManager.class); |
| |
| public final Map<RequestIdTuple, Entry> _byRid = new HashMap<RequestIdTuple, Entry>(); |
| public final Map<String, Entry> _byChannel = new HashMap<String, Entry>(); |
| |
| int findConflict(Selector selectors[]) { |
| if (__log.isTraceEnabled()) { |
| __log.trace(ObjectPrinter.stringifyMethodEnter("findConflict", new Object[] { "selectors", selectors}) ); |
| } |
| |
| Set<RequestIdTuple> workingSet = new HashSet<RequestIdTuple>(_byRid.keySet()); |
| for (int i = 0; i < selectors.length; ++i) { |
| if (selectors[i].oneWay) { |
| continue; |
| } |
| final RequestIdTuple rid = new RequestIdTuple(selectors[i].plinkInstance,selectors[i].opName, selectors[i].messageExchangeId); |
| if (workingSet.contains(rid)) { |
| return i; |
| } |
| workingSet.add(rid); |
| } |
| return -1; |
| } |
| |
| /** |
| * Register a receive/pick with the manager. This occurs when the receive/pick is encountered in the processing of |
| * the BPEL script. |
| * @param pickResponseChannel response channel associated with this receive/pick |
| * @param selectors selectors for this receive/pick |
| */ |
| void register(String pickResponseChannel, Selector selectors[]) { |
| if (__log.isTraceEnabled()) { |
| __log.trace(ObjectPrinter.stringifyMethodEnter("register", new Object[] { |
| "pickResponseChannel", pickResponseChannel, |
| "selectors", selectors |
| }) ); |
| } |
| |
| if (_byChannel.containsKey(pickResponseChannel)) { |
| String errmsg = "INTERNAL ERROR: Duplicate ENTRY for RESPONSE CHANNEL " + pickResponseChannel; |
| __log.fatal(errmsg); |
| throw new IllegalArgumentException(errmsg); |
| } |
| |
| Entry entry = new Entry(pickResponseChannel, selectors); |
| for (int i = 0 ; i < selectors.length; ++i) { |
| if (selectors[i].oneWay) { |
| continue; |
| } |
| |
| final RequestIdTuple rid = new RequestIdTuple(selectors[i].plinkInstance,selectors[i].opName, selectors[i].messageExchangeId); |
| if (_byRid.containsKey(rid)) { |
| String errmsg = "INTERNAL ERROR: Duplicate ENTRY for RID " + rid; |
| __log.fatal(errmsg); |
| throw new IllegalStateException(errmsg); |
| } |
| _byRid.put(rid, entry); |
| } |
| |
| _byChannel.put(pickResponseChannel, entry); |
| } |
| |
| /** |
| * Cancel a previous registration. |
| * @see #register(String, Selector[]) |
| * @param pickResponseChannel |
| */ |
| void cancel(String pickResponseChannel) { |
| if (__log.isTraceEnabled()) |
| __log.trace(ObjectPrinter.stringifyMethodEnter("cancel", new Object[] { |
| "pickResponseChannel", pickResponseChannel |
| }) ); |
| |
| Entry entry = _byChannel.remove(pickResponseChannel); |
| if (entry != null) { |
| while(_byRid.values().remove(entry)); |
| } |
| } |
| |
| /** |
| * Associate a message exchange with a registered receive/pick. This happens when a message corresponding to the |
| * receive/pick is received by the system. |
| * @param pickResponseChannel |
| * @param mexRef |
| */ |
| void associate(String pickResponseChannel, String mexRef) { |
| if (__log.isTraceEnabled()) |
| __log.trace(ObjectPrinter.stringifyMethodEnter("associate", new Object[] { |
| "pickResponseChannel", pickResponseChannel, |
| "mexRef", mexRef |
| }) ); |
| |
| Entry entry = _byChannel.get(pickResponseChannel); |
| if (entry == null) { |
| String errmsg = "INTERNAL ERROR: No ENTRY for RESPONSE CHANNEL " + pickResponseChannel; |
| __log.fatal(errmsg); |
| throw new IllegalArgumentException(errmsg); |
| } |
| |
| // For pub-sub cases, an entry may already be associated with a message |
| // Hence, the sanity check shown below is no longer valid |
| // if (entry.mexRef != null) { |
| // String errmsg = "INTERNAL ERROR: Duplicate ASSOCIATION for CHANEL " + pickResponseChannel; |
| // __log.fatal(errmsg); |
| // throw new IllegalStateException(errmsg); |
| // } |
| |
| entry.mexRef = mexRef; |
| } |
| |
| /** |
| * Release the registration. This method is called when the reply activity sends a reply corresponding to the |
| * registration. |
| * @param plinkInstnace partner link |
| * @param opName operation |
| * @param mexId message exchange identifier IN THE BPEL SENSE OF THE TERM (i.e. a receive/reply disambiguator). |
| * @return message exchange identifier associated with the registration that matches the parameters |
| */ |
| public String release(PartnerLinkInstance plinkInstnace, String opName, String mexId) { |
| if (__log.isTraceEnabled()) |
| __log.trace(ObjectPrinter.stringifyMethodEnter("release", new Object[] { |
| "plinkInstance", plinkInstnace, |
| "opName", opName, |
| "mexId", mexId |
| }) ); |
| |
| final RequestIdTuple rid = new RequestIdTuple(plinkInstnace,opName, mexId); |
| Entry entry = _byRid.get(rid); |
| if (entry == null) { |
| if (__log.isDebugEnabled()) { |
| __log.debug("==release: RID " + rid + " not found in " + _byRid); |
| } |
| return null; |
| } |
| while(_byChannel.values().remove(entry)); |
| while(_byRid.values().remove(entry)); |
| return entry.mexRef; |
| } |
| |
| /** |
| * "Release" all outstanding incoming messages exchanges. Makes the object forget about |
| * the previous registrations |
| * @return a list of message exchange identifiers for message exchanges that were begun (receive/pick got a message) |
| * but not yet completed (reply not yet sent) |
| */ |
| public String[] releaseAll() { |
| if (__log.isTraceEnabled()) |
| __log.trace(ObjectPrinter.stringifyMethodEnter("releaseAll", null) ); |
| |
| ArrayList<String> mexRefs = new ArrayList<String>(); |
| for (Entry entry : _byChannel.values()) { |
| if (entry.mexRef!=null) |
| mexRefs.add(entry.mexRef); |
| } |
| _byChannel.values().clear(); |
| _byRid.values().clear(); |
| return mexRefs.toArray(new String[mexRefs.size()]); |
| } |
| |
| public String toString() { |
| return ObjectPrinter.toString(this, new Object[] { |
| "byRid", _byRid, |
| "byChannel", _byChannel |
| }); |
| } |
| |
| public Map<RequestIdTuple, Entry> getRids() { |
| return _byRid; |
| } |
| |
| /** |
| * Tuple identifying an outstanding request (i.e. a receive,pick, or onMessage on a |
| * synchronous operation needing a reply). |
| */ |
| public class RequestIdTuple implements Serializable { |
| private static final long serialVersionUID = -1059389611839777482L; |
| /** On which partner link it was received. */ |
| public PartnerLinkInstance partnerLink; |
| /** Name of the operation. */ |
| public String opName; |
| /** Message exchange identifier. */ |
| public String mexId; |
| |
| /** Constructor. */ |
| private RequestIdTuple(PartnerLinkInstance partnerLink, String opName, String mexId) { |
| this.partnerLink = partnerLink; |
| this.opName = opName; |
| this.mexId = mexId == null ? "" : mexId; |
| } |
| |
| public int hashCode() { |
| return this.partnerLink.hashCode() ^ this.opName.hashCode() ^ this.mexId.hashCode(); |
| } |
| |
| public boolean equals(Object obj) { |
| RequestIdTuple other = (RequestIdTuple) obj; |
| return other.partnerLink.equals(partnerLink) && |
| other.opName.equals(opName) && |
| other.mexId.equals(mexId); |
| } |
| |
| public String toString() { |
| return ObjectPrinter.toString(this, new Object[] { |
| "partnerLink", partnerLink, |
| "opName", opName, |
| "mexId", mexId |
| }); |
| } |
| } |
| |
| public class Entry implements Serializable { |
| private static final long serialVersionUID = -583743124656582887L; |
| public final String pickResponseChannel; |
| public Object[] selectors; |
| public String mexRef; |
| |
| private Entry(String pickResponseChannel, Selector[] selectors) { |
| this.pickResponseChannel = pickResponseChannel; |
| this.selectors = selectors; |
| } |
| |
| public String toString() { |
| return ObjectPrinter.toString(this, new Object[] { |
| "pickResponseChannel", pickResponseChannel, |
| "selectors", selectors, |
| "mexRef", mexRef |
| }); |
| } |
| } |
| } |