blob: acb40d2a6d71ff08e167d7902611ae29cffb11d4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package flex.messaging.client;
import flex.messaging.ConnectionAwareSession;
import flex.messaging.FlexContext;
import flex.messaging.FlexSession;
import flex.messaging.FlexSessionListener;
import flex.messaging.HttpFlexSession;
import flex.messaging.MessageClient;
import flex.messaging.MessageClientListener;
import flex.messaging.MessageException;
import flex.messaging.endpoints.Endpoint;
import flex.messaging.log.Log;
import flex.messaging.log.LogCategories;
import flex.messaging.messages.CommandMessage;
import flex.messaging.messages.Message;
import flex.messaging.services.MessageService;
import flex.messaging.util.StringUtils;
import flex.messaging.util.TimeoutAbstractObject;
import java.security.Principal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
/**
* Represents a Flex client application instance on the server.
*/
public class FlexClient extends TimeoutAbstractObject implements FlexSessionListener, MessageClientListener {
//--------------------------------------------------------------------------
//
// Public Static Constants
//
//--------------------------------------------------------------------------
/**
* Log category for FlexClient related messages.
*/
public static final String FLEX_CLIENT_LOG_CATEGORY = LogCategories.CLIENT_FLEXCLIENT;
/**
* This value is passed to the server in an initial client connect to
* indicate that the client needs a server-assigned FlexClient Id.
*/
public static final String NULL_FLEXCLIENT_ID = "nil";
//--------------------------------------------------------------------------
//
// Private Static Constants
//
//--------------------------------------------------------------------------
/**
* Numeric error message identifiers.
*/
private static final int FLEX_CLIENT_INVALIDATED = 10027;
private static final int ENDPOINT_PUSH_HANDLER_ALREADY_REGISTERED = 10033;
private static final String POLL_WAIT_THREAD_NAME_EXTENSION = "-in-poll-wait";
//--------------------------------------------------------------------------
//
// Private Static Variables
//
//--------------------------------------------------------------------------
/**
* List of registered FlexClient created listeners.
*/
private static final CopyOnWriteArrayList<FlexClientListener> createdListeners = new CopyOnWriteArrayList<FlexClientListener>();
//--------------------------------------------------------------------------
//
// Public Static Methods
//
//--------------------------------------------------------------------------
/**
* Adds a create listener that will be notified when new FlexClients
* are created.
*
* @param listener The listener to add.
* @see flex.messaging.client.FlexClientListener
*/
public static void addClientCreatedListener(FlexClientListener listener) {
    // A null listener is silently ignored.
    if (listener == null) {
        return;
    }
    // addIfAbsent makes repeated registration of the same listener a no-op.
    createdListeners.addIfAbsent(listener);
}
/**
* Removes a FlexClient created listener.
*
* @param listener The listener to remove.
* @see flex.messaging.client.FlexClientListener
*/
public static void removeClientCreatedListener(FlexClientListener listener) {
    // A null listener is silently ignored.
    if (listener == null) {
        return;
    }
    createdListeners.remove(listener);
}
//--------------------------------------------------------------------------
//
// Constructor
//
//--------------------------------------------------------------------------
/**
* Constructs a new FlexClient instance.
*
* @param manager The FlexClientManager managing this instance.
*/
public FlexClient(FlexClientManager manager) {
// Delegates to the (manager, id) constructor, using an id obtained from
// createUUID() on the MessageBroker returned by FlexContext.
this(manager, FlexContext.getMessageBroker().createUUID());
}
/**
* Constructs a new FlexClient instance having the specified Id.
*
* @param manager The FlexClientManager managing this instance.
* @param id The Id for this instance.
*/
public FlexClient(FlexClientManager manager, String id) {
    // Bind the immutable identity and owning manager first.
    this.flexClientManager = manager;
    this.id = id;

    // Start the idle clock and mark the instance usable.
    updateLastUse();
    this.valid = true;

    if (Log.isDebug()) {
        Log.getLogger(FLEX_CLIENT_LOG_CATEGORY).debug("FlexClient created with id '" + this.id + "'.");
    }
}
//--------------------------------------------------------------------------
//
// Variables
//
//--------------------------------------------------------------------------
/**
* Storage for custom attributes.
*/
private volatile Map<String, Object> attributes;
/**
* List of registered FlexClient attribute listeners.
* Created lazily, under {@link #lock}, the first time a listener is added.
*/
private volatile CopyOnWriteArrayList<FlexClientAttributeListener> attributeListeners;
/**
* List of registered FlexClient destroyed listeners.
* Created lazily, under {@link #lock}, the first time a listener is added.
*/
private volatile CopyOnWriteArrayList<FlexClientListener> destroyedListeners;
/**
* The manager for the FlexClient.
*/
final FlexClientManager flexClientManager;
/**
* The unique Id for the instance.
*/
private final String id;
/**
* Flag used to break cycles during invalidation.
* Set to true by the thread that wins the right to run invalidate() and reset
* to false once invalidation has finished.
*/
/* package visibility for FlexClientManager */ volatile boolean invalidating;
/**
* Instance level lock to sync for state changes.
*/
final Object lock = new Object();
/**
* MessageClient subscriptions for this FlexClient.
* Created lazily, under {@link #lock}, when the first MessageClient is registered.
*/
private volatile CopyOnWriteArrayList<MessageClient> messageClients;
/**
* Queues of outbound messages to push to the client keyed by endpoint id.
* Map(String endpointId, EndpointQueue queue).
*/
private final Map<String, EndpointQueue> outboundQueues = new ConcurrentHashMap<String, EndpointQueue>(1);
/**
* EndpointPushHandlers keyed by endpointId that the FlexClient
* can use to push messages to remote clients.
* Created lazily by registerEndpointPushHandler() and set back to null by invalidate().
* NOTE: these can't be added to the EndpointQueue data type because the existence of queues depends
* upon client subscription state whereas endpoints that support push will generally set up their push
* handling before any subscriptions have been created.
*/
private Map<String, EndpointPushHandler> endpointPushHandlers;
/**
* Associated FlexSessions that represent the connections the FlexClient makes to the server.
*/
private final CopyOnWriteArrayList<FlexSession> sessions = new CopyOnWriteArrayList<FlexSession>(); // We always have at least one session.
/**
* Flag indicating whether the instance is valid; once invalidated this flag is
* set to false.
* Not volatile: isValid() and invalidate() access it while holding {@link #lock}.
*/
boolean valid;
/**
* The principal associated with this client. Only used when perClientAuthentication
* is being used.
*/
private Principal userPrincipal;
//--------------------------------------------------------------------------
//
// Public Methods
//
//--------------------------------------------------------------------------
/**
* Adds a FlexClient attribute listener that will be notified when an
* attribute is added, removed or changed. If the attribute implements
* FlexClientBindingListener, it will be notified before any
* FlexClientAttributeListeners are notified.
*
* @param listener The listener to add.
*/
public void addClientAttributeListener(FlexClientAttributeListener listener) {
    // Null listeners are ignored without touching any state.
    if (listener == null) {
        return;
    }

    checkValid();

    // Lazily create the listener list; only the creation is guarded by the lock.
    synchronized (lock) {
        if (attributeListeners == null) {
            attributeListeners = new CopyOnWriteArrayList<FlexClientAttributeListener>();
        }
    }

    // The copy-on-write list handles its own thread-safety for the add.
    attributeListeners.addIfAbsent(listener);
}
/**
* Adds a destroy listener that will be notified when the FlexClient
* is destroyed. Listeners are notified after all attributes
* have been unbound from the FlexClient and any FlexClientBindingListeners
* and FlexClientAttributeListeners have been notified.
*
* @param listener The listener to add.
* @see flex.messaging.client.FlexClientListener
*/
public void addClientDestroyedListener(FlexClientListener listener) {
    // Null listeners are ignored without touching any state.
    if (listener == null) {
        return;
    }

    checkValid();

    // Lazily create the listener list; only the creation is guarded by the lock.
    synchronized (lock) {
        if (destroyedListeners == null) {
            destroyedListeners = new CopyOnWriteArrayList<FlexClientListener>();
        }
    }

    // The copy-on-write list handles its own thread-safety for the add.
    destroyedListeners.addIfAbsent(listener);
}
/**
* Returns the attribute bound to the specified name for the FlexClient, or null
* if no attribute is bound under the name.
*
* @param name The name the attribute is bound to.
* @return The attribute bound to the specified name.
*/
public Object getAttribute(String name) {
    synchronized (lock) {
        checkValid();
        updateLastUse(); // Reading an attribute counts as client activity.

        // No attribute storage has been created yet, so nothing can be bound.
        if (attributes == null) {
            return null;
        }
        return attributes.get(name);
    }
}
/**
* Returns a snapshot of the names of all attributes bound to the FlexClient.
*
* @return A snapshot of the names of all attributes bound to the FlexClient.
*/
public Enumeration<String> getAttributeNames() {
    synchronized (lock) {
        checkValid();
        updateLastUse();

        // Enumerate over a private copy of the key set so that attributes added
        // while the caller iterates cannot cause concurrent modification problems.
        final List<String> snapshot = (attributes != null)
                ? new ArrayList<String>(attributes.keySet())
                : Collections.<String>emptyList();
        return Collections.enumeration(snapshot);
    }
}
/**
* Returns the push handler registered with the FlexClient with the supplied
* endpoint id, or null if no push handler was registered with the FlexClient
* for that endpoint.
*
* @param endpointId The endpoint id.
* @return The push handler registered with the FlexClient with the supplied
* endpoint id, or null if no push handler was registered with the FlexClient
* for that endpoint.
*/
public EndpointPushHandler getEndpointPushHandler(String endpointId) {
    synchronized (lock) {
        // The handler map is only created once the first handler is registered.
        if (endpointPushHandlers == null) {
            return null;
        }
        // A lookup on an unregistered endpoint id yields null.
        return endpointPushHandlers.get(endpointId);
    }
}
/**
* Returns the queue processor registered with the FlexClient with the supplied
* endpoint id, or null if no queue processor was registered with the FlexClient
* for that endpoint.
*
* @param endpointId The endpoint id.
* @return The queue processor registered with the FlexClient.
*/
public FlexClientOutboundQueueProcessor getOutboundQueueProcessor(String endpointId) {
    final EndpointQueue endpointQueue = outboundQueues.get(endpointId);
    // No queue for the endpoint means there is no processor either.
    if (endpointQueue == null) {
        return null;
    }
    return endpointQueue.processor;
}
/**
* Returns the endpoint queue registered with the FlexClient with the supplied
* endpoint id, or null if no endpoint queue was registered with the FlexClient
* for that endpoint.
*
* @param endpointId The endpoint id.
* @return The endpoint queue registered with the FlexClient.
*/
public EndpointQueue getEndpointQueue(String endpointId) {
// Direct lookup in the per-endpoint queue map; no FlexClient lock is taken and
// validity is not checked here.
return outboundQueues.get(endpointId);
}
/**
* Override {@link flex.messaging.util.TimeoutAbstractObject#getLastUse()} to make timeout
* dependent upon FlexClient inactivity but also upon the presence of an active push-enabled session,
* async or waited poll, or registered endpoint push handler (all of which indicate that a client has
* an active, open connection to the server).
*
* @return The 'last use' timestamp for the FlexClient, which may be the current system time if the FlexClient
* has been idle but an open connection from the client to the server exists.
*/
@Override
public long getLastUse() {
    synchronized (lock) {
        final long recordedLastUse = super.getLastUse();
        final long idleMillis = System.currentTimeMillis() - recordedLastUse;

        // Still within the timeout window: report the real timestamp so the
        // timeout simply gets rescheduled.
        if (idleMillis < flexClientManager.getFlexClientTimeoutMillis()) {
            return recordedLastUse;
        }

        // Idle for too long, but a parked async poll or a registered push handler
        // for any queued endpoint keeps the FlexClient alive.
        for (EndpointQueue endpointQueue : outboundQueues.values()) {
            if (endpointQueue.asyncPoll != null
                    || (endpointPushHandlers != null && endpointPushHandlers.containsKey(endpointQueue.endpointId))) {
                return System.currentTimeMillis();
            }
        }

        // A connected session, or a session holding a (non-async) waited poll for
        // this FlexClient, also keeps it alive.
        for (FlexSession flexSession : sessions) {
            if (flexSession instanceof ConnectionAwareSession
                    && ((ConnectionAwareSession) flexSession).isConnected()) {
                return System.currentTimeMillis();
            }
            if (flexSession.waitMonitor == null) {
                continue;
            }
            for (EndpointQueue waitingQueue : flexSession.waitMonitor.values()) {
                if (waitingQueue.flexClient.equals(this)) {
                    return System.currentTimeMillis();
                }
            }
        }

        // Nothing is holding the client open; let it time out.
        return recordedLastUse;
    }
}
/**
* Returns the attribute bound to the specified name for the current FlexSession
* associated with the FlexClient. If the attribute does not exist in the current
* FlexSession, this method iterates through all the other FlexSessions associated with
* the FlexClient and either returns the attribute bound, or null if no attribute is bound
* under the name.
*
* @param name The name the attribute is bound to.
* @return The attribute bound to the specified name.
*/
public Object getSessionAttribute(String name) {
    // Prefer the value bound in the current session; fall back to the other
    // sessions only when the current one has no non-null value for the name.
    final Object fromCurrentSession = getSessionAttributeInCurrentSession(name);
    return (fromCurrentSession != null)
            ? fromCurrentSession
            : getSessionAttributeInOtherSessions(name);
}
/**
* Returns a snapshot of the names of all attributes bound to all the FlexSessions
* associated with the FlexClient.
*
* @return A snapshot of the names of all attributes bound to all the FlexSessions
* associated with the FlexClient.
*/
public Enumeration<String> getSessionAttributeNames() {
    // Collect into a set so that a name bound in several sessions appears once.
    final Set<String> mergedNames = new HashSet<String>();
    final Iterator<FlexSession> sessionIterator = sessions.iterator();
    while (sessionIterator.hasNext()) {
        mergedNames.addAll(getSessionAttributeNames(sessionIterator.next()));
    }
    return Collections.enumeration(mergedNames);
}
/**
* Returns the principal associated with this client. If the client has not
* authenticated the principal will be null. Should only be called from FlexContext
* and only if perClientAuthentication is used. Not available to users.
*
* @return The principal associated with the session.
*/
public Principal getUserPrincipal() {
    final Principal currentPrincipal;
    // Read under the instance lock, after confirming the client is still valid.
    synchronized (lock) {
        checkValid();
        currentPrincipal = this.userPrincipal;
    }
    return currentPrincipal;
}
/**
* Should only be called from FlexContext and only if perClientAuthentication is used.
* Not available to users.
*
* @param userPrincipal The principal to associate with the session.
*/
public void setUserPrincipal(Principal userPrincipal) {
// Written under the instance lock, mirroring getUserPrincipal().
synchronized (lock) {
// Validate first so the principal is never assigned when checkValid() rejects the call.
checkValid();
this.userPrincipal = userPrincipal;
}
}
/**
* Invalidates the FlexClient.
*/
public void invalidate() {
// Phase 1 (under the lock): elect exactly one thread to perform the shutdown,
// detach from the manager and stop the idle timeout.
synchronized (lock) {
if (!valid || invalidating)
return; // Already shutting down.
invalidating = true; // This thread gets to shut the FlexClient down.
flexClientManager.removeFlexClient(this);
cancelTimeout();
}
// Phase 2 (outside the lock): tear down associations. 'invalidating' stays true
// throughout, so concurrent or re-entrant invalidate() calls return at the guard above.
// Unregister from all FlexSessions.
if (!sessions.isEmpty()) {
for (FlexSession session : sessions)
unregisterFlexSession(session);
}
// Invalidate associated MessageClient subscriptions.
if (messageClients != null && !messageClients.isEmpty()) {
for (MessageClient messageClient : messageClients) {
// Stop listening first so that invalidating the MessageClient does not
// call back into messageClientDestroyed() on this instance.
messageClient.removeMessageClientDestroyedListener(this);
messageClient.invalidate();
}
messageClients.clear();
}
// Notify destroy listeners that we're shutting the FlexClient down.
if (destroyedListeners != null && !destroyedListeners.isEmpty()) {
for (FlexClientListener destroyListener : destroyedListeners) {
destroyListener.clientDestroyed(this);
}
destroyedListeners.clear();
}
// Unbind all attributes.
if (attributes != null && !attributes.isEmpty()) {
// Iterate over an array copy of the keys because removeAttribute() is called
// for each key while looping.
Set<String> keySet = attributes.keySet();
String[] keys = keySet.toArray(new String[keySet.size()]);
for (String key : keys)
removeAttribute(key);
}
// Close any registered push handlers.
// NOTE(review): endpointPushHandlers is read and reset here without holding 'lock',
// unlike the other accessors of this field visible in this file — confirm this is intended.
if (endpointPushHandlers != null && !endpointPushHandlers.isEmpty()) {
for (EndpointPushHandler handler : endpointPushHandlers.values()) {
handler.close(true /* notify Channel of disconnect */);
}
endpointPushHandlers = null;
}
// Phase 3 (under the lock): publish the final state.
synchronized (lock) {
valid = false;
invalidating = false;
}
if (Log.isDebug())
Log.getLogger(FLEX_CLIENT_LOG_CATEGORY).debug("FlexClient with id '" + this.id + "' has been invalidated.");
}
/**
* Returns true if the FlexClient is valid; false if it has been invalidated.
*
* @return true if the FlexClient is valid; otherwise false.
*/
public boolean isValid() {
    final boolean currentlyValid;
    // 'valid' is not volatile, so it is read under the instance lock.
    synchronized (lock) {
        currentlyValid = this.valid;
    }
    return currentlyValid;
}
/**
* Returns a snapshot of the FlexSessions associated with the FlexClient
* when this method is invoked.
* This list is not guaranteed to remain consistent with the actual list
* of active FlexSessions associated with the FlexClient over time.
*
* @return A snapshot of the current list of FlexSessions associated with the FlexClient.
*/
public List<FlexSession> getFlexSessions() {
    synchronized (lock) {
        checkValid();
        updateLastUse();
        // Hand back a private copy so callers never see later changes to the live list.
        return new ArrayList<FlexSession>(sessions);
    }
}
/**
* Return the session count.
*
* @return The number of sessions associated with this FlexClient.
*/
public int getSessionCount() {
    synchronized (lock) {
        // 'sessions' is a final field initialised at declaration, so it is never null.
        return sessions.size();
    }
}
/**
* Return the subscription count.
*
* @return The number of subscriptions associated with this FlexClient.
*/
public int getSubscriptionCount() {
    synchronized (lock) {
        int total = 0;
        // Sum the per-MessageClient counts; with no MessageClients the total stays zero.
        if (messageClients != null) {
            for (MessageClient subscription : messageClients) {
                total += subscription.getSubscriptionCount();
            }
        }
        return total;
    }
}
/**
* Returns the message client registered with the FlexClient with the supplied
* client id, or null if no message client was registered with the FlexClient
* with that client id.
*
* @param clientId The client id.
* @return The message client registered with the FlexClient.
*/
public MessageClient getMessageClient(String clientId) {
    synchronized (lock) {
        // Nothing has been registered yet.
        if (messageClients == null) {
            return null;
        }
        // Linear scan for the first MessageClient whose id equals the requested one.
        for (MessageClient candidate : messageClients) {
            if (candidate.getClientId().equals(clientId)) {
                return candidate;
            }
        }
        return null;
    }
}
/**
* Returns a snapshot of the MessageClients (subscriptions) associated with the FlexClient
* when this method is invoked.
* This list is not guaranteed to remain consistent with the actual list
* of active MessageClients associated with the FlexClient over time.
*
* @return A snapshot of the current list of MessageClients associated with the FlexClient.
*/
public List<MessageClient> getMessageClients() {
    synchronized (lock) {
        checkValid();
        updateLastUse();
        // No MessageClient has been registered: return the shared empty list.
        if (messageClients == null) {
            return Collections.<MessageClient>emptyList();
        }
        // Otherwise hand back a private copy of the current list.
        return new ArrayList<MessageClient>(messageClients);
    }
}
/**
* Returns the unique Id for the FlexClient.
*
* @return The unique Id for the FlexClient.
*/
public String getId() {
// 'id' is a final field assigned in the constructor, so no locking is needed.
return id;
}
/**
* Implements TimeoutCapable.
* Determine the time, in milliseconds, that this object is allowed to idle
* before having its timeout method invoked.
*/
@Override
public long getTimeoutPeriod() {
// The timeout is configured on the manager; getLastUse() compares idle time
// against this same manager value.
return flexClientManager.getFlexClientTimeoutMillis();
}
/**
* Implements MessageClientListener.
* Handling created events is a no-op.
*
* @param messageClient The new MessageClient.
*/
public void messageClientCreated(MessageClient messageClient) {
// Intentionally empty: creation events require no action here.
}
/**
* Implements MessageClientListener.
* Notification that an associated MessageClient was destroyed.
*
* @param messageClient The MessageClient that was destroyed.
*/
public void messageClientDestroyed(MessageClient messageClient) {
// A destroyed MessageClient must no longer be associated with this FlexClient.
unregisterMessageClient(messageClient);
}
/**
* Poll for outbound messages for the FlexClient.
* This method is only invoked by internal code while processing a client poll request; it
* is not intended for general public use.
* Poll requests that trigger this method come from client-side polling channels and the request
* is not specific to a single Consumer/MessageClient instance so process any queued messages for
* the specified endpoint across all subscriptions.
*
* @param endpointId The Id of the endpoint that received the poll request.
* @return The flush result including messages to return in the poll response and
* an optional wait time for the next poll/flush.
*/
public FlushResult poll(String endpointId) {
    synchronized (lock) {
        checkValid();
        final EndpointQueue endpointQueue = outboundQueues.get(endpointId);
        if (endpointQueue != null) {
            return internalPoll(endpointQueue);
        }
    }
    // Reached only when there is no queue for the endpoint, i.e. the client is
    // not subscribed; the error is reported outside the lock.
    throwNotSubscribedException(endpointId);
    return null;
}
/**
* Poll for outbound messages for the FlexClient and if no messages are available
* immediately, store a reference to the passed async handler and call back when messages arrive.
*
* @param endpointId The Id of the endpoint that received the poll request.
* @param handler The handler to callback when messages arrive.
* @param waitIntervalMillis The wait interval in milliseconds for the poll to wait for data to arrive
* before returning an empty poll response.
* @return A <tt>TimeoutAbstractObject</tt> representing the asynchronous poll, or <code>null</code>
* if the poll request was handled immediately because data was available to return.
*/
public TimeoutAbstractObject pollAsync(String endpointId, AsyncPollHandler handler, long waitIntervalMillis) {
EndpointQueue queue;
// Stays null unless a new async poll is parked below.
TimeoutAbstractObject asyncPollTask = null;
synchronized (lock) {
checkValid();
queue = outboundQueues.get(endpointId);
// If the queue exists and is not empty, flush immediately.
if (queue != null) {
if (!queue.messages.isEmpty()) {
// Data is already waiting: complete the handler synchronously with the flush result.
handler.asyncPollComplete(internalFlush(queue));
} else // Set up an async long-poll.
{
// Avoid monopolizing user agent connections.
// NOTE(review): the session returned here is dereferenced without a null check;
// presumably a FlexSession is always bound while a poll request is processed — confirm.
FlexSession session = FlexContext.getFlexSession();
// Lock order here is FlexClient.lock first, then the session.
synchronized (session) {
if (session.asyncPollMap != null) {
// Look for a poll already parked on this session for the same endpoint.
AsyncPollWithTimeout parkedPoll = session.asyncPollMap.get(endpointId);
if (parkedPoll != null) {
// If the poll is from the same client for this endpoint, treat it as a no-op.
if (parkedPoll.getFlexClient().equals(this)) {
PollFlushResult result = new PollFlushResult();
result.setClientProcessingSuppressed(true);
handler.asyncPollComplete(result);
} else // If the poll is for a different client on the same session, swap their waits.
{
// Complete the other client's parked poll with a result flagged to avoid busy polling.
PollFlushResult result = new PollFlushResult();
result.setAvoidBusyPolling(true);
completeAsyncPoll(parkedPoll, result);
}
}
}
// Whichever branch ran above, a new async poll is created and parked for this
// request; it replaces any existing map entry for the endpoint.
AsyncPollWithTimeout asyncPoll = new AsyncPollWithTimeout(this, session, queue, handler, waitIntervalMillis, endpointId);
// This nested lock is on the same session object already locked above, so it is
// re-entrant and adds no extra exclusion.
synchronized (session) {
if (session.asyncPollMap == null)
session.asyncPollMap = new HashMap<String, AsyncPollWithTimeout>();
session.asyncPollMap.put(endpointId, asyncPoll);
}
// Record the parked poll on the queue; push() checks queue.asyncPoll to complete it.
queue.asyncPoll = asyncPoll;
asyncPollTask = asyncPoll;
}
}
}
}
if (queue == null) {
// The queue was null; let the client know that there are no active subscriptions.
throwNotSubscribedException(endpointId);
}
return asyncPollTask;
}
/**
* Poll for outbound messages for the FlexClient and if no messages are available
* immediately, put processing into a wait state until messages arrive.
* This method is only invoked by internal code while processing a client poll request; it
* is not intended for general public use.
* Poll requests that trigger this method come from client-side polling channels and the request
* is not specific to a single Consumer/MessageClient instance so process any queued messages for
* the specified endpoint across all subscriptions.
*
* @param endpointId The Id of the endpoint that received the poll request.
* @param session The FlexSession associated with this waitable poll request.
* @param listener The listener to notify before a wait begins and as soon as one completes.
* @param waitIntervalMillis The maximum amount of time to wait for messages in milliseconds.
* @return The flush result including messages to return in the poll response and
* an optional wait time for the next poll/flush.
*/
public FlushResult pollWithWait(String endpointId, FlexSession session, PollWaitListener listener, long waitIntervalMillis) {
    EndpointQueue queue;
    synchronized (lock) {
        checkValid();
        queue = outboundQueues.get(endpointId);
        // If the queue exists and an immediate poll yields a result there's no reason to wait.
        if (queue != null) {
            FlushResult flushResult = internalPoll(queue);
            if (flushResult != null)
                return flushResult;
        }
    }

    if (queue == null) {
        // The queue was null; let the client know that there are no active subscriptions.
        throwNotSubscribedException(endpointId);
        return null;
    }

    // The queue exists but the immediate poll returned nothing; we can try to wait for messages.
    synchronized (session) {
        // Set up the waitMonitor on the session; this is a reference to the queue that the
        // current poll request targets and we use it as a wait/notify monitor.
        // This also lets us prevent busy polling cycles from a single client. If we already have a waited
        // poll request a subsequent poll request is treated as a no-op.
        if (session.waitMonitor != null) {
            final EndpointQueue waitingQueue = session.waitMonitor.get(endpointId);
            // If the poll is from the same client swf, and the same endpoint, treat it as a no-op poll.
            if (waitingQueue != null && waitingQueue.flexClient.equals(this)) {
                PollFlushResult result = new PollFlushResult();
                result.setClientProcessingSuppressed(true);
                return result;
            }
        } else {
            session.waitMonitor = new HashMap<String, EndpointQueue>();
        }
        // Set the waitMonitor for the session to the queue
        // for this poll request before releasing the lock.
        session.waitMonitor.put(endpointId, queue);
    }

    // Now that the session references the wait monitor this thread will use to wait we can enter
    // the wait state.
    // -1 wait-interval actually means wait until notified (Object.wait(0) waits indefinitely).
    final long effectiveWaitMillis = (waitIntervalMillis == -1) ? 0 : waitIntervalMillis;
    final Thread currentThread = Thread.currentThread();
    final String threadName = currentThread.getName();
    // Tracks whether this request's waitMonitor entry has been removed, so the finally
    // block below can remove it on any abnormal exit.
    boolean waitMonitorCleared = false;
    try {
        boolean didWait = false;
        boolean avoidBusyPolling = false;
        synchronized (queue) {
            // If the message queue is still empty, wait for a message to be added before invoking flush.
            if (queue.messages.isEmpty()) {
                reportStatusIfDebug("waiting for new messages to arrive");
                didWait = true;
                boolean listenerNotifiedOfStart = false;
                // Tag thread name during the wait.
                currentThread.setName(threadName + POLL_WAIT_THREAD_NAME_EXTENSION);
                try {
                    if (listener != null) {
                        listener.waitStart(queue);
                        listenerNotifiedOfStart = true;
                    }
                    queue.waitPoll = true; // Mark the queue as waiting.
                    queue.wait(effectiveWaitMillis);
                } finally {
                    // Runs on normal wake-up AND on interruption (the queue monitor is held
                    // again by the time wait() throws), so the waiting flag, the thread name
                    // and the listener are never left in the "waiting" state.
                    queue.waitPoll = false; // Unmark the queue as waiting.
                    // Reset thread name now that the wait is over.
                    currentThread.setName(threadName);
                    if (listenerNotifiedOfStart)
                        listener.waitEnd(queue);
                }
                if (queue.avoidBusyPolling) {
                    avoidBusyPolling = true;
                    queue.avoidBusyPolling = false;
                }
            }
        }
        synchronized (session) {
            if (session.waitMonitor != null) {
                session.waitMonitor.remove(endpointId);
            }
        }
        waitMonitorCleared = true;
        if (Log.isDebug()) {
            if (didWait)
                reportStatusIfDebug("done waiting for new messages to arrive and is flushing the outbound queue");
            else
                reportStatusIfDebug("didn't need to wait and is flushing the outbound queue");
        }
        // We need to hold the FlexClient lock to invoke flush.
        FlushResult result;
        synchronized (lock) {
            result = internalFlush(queue);
        }
        if (avoidBusyPolling) {
            // Re-wrap the flush output in a PollFlushResult that carries the avoid-busy-polling flag.
            PollFlushResult swappedPollResult = new PollFlushResult();
            if (result != null) {
                swappedPollResult.setMessages(result.getMessages());
                swappedPollResult.setNextFlushWaitTimeMillis(result.getNextFlushWaitTimeMillis());
            }
            swappedPollResult.setAvoidBusyPolling(true);
            result = swappedPollResult;
        }
        return result;
    } catch (InterruptedException e) {
        // Restore the interrupt status so code further up the stack can observe it.
        Thread.currentThread().interrupt();
        if (Log.isWarn())
            Log.getLogger(FLEX_CLIENT_LOG_CATEGORY).warn("Poll wait thread '" + threadName + "' for FlexClient with id '" + this.id +
                    "' could not finish waiting for new messages to arrive " +
                    "because it was interrupted: " + e.toString());
    } finally {
        // On interruption (or any other abnormal exit) the session must not keep pointing at
        // this queue; otherwise later polls from this client on the same session and endpoint
        // would be answered as suppressed no-ops.
        if (!waitMonitorCleared) {
            synchronized (session) {
                if (session.waitMonitor != null) {
                    session.waitMonitor.remove(endpointId);
                }
            }
        }
    }
    // Only reached after an interrupted wait.
    return null;
}
private void reportStatusIfDebug(String message) {
    // Nothing to report unless debug logging is enabled.
    if (!Log.isDebug()) {
        return;
    }
    final String pollThreadName = Thread.currentThread().getName();
    Log.getLogger(FLEX_CLIENT_LOG_CATEGORY).debug("Poll wait thread '" + pollThreadName + "' for FlexClient with id '" + this.id + "' is " + message);
}
/**
* Poll for outbound messages for a specific MessageClient/Consumer.
* This overload of poll() is only invoked when handling a Consumer.receive() request.
*
* @param client The specific MessageClient instance to poll for messages for.
* @return The flush result including messages to return in the poll response.
* The nextFlushWaitTimeMillis value is always forced to a value of 0 because
* Consumer.receive() calls are driven by client code and this setting has no meaning.
*/
public FlushResult poll(MessageClient client) {
    final String targetEndpointId = client.getEndpointId();
    FlushResult outcome = null;
    final boolean subscribed;

    synchronized (lock) {
        checkValid();
        final EndpointQueue endpointQueue = outboundQueues.get(targetEndpointId);
        subscribed = (endpointQueue != null);
        if (subscribed) {
            try {
                // Flush only the messages destined for this specific MessageClient.
                outcome = internalFlush(endpointQueue, client);
            } catch (RuntimeException e) {
                // Log with context, then let the failure propagate unchanged.
                if (Log.isError()) {
                    Log.getLogger(FLEX_CLIENT_LOG_CATEGORY).error("Failed to flush an outbound queue for MessageClient '" + client.getClientId() + "' for FlexClient '" + getId() + "'.", e);
                }
                throw e;
            }
            if (outcome != null) {
                outcome.setNextFlushWaitTimeMillis(0); // Force to 0.
            }
        }
    }

    // Report a missing subscription outside the lock.
    if (!subscribed) {
        throwNotSubscribedException(targetEndpointId);
    }
    return outcome;
}
/**
* Push a message to the FlexClient.
* The message is added to the outbound queue of messages for the client and
* will be pushed if possible or retrieved via a client poll request.
*
* @param message The Message to push.
* @param messageClient The MessageClient subscription that this message targets.
*/
public void push(Message message, MessageClient messageClient) {
// Silently drop the message if this FlexClient has already been invalidated.
if (!isValid())
return;
// Route this message to the proper per-endpoint outbound queue.
EndpointQueue queue = outboundQueues.get(messageClient.getEndpointId());
// This queue may be null if all corresponding subscriptions have been invalidated.
if (queue == null)
return;
// Whether the queue is still empty after the processor has handled the message.
boolean empty;
// We need to obtain the lock here
// Maintain the pattern of using the FlexClient.lock and ensure that order of locks should always start with the FlexClient.lock
// This is critical to prevent deadlock cases, see Watson bug 2724938
synchronized (lock) {
synchronized (queue) // To protect the list during the add and allow for notification.
{
// Let the processor add the message to the queue.
try {
queue.processor.add(queue.messages, message);
// Re-check emptiness after the add; the delivery steps below are skipped if the
// queue is still empty at this point.
empty = queue.messages.isEmpty();
if (Log.isDebug())
Log.getLogger(LogCategories.MESSAGE_GENERAL).debug(
"Queuing message: " + message.getMessageId() +
StringUtils.NEWLINE +
" to send to MessageClient: " + messageClient.getClientId() +
StringUtils.NEWLINE +
" for FlexClient: " + messageClient.getFlexClient().getId() +
StringUtils.NEWLINE +
" via endpoint: " + queue.endpointId +
StringUtils.NEWLINE +
" client outbound queue size: " + queue.messages.size());
} catch (RuntimeException e) {
// Log with context, then let the failure propagate unchanged.
if (Log.isError())
Log.getLogger(FLEX_CLIENT_LOG_CATEGORY).error("Failed to add a message to an outbound queue for FlexClient '" + getId() + "'.", e);
throw e;
}
// And notify any threads that may be in a poll wait state.
if (!empty && queue.waitPoll) {
// TODO This updateLastUse call is added here because there used to be a call
// at the beginning of the push method but not convinced that it is needed.
updateLastUse();
queue.notifyAll();
}
}
// Still holding FlexClient.lock, but the queue monitor has been released.
if (!empty) {
if (queue.asyncPoll != null) {
// A parked async poll exists for this queue: complete it with the flushed messages.
completeAsyncPoll(queue.asyncPoll, internalFlush(queue));
} else if (!empty && queue.flushTask == null &&
(queue.pushSession != null || (endpointPushHandlers != null && endpointPushHandlers.containsKey(queue.endpointId)))) {
// If a delayed flush is not scheduled and we have a push-enabled session associated with the queue
// or a push-enabled endpoint, try a direct push to the client.
// Once again we should acquire the lock for queue, otherwise a potential dead lock could happen, see Watson bug 2724936
// By acquiring the queue lock again, we break the cycle by acquiring the queue before holding FlexClient.lock object
synchronized (queue) {
directFlush(queue);
}
}
}
}
}
/**
 * Associates an <tt>EndpointPushHandler</tt> with an endpoint so that it can be used
 * to push messages to the remote client over that endpoint.
 * At most one handler may be registered per endpoint id.
 *
 * @param handler    The <tt>EndpointPushHandler</tt> to register.
 * @param endpointId The id of the endpoint the handler pushes over.
 * @throws MessageException if a handler is already registered for <tt>endpointId</tt>.
 */
public void registerEndpointPushHandler(EndpointPushHandler handler, String endpointId) {
    synchronized (lock) {
        // The handler map is created lazily on the first registration.
        if (endpointPushHandlers == null) {
            endpointPushHandlers = new HashMap<String, EndpointPushHandler>(1);
        }

        final boolean alreadyRegistered = endpointPushHandlers.containsKey(endpointId);
        if (!alreadyRegistered) {
            endpointPushHandlers.put(endpointId, handler);
            return;
        }

        // A second registration for the same endpoint is rejected.
        MessageException duplicate = new MessageException();
        duplicate.setMessage(ENDPOINT_PUSH_HANDLER_ALREADY_REGISTERED, new Object[]{getId(), endpointId});
        throw duplicate;
    }
}
/**
 * Internal hook that links a FlexSession to this FlexClient. Registering a session
 * that is already linked is a no-op.
 *
 * @param session The FlexSession to associate with this FlexClient.
 */
public void registerFlexSession(FlexSession session) {
    final boolean newlyAdded = sessions.addIfAbsent(session);
    if (!newlyAdded) {
        return;
    }
    // Listen for the session going away and let the session know about this client.
    session.addSessionDestroyedListener(this);
    session.registerFlexClient(this);
}
/**
 * Internal hook that links a MessageClient (subscription) to this FlexClient.
 * Registering a MessageClient that is already linked is a no-op.
 *
 * @param messageClient The MessageClient to associate with this FlexClient.
 */
public void registerMessageClient(MessageClient messageClient) {
    // The subscription list is created lazily, under the FlexClient lock.
    synchronized (lock) {
        if (messageClients == null) {
            messageClients = new CopyOnWriteArrayList<MessageClient>();
        }
    }

    if (!messageClients.addIfAbsent(messageClient)) {
        return;
    }

    messageClient.addMessageClientDestroyedListener(this);
    final String endpointId = messageClient.getEndpointId();

    // Set up (or reuse) the outbound queue for the subscription's endpoint and, when a
    // push handler exists for that endpoint, tie the subscription to it so that shutting
    // down the push connection can invalidate the subscriptions using it.
    synchronized (lock) {
        getOrCreateEndpointQueueAndRegisterSubscription(messageClient, endpointId);
        EndpointPushHandler pushHandler =
                (endpointPushHandlers == null) ? null : endpointPushHandlers.get(endpointId);
        if (pushHandler != null) {
            pushHandler.registerMessageClient(messageClient);
        }
    }
}
/**
 * Unbinds the attribute stored under the given name on this FlexClient and
 * dispatches the unbound/removed notifications if a value was actually removed.
 *
 * @param name The name of the attribute to remove.
 */
public void removeAttribute(String name) {
    Object removed = null;
    // Only the map mutation happens under the lock; notifications are sent afterwards.
    synchronized (lock) {
        checkValid();
        updateLastUse();
        if (attributes != null) {
            removed = attributes.remove(name);
        }
    }

    // Nothing was bound under this name, so there is nothing to announce.
    if (removed != null) {
        notifyAttributeUnbound(name, removed);
        notifyAttributeRemoved(name, removed);
    }
}
/**
 * Unregisters a FlexClient attribute listener. A null listener, or a call made
 * when no listener list exists, is ignored.
 *
 * @param listener The listener to remove.
 */
public void removeClientAttributeListener(FlexClientAttributeListener listener) {
    // Validity is deliberately not checked: removing a listener is always allowed.
    if (listener == null || attributeListeners == null) {
        return;
    }
    attributeListeners.remove(listener);
}
/**
 * Unregisters a FlexClient destroyed listener. A null listener, or a call made
 * when no listener list exists, is ignored.
 *
 * @param listener The listener to remove.
 * @see flex.messaging.client.FlexClientListener
 */
public void removeClientDestroyedListener(FlexClientListener listener) {
    // Validity is deliberately not checked: removing a listener is always allowed.
    if (listener == null || destroyedListeners == null) {
        return;
    }
    destroyedListeners.remove(listener);
}
/**
 * Removes the attribute stored under the given name from every FlexSession
 * currently associated with this FlexClient.
 *
 * @param name The name of the attribute to remove.
 */
public void removeSessionAttribute(String name) {
    Iterator<FlexSession> associated = sessions.iterator();
    while (associated.hasNext()) {
        associated.next().removeAttribute(name);
    }
}
/**
 * FlexSessionListener callback for session creation.
 * Intentionally does nothing: the FlexClient is never registered as a static
 * FlexSession created listener, it only cares about the destroyed event of the
 * sessions it is associated with. The method exists because the interface requires it.
 *
 * @param session The FlexSession that was created.
 */
public void sessionCreated(FlexSession session) {
    // No-op by design.
}
/**
 * FlexSessionListener callback for session destruction.
 * Drops the association between this FlexClient and the destroyed session.
 *
 * @param session The associated FlexSession that was destroyed.
 */
public void sessionDestroyed(FlexSession session) {
    // Unregistering also invalidates this FlexClient once no sessions remain.
    unregisterFlexSession(session);
}
/**
 * Stores an attribute value on this FlexClient under the given name and dispatches
 * the matching bound/added or unbound/replaced/bound notifications.
 * Passing a null value is treated as a removal of the attribute.
 *
 * @param name  The name to bind the attribute under.
 * @param value The value of the attribute; null removes the attribute.
 */
public void setAttribute(String name, Object value) {
    if (value == null) {
        // Setting null is defined to be the same as removeAttribute().
        removeAttribute(name);
        return;
    }

    final Object previous;
    // The lock covers only the map mutation; listeners are notified outside of it.
    synchronized (lock) {
        checkValid();
        updateLastUse();
        if (attributes == null) {
            attributes = new HashMap<String, Object>();
        }
        previous = attributes.put(name, value);
    }

    if (previous != null) {
        // An existing binding was replaced.
        notifyAttributeUnbound(name, previous);
        notifyAttributeReplaced(name, previous);
        notifyAttributeBound(name, value);
    } else {
        // A brand new binding was created.
        notifyAttributeBound(name, value);
        notifyAttributeAdded(name, value);
    }
}
/**
 * Stores an attribute on the current FlexSession associated with this FlexClient.
 * When the current session is not Servlet-based (for example an NIO-based
 * NIOHTTPFlexSession or RTMPFlexSession), the attribute is additionally stored on a
 * Servlet-based session (HttpFlexSession) associated with this FlexClient, if there is
 * one, so that it is available to the underlying J2EE HttpSession.
 *
 * @param name  The name to bind the attribute under.
 * @param value The value of the attribute.
 */
public void setSessionAttribute(String name, Object value) {
    setSessionAttributeInCurrentSession(name, value);
    if (isCurrentSessionServletBased()) {
        return; // Already stored on the Servlet-based session.
    }
    setSessionAttributeInServletBasedSession(name, value);
}
/**
 * Implements TimeoutCapable.
 * Called when this FlexClient has timed out; the instance reacts by invalidating itself.
 */
public void timeout() {
    // A timed out FlexClient is simply invalidated.
    invalidate();
}
/**
 * Unregisters an <tt>EndpointPushHandler</tt> from the specified endpoint.
 * The registration is only removed when the handler currently registered for the
 * endpoint equals the passed handler. The call is a no-op when no handlers have been
 * registered at all, or when none is registered for <tt>endpointId</tt>.
 *
 * @param handler    The <tt>EndpointPushHandler</tt> to unregister.
 * @param endpointId The endpoint to unregister from.
 */
public void unregisterEndpointPushHandler(EndpointPushHandler handler, String endpointId) {
    synchronized (lock) {
        if (endpointPushHandlers == null)
            return; // No-op.

        // Guard against an endpoint that has no registered handler; dereferencing the
        // lookup result directly would throw a NullPointerException in that case.
        EndpointPushHandler registered = endpointPushHandlers.get(endpointId);
        if (registered != null && registered.equals(handler))
            endpointPushHandlers.remove(endpointId);
    }
}
/**
 * Internal hook that breaks the link between a FlexSession and this FlexClient.
 * When the last associated session is removed, this FlexClient invalidates itself.
 * Unregistering a session that is not associated is a no-op.
 *
 * @param session The FlexSession to disassociate from this FlexClient.
 */
public void unregisterFlexSession(FlexSession session) {
    final boolean wasAssociated = sessions.remove(session);
    if (!wasAssociated) {
        return;
    }

    session.removeSessionDestroyedListener(this);
    session.unregisterFlexClient(this);

    // Once all client sessions/connections have terminated, shut down.
    if (sessions.isEmpty()) {
        invalidate();
    }
}
/**
 * Used internally to disassociate a MessageClient (subscription) from a FlexClient.
 * <p>
 * Besides removing the subscription this method maintains the per-endpoint outbound
 * queue the subscription used: it decrements the queue's subscription ref count,
 * unregisters the subscription's destination from the outbound throttle manager (if any),
 * optionally purges the subscription's queued messages, closes out a parked async poll
 * and removes the queue when it is no longer needed. Finally the subscription is
 * unregistered from the endpoint's push handler, if one is registered.
 * Unregistering a MessageClient that is not associated is a no-op.
 *
 * @param messageClient The MessageClient to disassociate from the FlexClient.
 */
public void unregisterMessageClient(MessageClient messageClient) {
    if (messageClients == null || !messageClients.remove(messageClient))
        return;

    messageClient.removeMessageClientDestroyedListener(this);
    String endpointId = messageClient.getEndpointId();

    // Manage the outbound queue that this subscription uses.
    synchronized (lock) {
        EndpointQueue queue = outboundQueues.get(endpointId);
        if (queue != null) {
            // Decrement the ref count of MessageClients using this queue.
            queue.messageClientRefCount--;

            // Unregister the message client from the outbound throttle
            // manager (if one exists).
            OutboundQueueThrottleManager tm = queue.processor.getOutboundQueueThrottleManager();
            if (tm != null)
                tm.unregisterAllSubscriptions(messageClient.getDestinationId());

            // If we're not attempting to notify the remote client that this MessageClient has
            // been invalidated, remove any associated messages from the queue.
            if (!messageClient.isAttemptingInvalidationClientNotification()) {
                Object messageClientId = messageClient.getClientId();
                for (Iterator<Message> iter = queue.messages.iterator(); iter.hasNext(); ) {
                    // A queued message without a client id cannot belong to this
                    // subscription; skip it instead of failing on a null dereference.
                    Object queuedClientId = iter.next().getClientId();
                    if (queuedClientId != null && queuedClientId.equals(messageClientId))
                        iter.remove();
                }
            }

            // If no active subscriptions require the queue, clean it up if possible.
            if (queue.messageClientRefCount == 0) {
                if (queue.messages.isEmpty() || messageClient.isClientChannelDisconnected()) {
                    if (queue.asyncPoll != null) // Close out async long-poll if one is registered.
                    {
                        FlushResult flushResult = internalFlush(queue);
                        // If the MessageClient isn't attempting client notification, override
                        // and do so in this case to suppress the next poll request from the remote client
                        // which will fail triggering an unnecessary channel disconnect on the client.
                        if (!messageClient.isAttemptingInvalidationClientNotification()) {
                            CommandMessage msg = new CommandMessage();
                            msg.setClientId(messageClient.getClientId());
                            msg.setOperation(CommandMessage.SUBSCRIPTION_INVALIDATE_OPERATION);

                            // Other callers of internalFlush treat a null result as possible,
                            // so make sure there is a result to carry the command.
                            if (flushResult == null)
                                flushResult = new FlushResult();
                            List<Message> messages = flushResult.getMessages();
                            if (messages == null) {
                                // Attach the new list to the result; otherwise the
                                // invalidate command would be silently dropped.
                                messages = new ArrayList<Message>(1);
                                flushResult.setMessages(messages);
                            }
                            messages.add(msg);
                        }
                        completeAsyncPoll(queue.asyncPoll, flushResult);
                    }
                    // Remove the empty, unused queue.
                    outboundQueues.remove(endpointId);
                }
                // Otherwise, the queue is being used by a polling client or contains messages
                // that will be written by a delayed flush.
                // Leave it in place. Once the next poll request or delayed flush occurs the
                // queue will be cleaned up at that point. See internalFlush() and shutdownQueue().
            }

            // Make sure to notify any threads waiting on this queue that may be associated
            // with the subscription that's gone away.
            synchronized (queue) {
                queue.notifyAll();
            }
        }

        // And if this subscription was associated with an endpoint push handler, unregister it.
        if (endpointPushHandlers != null) {
            EndpointPushHandler handler = endpointPushHandlers.get(endpointId);
            if (handler != null)
                handler.unregisterMessageClient(messageClient);
        }
    }
}
//--------------------------------------------------------------------------
//
// Protected Methods
//
//--------------------------------------------------------------------------
/**
 * Guard used by operations that require a live FlexClient.
 *
 * @throws MessageException if this instance has been invalidated.
 */
protected void checkValid() {
    synchronized (lock) {
        if (valid) {
            return;
        }
        MessageException invalidated = new MessageException();
        invalidated.setMessage(FLEX_CLIENT_INVALIDATED);
        throw invalidated;
    }
}
/**
 * Finishes a timed out or closed async poll: cancels its timeout, detaches it from
 * its endpoint queue and from its session's async poll map, and hands the result to
 * the poll's handler.
 *
 * @param asyncPoll The async poll to complete.
 * @param result    The FlushResult for the poll response.
 */
protected void completeAsyncPoll(AsyncPollWithTimeout asyncPoll, FlushResult result) {
    synchronized (lock) {
        asyncPoll.cancelTimeout();

        // Detach the poll from its queue, but only if it is still the registered one.
        // NOTE(review): this dereferences queue.asyncPoll without a null check; confirm
        // that no caller can reach this point after the queue's poll was already cleared.
        final EndpointQueue pollQueue = asyncPoll.getEndpointQueue();
        if (pollQueue.asyncPoll.equals(asyncPoll)) {
            pollQueue.asyncPoll = null;
        }

        // Detach the poll from its session's per-endpoint async poll map.
        final FlexSession pollSession = asyncPoll.getFlexSession();
        synchronized (pollSession) {
            if (pollSession.asyncPollMap != null) {
                pollSession.asyncPollMap.remove(asyncPoll.getEndpointId());
            }
        }

        // Finally deliver the (possibly null) result to the waiting handler.
        final AsyncPollHandler pollHandler = asyncPoll.getHandler();
        pollHandler.asyncPollComplete(result);
    }
}
/**
 * Invoked to flush queued outbound messages to a client directly using a session
 * that supports real-time push, or an endpoint push handler.
 * Called by push() or delayed flush tasks for push-enabled sessions/connections.
 * Does nothing if the FlexClient has been invalidated. If the flush result asks for
 * it, a delayed follow-up flush is scheduled on the queue.
 *
 * @param queue The endpoint queue to flush.
 */
protected void directFlush(EndpointQueue queue) {
    synchronized (lock) {
        // No need to invoke flush if the FlexClient has been invalidated.
        if (!valid)
            return;

        // If this invocation is a callback from a flush task, null out the task ref on
        // the queue to allow a subsequent delayed flush to be scheduled.
        if (queue.flushTask != null)
            queue.flushTask = null;

        FlushResult flushResult = internalFlush(queue, null, false /* updateLastUse */);
        if (flushResult == null) // If there's no flush result, return.
            return;

        // Pass any messages that are ready to flush off to the network layer.
        List<Message> messages = flushResult.getMessages();
        if (messages != null && !messages.isEmpty()) {
            if (queue.pushSession != null) {
                if (queue.pushSession instanceof ConnectionAwareSession) {
                    // Update last use only if we're actually writing back to the client.
                    if (((ConnectionAwareSession) queue.pushSession).isConnected())
                        updateLastUse();
                }
                for (Message msg : messages)
                    queue.pushSession.push(msg);
            } else if (endpointPushHandlers != null) {
                // The handler may have been unregistered since this flush was requested
                // (e.g. before a delayed flush task fires); guard against that instead
                // of failing with a NullPointerException.
                EndpointPushHandler handler = endpointPushHandlers.get(queue.endpointId);
                if (handler != null) {
                    updateLastUse();
                    handler.pushMessages(messages);
                } else if (Log.isError()) {
                    Log.getLogger(FLEX_CLIENT_LOG_CATEGORY).error("No endpoint push handler is registered for endpoint '"
                            + queue.endpointId + "'; unable to push flushed messages for FlexClient '" + getId() + "'.");
                }
            }
        }

        // Schedule a delayed flush if necessary.
        int flushWaitTime = flushResult.getNextFlushWaitTimeMillis();
        if (flushWaitTime > 0) // Set up and schedule the delayed flush task.
            queue.flushTask = new FlexClientScheduledFlushForPush(queue, flushWaitTime);
    }
}
/**
 * Returns the outbound queue for the given endpoint, creating and registering it first
 * if none exists yet, and counts the passed subscription against the queue.
 * If the subscription's session supports push, it becomes the queue's push session.
 *
 * @param messageClient The subscription to associate with the queue.
 * @param endpointId    The id of the endpoint the subscription was made over.
 * @return The new or existing queue for the endpoint.
 */
protected EndpointQueue getOrCreateEndpointQueueAndRegisterSubscription(MessageClient messageClient, String endpointId) {
    final EndpointQueue endpointQueue;
    if (outboundQueues.containsKey(endpointId)) {
        // Existing queue: one more subscription is using it.
        endpointQueue = outboundQueues.get(endpointId);
        endpointQueue.messageClientRefCount++;

        // Resubscribes as a result of network connectivity issues may arrive over the same
        // endpoint but use a new session, so refresh the push session.
        final FlexSession resubscribeSession = messageClient.getFlexSession();
        if (resubscribeSession.isPushSupported()) {
            endpointQueue.pushSession = resubscribeSession;
        }
    } else {
        // First subscription over this endpoint: build the queue from scratch.
        endpointQueue = new EndpointQueue();
        endpointQueue.flexClient = this;
        endpointQueue.endpointId = endpointId;
        endpointQueue.endpoint = flexClientManager.getMessageBroker().getEndpoint(endpointId);
        endpointQueue.messages = new ArrayList<Message>(); /* Default size of 10 is fine */

        final FlexSession subscribeSession = messageClient.getFlexSession();
        if (subscribeSession.isPushSupported()) {
            endpointQueue.pushSession = subscribeSession;
        }

        endpointQueue.processor = flexClientManager.createOutboundQueueProcessor(this, endpointId);
        endpointQueue.messageClientRefCount = 1;
        outboundQueues.put(endpointId, endpointQueue);
    }
    return endpointQueue;
}
/**
 * Convenience overload that performs a general (not client-specific) flush of the
 * outbound queue. Delegates to {@code internalFlush(queue, null)}.
 *
 * @param queue The outbound queue to flush.
 * @return The result handed back by the delegated flush.
 */
protected FlushResult internalFlush(EndpointQueue queue) {
    final MessageClient noSpecificClient = null;
    return internalFlush(queue, noSpecificClient);
}
/**
 * Utility method to flush the outbound queue and log any problems.
 * If a specific client is passed, a client-specific flush is invoked.
 * If the passed client is null, a general flush of the queue is done.
 * The last-use timestamp of the FlexClient is updated when the flush yields messages.
 * Any exceptions are logged and then rethrown.
 *
 * @param queue  The outbound queue to flush.
 * @param client The client to flush for; may be null for a general flush.
 * @return The result of the flush.
 */
protected FlushResult internalFlush(EndpointQueue queue, MessageClient client) {
    // Forward the client so that a client-specific flush is actually performed;
    // previously the argument was dropped and every flush was a general one.
    return internalFlush(queue, client, true);
}
/**
 * Flushes the outbound queue through its queue processor and logs any failure.
 * A non-null client requests a client-specific flush; a null client requests a general
 * flush of the queue. After flushing, the queue is shut down if it is no longer needed.
 * Runtime exceptions are logged and then rethrown.
 *
 * @param queue         The outbound queue to flush.
 * @param client        The client to flush for, or null for a general flush.
 * @param updateLastUse Whether the last-use timestamp of the FlexClient should
 *                      be updated (this only happens when the flush produced messages).
 * @return The result returned by the queue processor.
 */
protected FlushResult internalFlush(EndpointQueue queue, MessageClient client,
                                    boolean updateLastUse) {
    try {
        final FlushResult flushed;
        // Flushing and the shutdown check happen atomically with respect to the queue.
        synchronized (queue) {
            flushed = queue.processor.flush(client, queue.messages);
            shutdownQueue(queue);
        }
        if (updateLastUse) {
            updateLastUseIfNecessary(flushed);
        }
        return flushed;
    } catch (RuntimeException failure) {
        if (Log.isError()) {
            Log.getLogger(FLEX_CLIENT_LOG_CATEGORY).error("Failed to flush an outbound queue for FlexClient '" + getId() + "'.", failure);
        }
        throw failure;
    }
}
/**
 * Utility method to flush messages in response to a poll request with
 * regular and wait poll.
 * Messages that were drained between polls are returned first, followed by the
 * messages produced by flushing the queue now. A delayed flush is scheduled when the
 * flush result requests one.
 *
 * @param queue The endpoint queue to flush messages for.
 * @return The flush result with messages, or null if there are no messages.
 */
protected FlushResult internalPoll(EndpointQueue queue) {
    List<Message> allMessages = new ArrayList<Message>();

    // First, add the previously flushed messages.
    if (queue.flushedMessagesBetweenPolls != null && !queue.flushedMessagesBetweenPolls.isEmpty()) {
        allMessages.addAll(queue.flushedMessagesBetweenPolls);
        queue.flushedMessagesBetweenPolls.clear();
    }

    // Then, check for regularly queued messages. We call internalFlush
    // even if the queue is empty so the queue processor could know
    // about the incoming poll request regardless.
    FlushResult internalFlushResult = internalFlush(queue);

    // The other flush paths (direct flush, scheduled poll flush) treat a null flush
    // result as possible; do the same here rather than dereferencing it blindly.
    if (internalFlushResult != null) {
        List<Message> flushedMessages = internalFlushResult.getMessages();
        if (flushedMessages != null && !flushedMessages.isEmpty())
            allMessages.addAll(flushedMessages);

        // Schedule a delayed flush, if necessary.
        int flushWaitTime = internalFlushResult.getNextFlushWaitTimeMillis();
        if (flushWaitTime > 0)
            queue.flushTask = new FlexClientScheduledFlushForPoll(queue, flushWaitTime);
    }

    if (allMessages.isEmpty())
        return null;

    // Flush, since there are messages.
    FlushResult flushResult = new FlushResult();
    flushResult.setMessages(allMessages);
    return flushResult;
}
/**
 * Tells every registered attribute listener that an attribute was added.
 * Does nothing when there are no attribute listeners.
 *
 * @param name  The name of the attribute.
 * @param value The new value of the attribute.
 */
protected void notifyAttributeAdded(String name, Object value) {
    if (attributeListeners == null || attributeListeners.isEmpty()) {
        return;
    }
    // One event instance is shared by all listeners.
    final FlexClientBindingEvent addedEvent = new FlexClientBindingEvent(this, name, value);
    for (FlexClientAttributeListener listener : attributeListeners) {
        listener.attributeAdded(addedEvent);
    }
}
/**
 * Informs an attribute value that it has been bound to this FlexClient, provided the
 * value implements FlexClientBindingListener. Other values (including null) are ignored.
 *
 * @param name  The attribute name.
 * @param value The attribute that has been bound.
 */
protected void notifyAttributeBound(String name, Object value) {
    // instanceof is false for null, so no separate null check is needed.
    if (!(value instanceof FlexClientBindingListener)) {
        return;
    }
    final FlexClientBindingEvent boundEvent = new FlexClientBindingEvent(this, name);
    final FlexClientBindingListener bindingAware = (FlexClientBindingListener) value;
    bindingAware.valueBound(boundEvent);
}
/**
 * Tells every registered attribute listener that an attribute was removed.
 * Does nothing when there are no attribute listeners.
 *
 * @param name  The name of the attribute.
 * @param value The previous value of the attribute.
 */
protected void notifyAttributeRemoved(String name, Object value) {
    if (attributeListeners == null || attributeListeners.isEmpty()) {
        return;
    }
    // One event instance is shared by all listeners.
    final FlexClientBindingEvent removedEvent = new FlexClientBindingEvent(this, name, value);
    for (FlexClientAttributeListener listener : attributeListeners) {
        listener.attributeRemoved(removedEvent);
    }
}
/**
 * Tells every registered attribute listener that an attribute was replaced.
 * Does nothing when there are no attribute listeners.
 *
 * @param name  The name of the attribute.
 * @param value The previous value of the attribute.
 */
protected void notifyAttributeReplaced(String name, Object value) {
    if (attributeListeners == null || attributeListeners.isEmpty()) {
        return;
    }
    // One event instance is shared by all listeners.
    final FlexClientBindingEvent replacedEvent = new FlexClientBindingEvent(this, name, value);
    for (FlexClientAttributeListener listener : attributeListeners) {
        listener.attributeReplaced(replacedEvent);
    }
}
/**
 * Informs an attribute value that it has been unbound from this FlexClient, provided the
 * value implements FlexClientBindingListener. Other values (including null) are ignored.
 *
 * @param name  The attribute name.
 * @param value The attribute that has been unbound.
 */
protected void notifyAttributeUnbound(String name, Object value) {
    // instanceof is false for null, so no separate null check is needed.
    if (!(value instanceof FlexClientBindingListener)) {
        return;
    }
    final FlexClientBindingEvent unboundEvent = new FlexClientBindingEvent(this, name);
    final FlexClientBindingListener bindingAware = (FlexClientBindingListener) value;
    bindingAware.valueUnbound(unboundEvent);
}
/**
 * Called by FlexClientManager once this new FlexClient has been constructed and fully
 * configured; announces the new instance to all registered created listeners.
 */
protected void notifyCreated() {
    if (createdListeners.isEmpty()) {
        return;
    }
    for (FlexClientListener listener : createdListeners) {
        listener.clientCreated(this);
    }
}
/**
 * Removes an endpoint queue that is no longer needed, i.e. one that has no remaining
 * subscriptions and no pending outbound messages, and wakes up any threads waiting on it.
 * Used for queues accessed via polling channels.
 *
 * @param queue The queue to potentially shutdown.
 * @return true if the queue was cleaned up/removed; otherwise false.
 */
protected boolean shutdownQueue(EndpointQueue queue) {
    // Keep the queue while it still has subscribers or undelivered messages.
    final boolean stillNeeded = queue.messageClientRefCount != 0 || !queue.messages.isEmpty();
    if (stillNeeded) {
        return false;
    }

    outboundQueues.remove(queue.endpointId);
    // Notify any threads waiting on this queue.
    synchronized (queue) {
        queue.notifyAll();
    }
    return true;
}
/**
 * Always throws a not-subscribed exception back to the client; used when a poll request
 * arrives for an endpoint that this FlexClient has no subscription over.
 * <p>
 * Pre-3.1 versions of the client library did not handle URL session tokens properly
 * and may issue a poll, after subscribing, that lacks the proper FlexClient id. Such a
 * poll looks like it comes from an unsubscribed client. To give a more useful error in
 * that case, the other FlexClients of the current session are searched for a
 * subscription over the target endpoint; if one is found the exception carries extra
 * guidance (message 10036), otherwise the general message (10028) is used.
 * <p>
 * This method should not be called when you hold an internal thread lock. It iterates
 * over all the FlexClients in the current session and will not work if two or more
 * FlexClients in the same session call it simultaneously.
 *
 * @param endpointId The endpoint Id.
 */
protected void throwNotSubscribedException(String endpointId) {
    // Start with the general message and upgrade it if an orphaned sibling is found.
    int messageNumber = 10028;

    siblingSearch:
    for (FlexClient sibling : FlexContext.getFlexSession().getFlexClients()) {
        if (sibling.equals(this)) {
            continue;
        }
        for (MessageClient subscription : sibling.getMessageClients()) {
            if (subscription.getEndpointId().equals(endpointId)) {
                // Another FlexClient in this session is subscribed over the endpoint.
                messageNumber = 10036;
                break siblingSearch;
            }
        }
    }

    FlexClientNotSubscribedException notSubscribed = new FlexClientNotSubscribedException();
    notSubscribed.setMessage(messageNumber, new Object[]{endpointId});
    notSubscribed.setCode(MessageService.NOT_SUBSCRIBED_CODE);
    throw notSubscribed;
}
/**
 * Refreshes the last-use timestamp, but only when the flush result actually
 * carries messages. A null result or a null/empty message list leaves it untouched.
 *
 * @param flushResult The flush result.
 */
protected void updateLastUseIfNecessary(FlushResult flushResult) {
    if (flushResult == null) {
        return;
    }
    final List<Message> flushed = flushResult.getMessages();
    if (flushed == null || flushed.isEmpty()) {
        return;
    }
    updateLastUse();
}
//--------------------------------------------------------------------------
//
// Private Methods
//
//--------------------------------------------------------------------------
/**
 * Collects the attribute names of the given session into a new, mutable set.
 *
 * @param session The session whose attribute names are read.
 * @return A new set holding the session's attribute names.
 */
private Set<String> getSessionAttributeNames(FlexSession session) {
    // Drain the enumeration into a list, then de-duplicate through the set.
    return new HashSet<String>(Collections.list(session.getAttributeNames()));
}
/**
 * Reads an attribute from the FlexSession that FlexContext reports as current.
 *
 * @param name The attribute name.
 * @return The value returned by the current session for that name.
 */
private Object getSessionAttributeInCurrentSession(String name) {
    final FlexSession current = FlexContext.getFlexSession();
    return current.getAttribute(name);
}
/**
 * Looks the attribute up in the sessions associated with this FlexClient, skipping
 * the session that FlexContext reports as current.
 *
 * @param name The attribute name.
 * @return The first non-null value found, or null if no other session has one.
 */
private Object getSessionAttributeInOtherSessions(String name) {
    final FlexSession current = FlexContext.getFlexSession();
    for (FlexSession candidate : sessions) {
        if (candidate != current) {
            final Object found = candidate.getAttribute(name);
            if (found != null) {
                return found;
            }
        }
    }
    return null;
}
/**
 * Stores an attribute on the FlexSession that FlexContext reports as current.
 *
 * @param name  The attribute name.
 * @param value The attribute value.
 */
private void setSessionAttributeInCurrentSession(String name, Object value) {
    final FlexSession current = FlexContext.getFlexSession();
    current.setAttribute(name, value);
}
/**
 * Stores an attribute on the first Servlet-based session associated with this
 * FlexClient. Does nothing if no associated session is Servlet-based.
 *
 * @param name  The attribute name.
 * @param value The attribute value.
 */
private void setSessionAttributeInServletBasedSession(String name, Object value) {
    for (FlexSession candidate : sessions) {
        if (!isServletBasedSession(candidate)) {
            continue;
        }
        candidate.setAttribute(name, value);
        break; // Only the first Servlet-based session is updated.
    }
}
/**
 * Tells whether the FlexSession that FlexContext reports as current is Servlet-based.
 *
 * @return true if the current session is an HttpFlexSession; otherwise false.
 */
private boolean isCurrentSessionServletBased() {
    final FlexSession current = FlexContext.getFlexSession();
    return isServletBasedSession(current);
}
/**
 * Tells whether the given session is Servlet-based, i.e. an HttpFlexSession.
 *
 * @param session The session to test; null yields false.
 * @return true if the session is an HttpFlexSession; otherwise false.
 */
private boolean isServletBasedSession(FlexSession session) {
    final boolean servletBased = session instanceof HttpFlexSession;
    return servletBased;
}
//--------------------------------------------------------------------------
//
// Inner Classes
//
//--------------------------------------------------------------------------
/**
 * Helper class for handling async poll requests. This class allows the response for an async poll
 * to be delayed until data arrives to return to the client or the specified wait interval elapses.
 * Wait timeouts are monitored by the <tt>FlexClientManager</tt> which contains a <tt>TimeoutManager</tt>
 * instance that is started and stopped during application bootstrap and shutdown. Managing timeouts
 * locally or statically isn't a good option because they lack a useful shutdown hook that's necessary
 * in order to close down the timeout manager cleanly.
 */
public class AsyncPollWithTimeout extends TimeoutAbstractObject {
    private final FlexClient flexClient;
    private final FlexSession session;
    private final EndpointQueue queue;
    private final AsyncPollHandler handler;
    private final String endpointId;

