blob: 69b4464b1791c56c7b1e0cda6abbe5abc7b66e59 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package flex.messaging;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
import flex.messaging.client.FlexClient;
import flex.messaging.client.FlexClientOutboundQueueProcessor;
import flex.messaging.client.OutboundQueueThrottleManager;
import flex.messaging.config.ThrottleSettings;
import flex.messaging.config.ThrottleSettings.Policy;
import flex.messaging.log.LogCategories;
import flex.messaging.log.Log;
import flex.messaging.messages.AsyncMessage;
import flex.messaging.messages.CommandMessage;
import flex.messaging.messages.Message;
import flex.messaging.services.MessageService;
import flex.messaging.services.messaging.Subtopic;
import flex.messaging.util.ExceptionUtil;
import flex.messaging.util.TimeoutAbstractObject;
import flex.messaging.util.StringUtils;
/**
* Represents a client-side MessageAgent instance.
* Currently a server-side MessageClient is only created if its client-side counterpart has subscribed
* to a destination for pushed data (e.g. Consumer). Client-side Producers do not result in the creation of
* corresponding server-side MessageClient instances.
* <p>
* Client-side MessageAgents communicate with the server over a Channel that corresponds to a FlexSession.
* Server-side MessageClient instances are always created in the context of a FlexSession and when the FlexSession
* is invalidated any associated MessageClients are invalidated as well.
* <p>
* MessageClients may also be timed out on a per-destination basis and this is based on subscription inactivity.
* If no messages are pushed to the MessageClient within the destination's subscription timeout period the
* MessageClient will be shutdown even if the associated FlexSession is still active and connected.
* Per-destination subscription timeout is an optional configuration setting, and should only be used when inactive
* subscriptions should be shut down opportunistically to preserve server resources.
*/
public class MessageClient extends TimeoutAbstractObject implements Serializable {
//--------------------------------------------------------------------------
//
// Public Static Variables
//
//--------------------------------------------------------------------------
/**
* Log category for MessageClient related messages.
*/
public static final String MESSAGE_CLIENT_LOG_CATEGORY = LogCategories.CLIENT_MESSAGECLIENT;
//--------------------------------------------------------------------------
//
// Static Constants
//
//--------------------------------------------------------------------------
/**
* Serializable to support broadcasting subscription state across the cluster for
* optimized message routing.
*/
static final long serialVersionUID = 3730240451524954453L;
//--------------------------------------------------------------------------
//
// Static Variables
//
//--------------------------------------------------------------------------
/**
* The list of MessageClient created listeners.
*/
private static final CopyOnWriteArrayList<MessageClientListener> createdListeners = new CopyOnWriteArrayList<MessageClientListener>();
//--------------------------------------------------------------------------
//
// Static Methods
//
//--------------------------------------------------------------------------
/**
* Adds a MessageClient created listener.
*
* @param listener The listener to add.
* @see flex.messaging.MessageClientListener
*/
public static void addMessageClientCreatedListener(MessageClientListener listener) {
if (listener != null)
createdListeners.addIfAbsent(listener);
}
/**
* Removes a MessageClient created listener.
*
* @param listener The listener to remove.
* @see flex.messaging.MessageClientListener
*/
public static void removeMessageClientCreatedListener(MessageClientListener listener) {
if (listener != null)
createdListeners.remove(listener);
}
//--------------------------------------------------------------------------
//
// Private Static Methods
//
//--------------------------------------------------------------------------
/**
* Utility method.
*/
private static boolean equalStrings(String a, String b) {
return a == b || (a != null && a.equals(b));
}
/**
* Utility method.
*/
static int compareStrings(String a, String b) {
if (a == b)
return 0;
if (a != null && b != null)
return a.compareTo(b);
if (a == null)
return -1;
return 1;
}
//--------------------------------------------------------------------------
//
// Constructor
//
//--------------------------------------------------------------------------
/**
* Constructs a new MessageClient for local use.
*
* @param clientId The clientId for the MessageClient.
* @param destination The destination the MessageClient is subscribed to.
* @param endpointId The Id of the endpoint this MessageClient subscription was created over.
*/
public MessageClient(Object clientId, Destination destination, String endpointId) {
this(clientId, destination, endpointId, true);
}
/**
* Constructs a new MessageClient.
*
* @param clientId The clientId for the MessageClient.
* @param destination The destination the MessageClient is subscribed to.
* @param endpointId The Id of the endpoint this MessageClient subscription was created over.
* @param useSession RemoteMessageClient instances should not be associated with a FlexSession (pass false).
*/
public MessageClient(Object clientId, Destination destination, String endpointId, boolean useSession) {
valid = true;
this.clientId = clientId;
this.destination = destination;
this.endpointId = endpointId;
destinationId = destination.getId();
updateLastUse(); // Initialize last use timestamp to construct time.
/* If this is for a remote server, we do not associate with the session. */
if (useSession) {
flexSession = FlexContext.getFlexSession();
flexSession.registerMessageClient(this);
flexClient = FlexContext.getFlexClient();
flexClient.registerMessageClient(this);
// SubscriptionManager will notify the created listeners, once
// subscription state is setup completely.
// notifyCreatedListeners();
} else {
flexClient = null;
flexSession = null;
// Use an instance level lock.
lock = new Object();
// On a remote server we don't notify created listeners.
}
if (Log.isDebug())
Log.getLogger(MESSAGE_CLIENT_LOG_CATEGORY).debug("MessageClient created with clientId '" + this.clientId + "' for destination '" + destinationId + "'.");
}
//--------------------------------------------------------------------------
//
// Variables
//
//--------------------------------------------------------------------------
/**
* This flag is set to true when the client channel that this subscription was
* established over is disconnected.
* It supports cleaning up per-endpoint outbound queues maintained by the FlexClient.
* If the client notifies the server that its channel is disconnecting, the FlexClient
* does not need to maintain an outbound queue containing a subscription invalidation
* message for this MessageClient to send to the client.
*/
private volatile boolean clientChannelDisconnected;
/**
* The clientId for the MessageClient.
* This value is specified by the client directly or is autogenerated on the client.
*/
protected final Object clientId;
/**
* Internal reference to the associated Destination; don't expose this in the public API.
*/
protected final Destination destination;
/**
* The destination the MessageClient is subscribed to.
*/
protected final String destinationId;
/**
* The set of session destroy listeners to notify when the session is destroyed.
*/
private transient volatile CopyOnWriteArrayList destroyedListeners;
/**
* The Id for the endpoint this MessageClient subscription was created over.
*/
private String endpointId;
/**
* The FlexClient associated with the MessageClient.
*/
private final transient FlexClient flexClient;
/**
* The FlexSession associated with the MessageClient.
* Not final because this needs to be reset if the subscription fails over to a new endpoint.
*/
private transient FlexSession flexSession;
/**
* Flag used to break cycles during invalidation.
*/
private boolean invalidating;
/**
* The lock to use to guard all state changes for the MessageClient.
*/
protected Object lock = new Object();
/**
* Flag indicating whether the MessageClient is attempting to notify the remote client of
* its invalidation.
*/
private volatile boolean attemptingInvalidationClientNotification;
/**
* A counter used to control invalidation for a MessageClient that has multiple
* subscriptions to its destination.
* Unsubscribing from one will not invalidate the MessageClient as long as other
* subscriptions remain active.
*/
private transient int numReferences;
/**
* A set of all of the subscriptions managed by this message client.
*/
protected final Set<SubscriptionInfo> subscriptions = new CopyOnWriteArraySet<SubscriptionInfo>();
/**
* Flag indicating whether this client is valid or not.
*/
protected boolean valid;
/**
* Flag that indicates whether the MessageClient has a per-destination subscription timeout.
* If false, the MessageClient will remain valid until its associated FlexSession is invalidated.
*/
private volatile boolean willTimeout;
/**
* Has anyone explicitly registered this message client. This indicates that
* there is a reference to this MessageClient which is not an explicit subscription.
* This is a hook for FDMS and other adapters which want to use pushMessageToClients
* with clientIds but that do not want the subscription manager to manage subscriptions
* for them.
*/
private volatile boolean registered = false;
//--------------------------------------------------------------------------
//
// Public Methods
//
//--------------------------------------------------------------------------
/**
* Returns the clientId for the MessageClient.
*
* @return The clientId for the MessageClient.
*/
public Object getClientId() {
return clientId; // Field is final; no need to sync.
}
/**
* Returns the destination the MessageClient is subscribed to.
*
* @return The destination the MessageClient is subscribed to.
*/
public Destination getDestination() {
return destination; // Field is final; no need to sync.
}
/**
* Returns the id of the destination the MessageClient is subscribed to.
*
* @return The id of the destination the MessageClient is subscribed to.
*/
public String getDestinationId() {
return destinationId; // Field is final; no need to sync.
}
/**
* Returns the Id for the endpoint the MessageClient subscription was created over.
*
* @return The Id for the endpoint the MessageClient subscription was created over.
*/
public String getEndpointId() {
return endpointId; // Field is final; no need to sync.
}
/**
* Returns the FlexClient associated with this MessageClient.
*
* @return The FlexClient assocaited with this MessageClient.
*/
public FlexClient getFlexClient() {
return flexClient; // Field is final; no need to sync.
}
/**
* Returns the FlexSession associated with this MessageClient.
*
* @return The FlexSession associated with this MessageClient.
*/
public FlexSession getFlexSession() {
synchronized (lock) {
return flexSession;
}
}
/**
* Returns the number of subscriptions associated with this MessageClient.
*
* @return The number of subscriptions associated with this MessageClient.
*/
public int getSubscriptionCount() {
int count;
synchronized (lock) {
count = subscriptions != null ? subscriptions.size() : 0;
}
return count;
}
/**
* This is used for FlexClient outbound queue management. When a MessageClient is invalidated
* if it is attempting to notify the client, then we must leave the outbound queue containing
* the notification in place. Otherwise, any messages queued for the subscription may be
* removed from the queue and possibly shut down immediately.
*
* @return true if the MessageClient is currently trying to notify the client about it's invalidation.
*/
public boolean isAttemptingInvalidationClientNotification() {
return attemptingInvalidationClientNotification;
}
/**
* This is set to true when the MessageClient is invalidated due to the client
* channel the subscription was established over disconnecting.
* It allows the FlexClient class to cleanup the outbound queue for the channel's
* corresponding server endpoint for the remote client, because we know that no
* currently queued messages need to be retained for delivery.
*
* @param value true if the MessageClient is invalidated due to the client being disconnected
*/
public void setClientChannelDisconnected(boolean value) {
clientChannelDisconnected = value;
}
/**
* @return true if the MessageClient is invalidated due to the client being disconnected
*/
public boolean isClientChannelDisconnected() {
return clientChannelDisconnected;
}
/**
* This is true when some code other than the SubscriptionManager
* is maintaining subscriptions for this message client. It ensures
* that we have this MessageClient kept around until the session
* expires.
*/
public void setRegistered(boolean reg) {
registered = reg;
}
/**
*
*/
public boolean isRegistered() {
return registered;
}
/**
* Adds a MessageClient destroy listener.
*
* @param listener The listener to add.
* @see flex.messaging.MessageClientListener
*/
public void addMessageClientDestroyedListener(MessageClientListener listener) {
if (listener != null) {
checkValid();
if (destroyedListeners == null) {
synchronized (lock) {
if (destroyedListeners == null)
destroyedListeners = new CopyOnWriteArrayList();
}
}
destroyedListeners.addIfAbsent(listener);
}
}
/**
* Removes a MessageClient destroyed listener.
*
* @param listener The listener to remove.
* @see flex.messaging.MessageClientListener
*/
public void removeMessageClientDestroyedListener(MessageClientListener listener) {
// No need to check validity; removing a listener is always ok.
if (listener != null && destroyedListeners != null)
destroyedListeners.remove(listener);
}
/**
* Adds a subscription to the subscription set for this MessageClient.
*
* @param selector The selector expression used for the subscription.
* @param subtopic The subtopic used for the subscription.
* @param maxFrequency The maximum number of messages the client wants to
* receive per second (0 disables this limit).
*/
public void addSubscription(String selector, String subtopic, int maxFrequency) {
synchronized (lock) {
checkValid();
incrementReferences();
// Create and add the subscription to the subscriptions set.
SubscriptionInfo si = new SubscriptionInfo(selector, subtopic, maxFrequency);
subscriptions.add(si);
registerSubscriptionWithThrottleManager(si);
}
}
/**
* Registers the subscription with the outbound queue processor's throttle
* manager, if one exists.
*
* @param si The subscription info object.
*/
public void registerSubscriptionWithThrottleManager(SubscriptionInfo si) {
// Register the destination that will setup client level outbound throttling.
ThrottleSettings ts = destination.getNetworkSettings().getThrottleSettings();
if (ts.getOutboundPolicy() != Policy.NONE && (ts.isOutboundClientThrottleEnabled() || si.maxFrequency > 0)) {
// Setup the client level outbound throttling, and register the destination
// only if the policy is not NONE, and a throttling limit is specified
// either at the destination or by consumer.
OutboundQueueThrottleManager throttleManager = getThrottleManager(true);
if (throttleManager != null)
throttleManager.registerDestination(destinationId, ts.getOutgoingClientFrequency(), ts.getOutboundPolicy());
} else if (si.maxFrequency > 0) // Let the client know that maxFrequency will be ignored.
{
if (Log.isWarn())
Log.getLogger(MESSAGE_CLIENT_LOG_CATEGORY).warn("MessageClient with clientId '"
+ clientId + "' for destination '" + destinationId
+ "' specified a maxFrequency value of '" + si.maxFrequency
+ "' but the destination does not define a throttling policy. This value will be ignored.");
}
// Now, register the subscription.
OutboundQueueThrottleManager throttleManager = getThrottleManager(false);
if (throttleManager != null)
throttleManager.registerSubscription(destinationId, si);
}
/**
* Removes a subscription from the subscription set for this MessageClient.
*
* @param selector The selector expression for the subscription.
* @param subtopic The subtopic for the subscription.
* @return true if no subscriptions remain for this MessageClient; otherwise false.
*/
public boolean removeSubscription(String selector, String subtopic) {
synchronized (lock) {
SubscriptionInfo si = new SubscriptionInfo(selector, subtopic);
if (subscriptions.remove(si)) {
unregisterSubscriptionWithThrottleManager(si);
return decrementReferences();
} else if (Log.isError()) {
Log.getLogger(MessageService.LOG_CATEGORY).error("Error - unable to find subscription to remove for MessageClient: "
+ clientId + " selector: " + selector + " subtopic: " + subtopic);
}
return numReferences == 0;
}
}
/**
* We use the same MessageClient for more than one subscription with different
* selection criteria. This tracks the number of subscriptions that are active
* so that we know when we are finished.
*/
public void incrementReferences() {
synchronized (lock) {
numReferences++;
}
}
/**
* Decrements the numReferences variable and returns true if this was the last reference.
*/
public boolean decrementReferences() {
synchronized (lock) {
if (--numReferences == 0) {
cancelTimeout();
if (destination instanceof MessageDestination) {
MessageDestination msgDestination = (MessageDestination) destination;
if (msgDestination.getThrottleManager() != null)
msgDestination.getThrottleManager().removeClientThrottleMark(clientId);
}
return true;
}
return false;
}
}
/**
* Invoked by SubscriptionManager once the subscription state is setup completely
* for the MessageClient..
*/
public void notifyCreatedListeners() {
// Notify MessageClient created listeners.
if (!createdListeners.isEmpty()) {
// CopyOnWriteArrayList is iteration-safe from ConcurrentModificationExceptions.
for (Iterator iter = createdListeners.iterator(); iter.hasNext(); )
((MessageClientListener) iter.next()).messageClientCreated(this);
}
}
/**
* Invoked by SubscriptionManager while handling a subscribe request.
* If the request is updating an existing subscription the 'push' state in the associated FlexClient
* may need to be updated to ensure that the correct endpoint is used for this subscription.
*
* @param newEndpointId The id for the new endpoint that the subscription may have failed over to.
*/
public void resetEndpoint(String newEndpointId) {
String oldEndpointId = null;
FlexSession oldSession = null;
FlexSession newSession = FlexContext.getFlexSession();
synchronized (lock) {
// If anything is null, or nothing has changed, no need for a reset.
if (endpointId == null || newEndpointId == null || flexSession == null || newSession == null || (endpointId.equals(newEndpointId) && flexSession.equals(newSession)))
return;
oldEndpointId = endpointId;
endpointId = newEndpointId;
oldSession = flexSession;
flexSession = newSession;
}
// Unregister in order to reset the proper push settings in the re-registration below once the session association has been patched.
if (flexClient != null)
flexClient.unregisterMessageClient(this);
// Clear out any reference to this subscription that the previously associated session has.
if (oldSession != null)
oldSession.unregisterMessageClient(this);
// Associate the current session with this subscription.
if (flexSession != null)
flexSession.registerMessageClient(this);
// Reset proper push settings.
if (flexClient != null)
flexClient.registerMessageClient(this);
if (Log.isDebug()) {
String msg = "MessageClient with clientId '" + clientId + "' for destination '" + destinationId + "' has been reset as a result of a resubscribe.";
if (oldEndpointId != null && !oldEndpointId.equals(newEndpointId))
msg += " Endpoint change [" + oldEndpointId + " -> " + newEndpointId + "]";
if ((oldSession != null) && (newSession != null) && (oldSession != newSession)) // Test identity.
msg += " FlexSession change [" + oldSession.getClass().getName() + ":" + oldSession.getId() + " -> " + newSession.getClass().getName() + ":" + newSession.getId() + "]";
Log.getLogger(MESSAGE_CLIENT_LOG_CATEGORY).debug(msg);
}
}
/**
* Used to test whether this client should receive this message
* based on the list of subscriptions we have recorded for it.
* It must match both the subtopic and the selector expression.
* Usually this is done by the subscription manager - this logic is
* only here to maintain api compatibility with one of the variants
* of the pushMessageToClients which has subscriberIds and an evalSelector
* property.
*
* @param message The message to test.
*/
public boolean testMessage(Message message, MessageDestination destination) {
String subtopic = (String) message.getHeader(AsyncMessage.SUBTOPIC_HEADER_NAME);
String subtopicSeparator = destination.getServerSettings().getSubtopicSeparator();
synchronized (lock) {
for (SubscriptionInfo si : subscriptions) {
if (si.matches(message, subtopic, subtopicSeparator))
return true;
}
}
return false;
}
/**
* Returns true if the MessageClient is valid; false if it has been invalidated.
*
* @return true if the MessageClient is valid; otherwise false.
*/
public boolean isValid() {
synchronized (lock) {
return valid;
}
}
/**
* Invalidates the MessageClient.
*/
public void invalidate() {
invalidate(false /* don't attempt to notify the client */);
}
/**
* Invalidates the MessageClient, and optionally attempts to notify the client that
* this subscription has been invalidated.
* This overload is used when a subscription is timed out while the client is still
* actively connected to the server but should also be used by any custom code on the server
* that invalidates MessageClients but wishes to notify the client cleanly.
*
* @param notifyClient <code>true</code> to notify the client that its subscription has been
* invalidated.
*/
public void invalidate(boolean notifyClient) {
synchronized (lock) {
if (!valid || invalidating)
return; // Already shutting down.
invalidating = true; // This thread gets to shut the MessageClient down.
cancelTimeout();
}
// Record whether we're attempting to notify the client or not.
attemptingInvalidationClientNotification = notifyClient;
// Build a subscription invalidation message and push to the client if it is still valid.
if (notifyClient && flexClient != null && flexClient.isValid()) {
CommandMessage msg = new CommandMessage();
msg.setDestination(destination.getId());
msg.setClientId(clientId);
msg.setOperation(CommandMessage.SUBSCRIPTION_INVALIDATE_OPERATION);
Set subscriberIds = new TreeSet();
subscriberIds.add(clientId);
try {
if (destination instanceof MessageDestination) {
MessageDestination msgDestination = (MessageDestination) destination;
((MessageService) msgDestination.getService()).pushMessageToClients(msgDestination, subscriberIds, msg, false /* don't eval selector */);
}
} catch (MessageException ignore) {
}
}
// Notify messageClientDestroyed listeners that we're being invalidated.
if (destroyedListeners != null && !destroyedListeners.isEmpty()) {
for (Iterator iter = destroyedListeners.iterator(); iter.hasNext(); ) {
((MessageClientListener) iter.next()).messageClientDestroyed(this);
}
destroyedListeners.clear();
}
// And generate unsubscribe messages for all of the MessageClient's subscriptions and
// route them to the destination this MessageClient is subscribed to.
// The reason we send a message to the service rather than just going straight to the SubscriptionManager
// is that some adapters manage their own subscription state (i.e. JMS) in addition to us keeping track of
// things with our SubscriptionManager.
ArrayList<CommandMessage> unsubMessages = new ArrayList<CommandMessage>();
synchronized (lock) {
for (SubscriptionInfo subInfo : subscriptions) {
CommandMessage unsubMessage = new CommandMessage();
unsubMessage.setDestination(destination.getId());
unsubMessage.setClientId(clientId);
unsubMessage.setOperation(CommandMessage.UNSUBSCRIBE_OPERATION);
unsubMessage.setHeader(CommandMessage.SUBSCRIPTION_INVALIDATED_HEADER, Boolean.TRUE);
unsubMessage.setHeader(CommandMessage.SELECTOR_HEADER, subInfo.selector);
unsubMessage.setHeader(AsyncMessage.SUBTOPIC_HEADER_NAME, subInfo.subtopic);
unsubMessages.add(unsubMessage);
}
}
// Release the lock and send the unsub messages.
for (CommandMessage unsubMessage : unsubMessages) {
try {
destination.getService().serviceCommand(unsubMessage);
} catch (MessageException me) {
if (Log.isDebug())
Log.getLogger(MESSAGE_CLIENT_LOG_CATEGORY).debug("MessageClient: " + getClientId() + " issued an unsubscribe message during invalidation that was not processed but will continue with invalidation. Reason: " + ExceptionUtil.toString(me));
}
}
synchronized (lock) {
// If we didn't clean up all subscriptions log an error and continue with shutdown.
int remainingSubscriptionCount = subscriptions.size();
if (remainingSubscriptionCount > 0 && Log.isError())
Log.getLogger(MESSAGE_CLIENT_LOG_CATEGORY).error("MessageClient: " + getClientId() + " failed to remove " + remainingSubscriptionCount + " subscription(s) during invalidation");
}
// If someone registered this message client, invalidating it will free
// their reference which will typically also remove this message client.
if (registered && destination instanceof MessageDestination)
((MessageDestination) destination).getSubscriptionManager().releaseMessageClient(this);
synchronized (lock) {
valid = false;
invalidating = false;
}
if (Log.isDebug())
Log.getLogger(MESSAGE_CLIENT_LOG_CATEGORY).debug("MessageClient with clientId '" + clientId + "' for destination '" + destinationId + "' has been invalidated.");
}
/**
* Pushes the supplied message and then invalidates the MessageClient.
*
* @param message The message to push to the client before invalidating.
* When message is null, MessageClient is invalidated silently.
*/
public void invalidate(Message message) {
if (message != null) {
message.setDestination(destination.getId());
message.setClientId(clientId);
Set subscriberIds = new TreeSet();
subscriberIds.add(clientId);
try {
if (destination instanceof MessageDestination) {
MessageDestination msgDestination = (MessageDestination) destination;
((MessageService) msgDestination.getService()).pushMessageToClients(msgDestination, subscriberIds, message, false /* don't eval selector */);
}
} catch (MessageException ignore) {
}
invalidate(true /* attempt to notify remote client */);
} else {
invalidate();
}
}
/**
* Compares this MessageClient to the specified object. The result is true if
* the argument is not null and is a MessageClient instance with a matching
* clientId value.
*
* @param o The object to compare this MessageClient to.
* @return true if the MessageClient is equal; otherwise false.
*/
public boolean equals(Object o) {
if (o instanceof MessageClient) {
MessageClient c = (MessageClient) o;
if (c != null && c.getClientId().equals(clientId))
return true;
}
return false;
}
/**
* Returns the hash code for this MessageClient. The returned value is
* the hash code for the MessageClient's clientId property.
*
* @return The hash code value for this MessageClient.
*/
@Override
public int hashCode() {
return getClientId().hashCode();
}
/**
* The String representation of this MessageClient is returned (its clientId value).
*
* @return The clientId value for this MessageClient.
*/
@Override
public String toString() {
return String.valueOf(clientId);
}
//----------------------------------
// TimeoutAbstractObject overrides
//----------------------------------
/**
* Implements TimeoutCapable.
* This method returns the timeout value configured for the MessageClient's destination.
*/
@Override
public long getTimeoutPeriod() {
return (destination instanceof MessageDestination) ?
((MessageDestination) destination).getSubscriptionManager().getSubscriptionTimeoutMillis() : 0;
}
/**
* Implements TimeoutCapable.
* This method is invoked when the MessageClient has timed out and it
* invalidates the MessageClient.
*/
public void timeout() {
invalidate(true /* notify client */);
}
/**
* Returns true if a timeout task is running for this MessageClient.
*/
public boolean isTimingOut() {
return willTimeout;
}
/**
* Records whether a timeout task is running for this MessageClient.
*/
public void setTimingOut(boolean value) {
willTimeout = value;
}
//--------------------------------------------------------------------------
//
// Private Methods
//
//--------------------------------------------------------------------------
/**
* Utility method that tests validity and throws an exception if the instance
* has been invalidated.
*/
private void checkValid() {
synchronized (lock) {
if (!valid) {
throw new RuntimeException("MessageClient has been invalidated."); // TODO - localize
}
}
}
private OutboundQueueThrottleManager getThrottleManager(boolean create) {
if (flexClient != null) {
FlexClientOutboundQueueProcessor processor = flexClient.getOutboundQueueProcessor(endpointId);
if (processor != null)
return create ? processor.getOrCreateOutboundQueueThrottleManager() : processor.getOutboundQueueThrottleManager();
}
return null;
}
private void unregisterSubscriptionWithThrottleManager(SubscriptionInfo si) {
OutboundQueueThrottleManager throttleManager = getThrottleManager(false);
if (throttleManager != null)
throttleManager.unregisterSubscription(destinationId, si);
}
//--------------------------------------------------------------------------
//
// Nested Classes
//
//--------------------------------------------------------------------------
/**
* Represents a MessageClient's subscription to a destination.
* It captures the optional selector expression and subtopic for the
* subscription.
*/
public static class SubscriptionInfo implements Comparable {
public String selector, subtopic;
public int maxFrequency; // maxFrequency per subscription. Not used in BlazeDS.
public SubscriptionInfo(String sel, String sub) {
this(sel, sub, 0);
}
public SubscriptionInfo(String sel, String sub, int maxFrequency) {
this.selector = sel;
this.subtopic = sub;
this.maxFrequency = maxFrequency;
}
@Override
public boolean equals(Object o) {
if (o instanceof SubscriptionInfo) {
SubscriptionInfo other = (SubscriptionInfo) o;
return equalStrings(other.selector, selector) &&
equalStrings(other.subtopic, subtopic);
}
return false;
}
@Override
public int hashCode() {
return (selector == null ? 0 : selector.hashCode()) +
(subtopic == null ? 1 : subtopic.hashCode());
}
/**
* Compares the two subscription infos (being careful to
* ensure we compare in a consistent way if the arguments
* are switched).
*
* @param o the object to compare
* @return int the compare result
*/
public int compareTo(Object o) {
SubscriptionInfo other = (SubscriptionInfo) o;
int result;
if ((result = compareStrings(other.selector, selector)) != 0)
return result;
else if ((result = compareStrings(other.subtopic, subtopic)) != 0)
return result;
return 0;
}
/**
* Check whether the message matches with selected subtopic.
*
* @param message current message
* @param subtopicToMatch subtopc string
* @param subtopicSeparator suptopic separator
* @return true if the message matches the subtopic
*/
public boolean matches(Message message, String subtopicToMatch, String subtopicSeparator) {
if ((subtopicToMatch == null && subtopic != null) || (subtopicToMatch != null && subtopic == null))
return false; // If either defines a subtopic, they both must define one.
// If both define a subtopic, they must match.
if (subtopicToMatch != null && subtopic != null) {
Subtopic consumerSubtopic = new Subtopic(subtopic, subtopicSeparator);
Subtopic messageSubtopic = new Subtopic(subtopicToMatch, subtopicSeparator);
if (!consumerSubtopic.matches(messageSubtopic))
return false; // Not a match.
}
if (selector == null)
return true;
/*JMSSelector jmsSelector = new JMSSelector(selector);
try
{
if (jmsSelector.match(message))
return true;
}
catch (JMSSelectorException jmse)
{
// Log a warning for this client's selector and continue
if (Log.isWarn())
{
Log.getLogger(JMSSelector.LOG_CATEGORY).warn("Error processing message selector: " +
jmse.toString() + StringUtils.NEWLINE +
" incomingMessage: " + message + StringUtils.NEWLINE +
" selector: " + selector + StringUtils.NEWLINE);
}
}*/
return false;
}
/**
* Returns a String representation of the subscription info.
*
* @return String the string representation of the subscription info
*/
public String toString() {
StringBuffer sb = new StringBuffer();
sb.append("Subtopic: " + subtopic + StringUtils.NEWLINE);
sb.append("Selector: " + selector + StringUtils.NEWLINE);
if (maxFrequency > 0)
sb.append("maxFrequency: " + maxFrequency);
return sb.toString();
}
}
}