blob: e6c0ce47963d4aa43ebe61261a1ea0b87d7faa1e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.ode.bpel.engine;
import org.apache.ode.bpel.runtime.PartnerLinkInstance;
import org.apache.ode.bpel.runtime.Selector;
import org.apache.ode.utils.ObjectPrinter;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* <p>
* Manages receive/pick--reply matching. Keeps track of active pick/receive activities (i.e. those that have been
* reached in the script) and their association with a message exchange (for those receive/picks that have received
* a message). The purpose of this class is to 1) enable matching a reply activity to the corresponding receive/pick
* activity and 2) allow us to fault out message exchanges that have not been replied to when they go out of scope.
* </p>
* <p>
* Note, this class is only used for INBOUND synchronous (request-response) operations. None of this is necessary
* for asynchronous messages.
* </p>
*/
@Deprecated
public class OutstandingRequestManager implements Serializable {
private static final long serialVersionUID = -5556374398943757951L;
private static final Logger __log = LoggerFactory.getLogger(OutstandingRequestManager.class);
public final Map<RequestIdTuple, Entry> _byRid = new HashMap<RequestIdTuple, Entry>();
public final Map<String, Entry> _byChannel = new HashMap<String, Entry>();
int findConflict(Selector selectors[]) {
if (__log.isTraceEnabled()) {
__log.trace(ObjectPrinter.stringifyMethodEnter("findConflict", new Object[] { "selectors", selectors}) );
}
Set<RequestIdTuple> workingSet = new HashSet<RequestIdTuple>(_byRid.keySet());
for (int i = 0; i < selectors.length; ++i) {
if (selectors[i].oneWay) {
continue;
}
final RequestIdTuple rid = new RequestIdTuple(selectors[i].plinkInstance,selectors[i].opName, selectors[i].messageExchangeId);
if (workingSet.contains(rid)) {
return i;
}
workingSet.add(rid);
}
return -1;
}
/**
* Register a receive/pick with the manager. This occurs when the receive/pick is encountered in the processing of
* the BPEL script.
* @param pickResponseChannel response channel associated with this receive/pick
* @param selectors selectors for this receive/pick
*/
void register(String pickResponseChannel, Selector selectors[]) {
if (__log.isTraceEnabled()) {
__log.trace(ObjectPrinter.stringifyMethodEnter("register", new Object[] {
"pickResponseChannel", pickResponseChannel,
"selectors", selectors
}) );
}
if (_byChannel.containsKey(pickResponseChannel)) {
String errmsg = "INTERNAL ERROR: Duplicate ENTRY for RESPONSE CHANNEL " + pickResponseChannel;
__log.error(errmsg);
throw new IllegalArgumentException(errmsg);
}
Entry entry = new Entry(pickResponseChannel, selectors);
for (int i = 0 ; i < selectors.length; ++i) {
if (selectors[i].oneWay) {
continue;
}
final RequestIdTuple rid = new RequestIdTuple(selectors[i].plinkInstance,selectors[i].opName, selectors[i].messageExchangeId);
if (_byRid.containsKey(rid)) {
String errmsg = "INTERNAL ERROR: Duplicate ENTRY for RID " + rid;
__log.error(errmsg);
throw new IllegalStateException(errmsg);
}
_byRid.put(rid, entry);
}
_byChannel.put(pickResponseChannel, entry);
}
/**
* Cancel a previous registration.
* @see #register(String, Selector[])
* @param pickResponseChannel
*/
void cancel(String pickResponseChannel) {
if (__log.isTraceEnabled())
__log.trace(ObjectPrinter.stringifyMethodEnter("cancel", new Object[] {
"pickResponseChannel", pickResponseChannel
}) );
Entry entry = _byChannel.remove(pickResponseChannel);
if (entry != null) {
while(_byRid.values().remove(entry));
}
}
/**
* Associate a message exchange with a registered receive/pick. This happens when a message corresponding to the
* receive/pick is received by the system.
* @param pickResponseChannel
* @param mexRef
*/
void associate(String pickResponseChannel, String mexRef) {
if (__log.isTraceEnabled())
__log.trace(ObjectPrinter.stringifyMethodEnter("associate", new Object[] {
"pickResponseChannel", pickResponseChannel,
"mexRef", mexRef
}) );
Entry entry = _byChannel.get(pickResponseChannel);
if (entry == null) {
String errmsg = "INTERNAL ERROR: No ENTRY for RESPONSE CHANNEL " + pickResponseChannel;
__log.error(errmsg);
throw new IllegalArgumentException(errmsg);
}
// For pub-sub cases, an entry may already be associated with a message
// Hence, the sanity check shown below is no longer valid
// if (entry.mexRef != null) {
// String errmsg = "INTERNAL ERROR: Duplicate ASSOCIATION for CHANEL " + pickResponseChannel;
// __log.error(errmsg);
// throw new IllegalStateException(errmsg);
// }
entry.mexRef = mexRef;
}
/**
* Release the registration. This method is called when the reply activity sends a reply corresponding to the
* registration.
* @param plinkInstnace partner link
* @param opName operation
* @param mexId message exchange identifier IN THE BPEL SENSE OF THE TERM (i.e. a receive/reply disambiguator).
* @return message exchange identifier associated with the registration that matches the parameters
*/
public String release(PartnerLinkInstance plinkInstnace, String opName, String mexId) {
if (__log.isTraceEnabled())
__log.trace(ObjectPrinter.stringifyMethodEnter("release", new Object[] {
"plinkInstance", plinkInstnace,
"opName", opName,
"mexId", mexId
}) );
final RequestIdTuple rid = new RequestIdTuple(plinkInstnace,opName, mexId);
Entry entry = _byRid.get(rid);
if (entry == null) {
if (__log.isDebugEnabled()) {
__log.debug("==release: RID " + rid + " not found in " + _byRid);
}
return null;
}
while(_byChannel.values().remove(entry));
while(_byRid.values().remove(entry));
return entry.mexRef;
}
/**
* "Release" all outstanding incoming messages exchanges. Makes the object forget about
* the previous registrations
* @return a list of message exchange identifiers for message exchanges that were begun (receive/pick got a message)
* but not yet completed (reply not yet sent)
*/
public String[] releaseAll() {
if (__log.isTraceEnabled())
__log.trace(ObjectPrinter.stringifyMethodEnter("releaseAll", null) );
ArrayList<String> mexRefs = new ArrayList<String>();
for (Entry entry : _byChannel.values()) {
if (entry.mexRef!=null)
mexRefs.add(entry.mexRef);
}
_byChannel.values().clear();
_byRid.values().clear();
return mexRefs.toArray(new String[mexRefs.size()]);
}
public String toString() {
return ObjectPrinter.toString(this, new Object[] {
"byRid", _byRid,
"byChannel", _byChannel
});
}
public Map<RequestIdTuple, Entry> getRids() {
return _byRid;
}
/**
* Tuple identifying an outstanding request (i.e. a receive,pick, or onMessage on a
* synchronous operation needing a reply).
*/
public class RequestIdTuple implements Serializable {
private static final long serialVersionUID = -1059389611839777482L;
/** On which partner link it was received. */
public PartnerLinkInstance partnerLink;
/** Name of the operation. */
public String opName;
/** Message exchange identifier. */
public String mexId;
/** Constructor. */
private RequestIdTuple(PartnerLinkInstance partnerLink, String opName, String mexId) {
this.partnerLink = partnerLink;
this.opName = opName;
this.mexId = mexId == null ? "" : mexId;
}
public int hashCode() {
return this.partnerLink.hashCode() ^ this.opName.hashCode() ^ this.mexId.hashCode();
}
public boolean equals(Object obj) {
RequestIdTuple other = (RequestIdTuple) obj;
return other.partnerLink.equals(partnerLink) &&
other.opName.equals(opName) &&
other.mexId.equals(mexId);
}
public String toString() {
return ObjectPrinter.toString(this, new Object[] {
"partnerLink", partnerLink,
"opName", opName,
"mexId", mexId
});
}
}
public class Entry implements Serializable {
private static final long serialVersionUID = -583743124656582887L;
public final String pickResponseChannel;
public Object[] selectors;
public String mexRef;
private Entry(String pickResponseChannel, Selector[] selectors) {
this.pickResponseChannel = pickResponseChannel;
this.selectors = selectors;
}
public String toString() {
return ObjectPrinter.toString(this, new Object[] {
"pickResponseChannel", pickResponseChannel,
"selectors", selectors,
"mexRef", mexRef
});
}
}
}