    /**
     * Constructor. Stores the poll state and registers the poll with the
     * <tt>FlexClientManager</tt> for timeout monitoring.
     *
     * @param flexClient         flex client
     * @param session            flex session
     * @param queue              endpoint queue
     * @param handler            poll handler
     * @param waitIntervalMillis wait interval
     * @param endpointId         endpoint
     */
    public AsyncPollWithTimeout(FlexClient flexClient, FlexSession session, EndpointQueue queue, AsyncPollHandler handler, long waitIntervalMillis, String endpointId) {
        this.flexClient = flexClient;
        this.session = session;
        this.queue = queue;
        this.handler = handler;
        this.endpointId = endpointId;
        setTimeoutPeriod(waitIntervalMillis);
        // Hand this instance to the timeout monitor only after every field has been
        // assigned, so the monitor never observes a partially initialized poll
        // (previously endpointId was assigned after registration).
        flexClientManager.monitorAsyncPollTimeout(this);
    }

    /**
     * Return client.
     *
     * @return flex client
     */
    public FlexClient getFlexClient() {
        return flexClient;
    }

    /**
     * Return session.
     *
     * @return flex session
     */
    public FlexSession getFlexSession() {
        return session;
    }

    /**
     * Return endpoint queue.
     *
     * @return the queue
     */
    public EndpointQueue getEndpointQueue() {
        return queue;
    }

    /**
     * Return handler.
     *
     * @return the handler
     */
    public AsyncPollHandler getHandler() {
        return handler;
    }

    /**
     * Return endpoint ID.
     *
     * @return the id
     */
    public String getEndpointId() {
        return endpointId;
    }

    /**
     * Trigger a timeout. Completes the poll with no result.
     */
    public void timeout() {
        completeAsyncPoll(this, null /* nothing to return */);
    }
}
/**
 * Helper class to flush a FlexClient's outbound queue after a specified delay.
 * Delayed flushes are handled by the <tt>FlexClientManager</tt>
 * using <tt>TimeoutManager</tt>.
 */
abstract class FlexClientScheduledFlush extends TimeoutAbstractObject {
    /** The endpoint queue this task flushes when it fires. */
    final EndpointQueue queue;

    /**
     * Creates the task and registers it with the <tt>FlexClientManager</tt> for monitoring.
     *
     * @param queue              The endpoint queue to flush.
     * @param waitIntervalMillis The delay, in milliseconds, before the flush is performed.
     */
    public FlexClientScheduledFlush(EndpointQueue queue, long waitIntervalMillis) {
        this.queue = queue;
        setTimeoutPeriod(waitIntervalMillis);
        flexClientManager.monitorScheduledFlush(this);
    }

    /** Performs the actual flush; implemented per channel style (push or poll). */
    abstract void performFlushTask();

    /**
     * Runs the flush with the owning FlexClient installed as the thread-local FlexClient.
     * The thread-local is always cleared again, even when the flush throws, so that the
     * executing thread does not keep a stale FlexClient reference.
     */
    public void timeout() {
        FlexContext.setThreadLocalFlexClient(FlexClient.this);
        try {
            performFlushTask();
        } finally {
            FlexContext.setThreadLocalFlexClient(null);
        }
    }
}
/**
 * Delayed flush task for push channels: once the delay has elapsed it flushes the
 * FlexClient's outbound queue directly to the client.
 */
class FlexClientScheduledFlushForPush extends FlexClientScheduledFlush {
    /**
     * @param queue              The endpoint queue to flush.
     * @param waitIntervalMillis The delay, in milliseconds, before the flush is performed.
     */
    public FlexClientScheduledFlushForPush(EndpointQueue queue, long waitIntervalMillis) {
        super(queue, waitIntervalMillis);
    }

    /**
     * Performs a direct flush of the queue while holding the FlexClient lock first
     * and the queue lock second.
     */
    @Override
    void performFlushTask() {
        synchronized (lock) {
            final EndpointQueue target = queue;
            synchronized (target) {
                directFlush(target);
            }
        }
    }
}
/**
 * Delayed flush task for polling channels. Once the delay has elapsed the FlexClient's
 * outbound queue is flushed; the resulting messages either complete a parked async poll
 * or are buffered in flushedMessagesBetweenPolls, which is drained first by the next poll.
 */
class FlexClientScheduledFlushForPoll extends FlexClientScheduledFlush {
    /**
     * @param queue              The endpoint queue to flush.
     * @param waitIntervalMillis The delay, in milliseconds, before the flush is performed.
     */
    public FlexClientScheduledFlushForPoll(EndpointQueue queue, long waitIntervalMillis) {
        super(queue, waitIntervalMillis);
    }

    /**
     * Flushes the queue under the FlexClient lock and routes the flushed messages.
     * Does nothing if the FlexClient has been invalidated or the flush yields no result.
     */
    @Override
    void performFlushTask() {
        synchronized (lock) {
            // An invalidated FlexClient has nothing left to flush.
            if (!valid) {
                return;
            }

            // Clear the task reference so that a subsequent delayed flush can be scheduled.
            queue.flushTask = null;

            final FlushResult flushed = internalFlush(queue);
            if (flushed == null) {
                return;
            }

            final List<Message> ready = flushed.getMessages();
            final boolean hasMessages = ready != null && !ready.isEmpty();
            if (hasMessages) {
                if (queue.asyncPoll == null) {
                    // Nobody is waiting; buffer the messages for the next poll.
                    if (queue.flushedMessagesBetweenPolls == null) {
                        queue.flushedMessagesBetweenPolls = new ArrayList<Message>();
                    }
                    queue.flushedMessagesBetweenPolls.addAll(ready);
                } else {
                    // A parked async poll is waiting for data; answer it now.
                    completeAsyncPoll(queue.asyncPoll, flushed);
                }
            }

            // Schedule a follow-up delayed flush, if the result asks for one.
            final int nextDelayMillis = flushed.getNextFlushWaitTimeMillis();
            if (nextDelayMillis > 0) {
                queue.flushTask = new FlexClientScheduledFlushForPoll(queue, nextDelayMillis);
            }
        }
    }
}
/**
 * Helper class that stores per-endpoint outbound queue state including:
 * <ul>
 * <li>flexClient - The <tt>FlexClient</tt> the queue is used by.</li>
 * <li>endpointId - The id of the endpoint this queue belongs to.</li>
 * <li>endpoint - The <tt>Endpoint</tt> looked up from the message broker for endpointId
 * when the queue is created.</li>
 * <li>messages - The outbound queue of messages for the endpoint.</li>
 * <li>flushedMessagesBetweenPolls - Keeps track of flushed (more precisely
 * drained buffered) messages between polls. A separate list is needed
 * from the messages list to avoid regular flush handling; it is drained first
 * by the next poll.</li>
 * <li>processor - The processor that handles adding messages to the queue as well as flushing
 * them to the network.</li>
 * <li>asyncPoll - The async poll to timeout or callback when messages arrive
 * (null if the endpoint or session supports direct push).</li>
 * <li>waitPoll - Checked when a message is queued to decide whether threads waiting on
 * this queue must be notified (presumably set while a poll is in a wait state; the
 * code that sets it is outside this section - confirm).</li>
 * <li>pushSession - A reference to a pushSession to use for direct writes to the
 * client (null if the endpoint uses polling or handles push directly).</li>
 * <li>flushTask - A reference to a pending flush task that will perform a delayed flush of the queue;
 * null if no delayed flush has been scheduled.</li>
 * <li>messageClientRefCount - A reference count of MessageClients subscribed over this endpoint.
 * Once all MessageClients unsubscribe this queue can be shut down.</li>
 * <li>avoidBusyPolling - Used to signal poll result generation for the queue to avoid busy polling.</li>
 * </ul>
 */
public static class EndpointQueue {
public FlexClient flexClient;
public String endpointId;
public Endpoint endpoint;
public List<Message> messages;
public List<Message> flushedMessagesBetweenPolls;
public FlexClientOutboundQueueProcessor processor;
public AsyncPollWithTimeout asyncPoll;
public boolean waitPoll;
public FlexSession pushSession;
public TimeoutAbstractObject flushTask;
public int messageClientRefCount;
public boolean avoidBusyPolling;
}
}