// blob: cd6db61f7fa2dde38a88e3d9c93f5ead3e3c0a71 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package flex.messaging.client;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import flex.messaging.FlexContext;
import flex.messaging.FlexSession;
import flex.messaging.FlexSessionListener;
import flex.messaging.MessageClient;
import flex.messaging.MessageClientListener;
import flex.messaging.endpoints.BaseStreamingHTTPEndpoint;
import flex.messaging.endpoints.Endpoint;
import flex.messaging.log.Log;
import flex.messaging.log.LogCategories;
import flex.messaging.messages.AsyncMessage;
import flex.messaging.messages.CommandMessage;
import flex.messaging.util.TimeoutAbstractObject;
import flex.messaging.util.UUIDUtils;
/**
*
* Instances of this class are used by endpoints that support streaming
* outbound data to connected clients when the client is not polling and
* the FlexSession representing the connection does not support push directly.
* This generally requires that the client and endpoint establish a separate,
* physical connection for pushed data that is part of a larger, logical
* connection/session.
* <p>
* When the endpoint establishes this physical streaming connection it will
* create an instance of this class, register it with the FlexClient and then
* wait on the public <code>pushNeeded</code> condition variable.
* When data arrives to push to the remote client, the FlexClient will queue it
* with this notifier instance and the waiting endpoint will be notified.
* The endpoint will retrieve the queued messages from the notifier instance and will
* stream them to the client and then go back into a wait state on the
* <code>pushNeeded</code> condition variable.
* </p><p>
* Note that this implementation is based upon <code>Object.wait()</code>; it is not a
* non-blocking implementation.
* </p>
*/
public class EndpointPushNotifier extends TimeoutAbstractObject implements EndpointPushHandler, FlexSessionListener, MessageClientListener
{
    //--------------------------------------------------------------------------
    //
    // Constructor
    //
    //--------------------------------------------------------------------------

    /**
     * Constructs a PushNotifier instance, registers it with the FlexClient as the
     * push handler for the endpoint and, if a FlexSession is available from the
     * FlexContext, listens for that session's destruction.
     *
     * @param endpoint The endpoint that will use this notifier.
     * @param flexClient The FlexClient that will use this notifier.
     */
    public EndpointPushNotifier(Endpoint endpoint, FlexClient flexClient)
    {
        // Assign every final field before 'this' is handed to any other object below;
        // otherwise another thread could observe a partially initialized notifier.
        notifierId = UUIDUtils.createUUID(false /* doesn't need to be secure */);
        this.endpoint = endpoint;
        this.flexClient = flexClient;
        flexSession = FlexContext.getFlexSession();
        invalidateMessageClientOnStreamingClose = (endpoint instanceof BaseStreamingHTTPEndpoint)
                && ((BaseStreamingHTTPEndpoint)endpoint).isInvalidateMessageClientOnStreamingClose();

        // Publish the fully initialized notifier.
        flexClient.registerEndpointPushHandler(this, endpoint.getId());
        if (flexSession != null)
        {
            flexSession.addSessionDestroyedListener(this);
        }

        updateLastUse(); // Initialize last use timestamp to construct time.
    }

    //--------------------------------------------------------------------------
    //
    // Public Variables
    //
    //--------------------------------------------------------------------------

    /**
     * The condition variable that the endpoint waits on for pushed data to arrive.
     * It also guards access to the queued <code>messages</code>.
     */
    public final Object pushNeeded = new Object();

    //--------------------------------------------------------------------------
    //
    // Private Variables
    //
    //--------------------------------------------------------------------------

    /**
     * Flag indicating whether the notifier has been closed/shut down.
     * This is used to signal a waiting endpoint that it should break out of its
     * wait loop and close its streaming connection.
     */
    private volatile boolean closed;

    /**
     * Flag indicating that the notifier has started closing; used to allow only
     * one thread to execute the close() logic and delay flipping closed to true
     * to allow any final messages to be streamed to the client before the endpoint
     * using the notifier breaks out of its wait/notify loop and terminates the
     * streaming connection.
     */
    private volatile boolean closing;

    /**
     * The number of minutes a client can remain idle before the server
     * times the notifier out.
     */
    private int idleTimeoutMinutes;

    /**
     * Whether to invalidate the message-client when the streaming connection is closed.
     */
    private final boolean invalidateMessageClientOnStreamingClose;

    /**
     * The endpoint that uses this notifier.
     */
    private final Endpoint endpoint;

    /**
     * The FlexClient this notifier is associated with.
     */
    private final FlexClient flexClient;

    /**
     * The FlexSession this notifier is associated with; may be null if no session
     * was available from the FlexContext at construct time.
     */
    private final FlexSession flexSession;

    /**
     * Lock guarding the closing/closed state transitions.
     */
    private final Object lock = new Object();

    /**
     * Log category used by the notifier. Initialized to ENDPOINT_GENERAL but
     * endpoints using this notifier should set it to their own log categories.
     */
    private String logCategory = LogCategories.ENDPOINT_GENERAL;

    /**
     * Queue of messages that the FlexClient will populate and the endpoint will drain to
     * stream to the client. Null when nothing is queued. Guarded by <code>pushNeeded</code>.
     */
    private List<AsyncMessage> messages;

    /**
     * List of MessageClient subscriptions using this endpoint push notifier.
     * When this notifier is closed, any associated subscriptions need to be invalidated.
     */
    private final CopyOnWriteArrayList<MessageClient> messageClients = new CopyOnWriteArrayList<MessageClient>();

    /**
     * Unique identifier for this instance.
     */
    private final String notifierId;

    //--------------------------------------------------------------------------
    //
    // Public Methods
    //
    //--------------------------------------------------------------------------

    /**
     * Moves this notifier to a closed state, notifying any listeners,
     * associated subscriptions and waiting endpoints.
     * Does not attempt to notify the client Channel of the disconnect thereby allowing
     * automatic reconnect processing to run.
     */
    public void close()
    {
        close(false);
    }

    /**
     * Moves this notifier to a closed state, notifying any listeners,
     * associated subscriptions and waiting endpoints.
     * Only the first caller performs the shutdown; calls made while the notifier is
     * closing or already closed return immediately.
     *
     * @param disconnectChannel True to attempt to notify the client Channel of the disconnect
     *        and suppress automatic reconnect processing.
     */
    public void close(boolean disconnectChannel)
    {
        synchronized (lock)
        {
            if (closed || closing)
            {
                return;
            }
            closing = true;
        }

        try
        {
            cancelTimeout();

            if (flexSession != null)
            {
                flexSession.removeSessionDestroyedListener(this);
            }

            // Shut down flow of further messages to this notifier.
            flexClient.unregisterEndpointPushHandler(this, endpoint.getId());

            // Push a disconnect command down to the client to suppress automatic reconnect.
            // Because 'closing' is set, pushMessages() only queues it; the notify happens below.
            if (disconnectChannel)
            {
                List<AsyncMessage> disconnectMessages = new ArrayList<AsyncMessage>(1);
                disconnectMessages.add(new CommandMessage(CommandMessage.DISCONNECT_OPERATION));
                pushMessages(disconnectMessages);
            }

            // Invalidate associated subscriptions; this doesn't attempt to notify the client.
            // Any client subscriptions made over this endpoint will be automatically invalidated
            // on the client when it receives its channel disconnect event.
            if (invalidateMessageClientOnStreamingClose)
            {
                for (MessageClient messageClient : messageClients)
                {
                    messageClient.invalidate();
                }
            }
        }
        finally
        {
            // Always reach the final closed state and wake the endpoint, even if one of the
            // steps above threw; otherwise 'closing' would stay set forever, later close()
            // calls would be ignored and the waiting endpoint would never be notified.
            synchronized (lock)
            {
                closed = true;
                closing = false;
            }

            // Notify one last time so the endpoint can stream any final messages to the
            // client and shut down its streaming connection.
            synchronized (pushNeeded)
            {
                pushNeeded.notifyAll();
            }
        }
    }

    /**
     * Returns any messages available to push to the client, and removes them
     * from this notifier.
     * Notified endpoints should invoke this method to retrieve messages, stream them
     * to the client and then re-enter the wait state.
     * This method acquires a lock on <code>pushNeeded</code>.
     *
     * @return The messages to push to the client, or null if none are queued.
     */
    public List<AsyncMessage> drainMessages()
    {
        synchronized (pushNeeded)
        {
            List<AsyncMessage> drained = messages;
            messages = null;
            return drained;
        }
    }

    /**
     * Returns whether the notifier has closed; used to break the endpoint's wait cycle.
     *
     * @return True if the notifier has closed; otherwise false.
     */
    public boolean isClosed()
    {
        return closed;
    }

    /**
     * Returns the endpoint that is using this notifier.
     *
     * @return The endpoint using this notifier.
     */
    public Endpoint getEndpoint()
    {
        return endpoint;
    }

    /**
     * Returns the idle timeout minutes used by the notifier.
     *
     * @return The idle timeout minutes used by the notifier.
     */
    public int getIdleTimeoutMinutes()
    {
        return idleTimeoutMinutes;
    }

    /**
     * Sets the idle timeout minutes used by the notifier.
     *
     * @param idleTimeoutMinutes The idle timeout minutes used by the notifier.
     */
    public void setIdleTimeoutMinutes(int idleTimeoutMinutes)
    {
        this.idleTimeoutMinutes = idleTimeoutMinutes;
    }

    /**
     * Returns the log category used by this notifier.
     *
     * @return The log category used by this notifier.
     */
    public String getLogCategory()
    {
        return logCategory;
    }

    /**
     * Sets the log category used by this notifier. Endpoints using this notifier
     * should set it to their own log categories.
     *
     * @param logCategory The log category for the notifier to use.
     */
    public void setLogCategory(String logCategory)
    {
        this.logCategory = logCategory;
    }

    /**
     * Returns the unique id for this notifier.
     *
     * @return The unique id for this notifier.
     */
    public String getNotifierId()
    {
        return notifierId;
    }

    /**
     * Implements TimeoutCapable.
     * Determine the time, in milliseconds, that this object is allowed to idle
     * before having its timeout method invoked.
     *
     * @return The idle timeout converted from minutes to milliseconds.
     */
    public long getTimeoutPeriod()
    {
        // Multiply as long: int arithmetic overflows once the timeout exceeds
        // 35791 minutes (Integer.MAX_VALUE / 60000).
        return idleTimeoutMinutes * 60L * 1000L;
    }

    /**
     * Handles MessageClient creation events; this is a no-op.
     *
     * @param messageClient The newly created MessageClient.
     */
    public void messageClientCreated(MessageClient messageClient)
    {
        // No-op.
    }

    /**
     * Handles MessageClient destruction events by unregistering the destroyed
     * MessageClient from this notifier.
     *
     * @param messageClient The MessageClient being destroyed.
     */
    public void messageClientDestroyed(MessageClient messageClient)
    {
        unregisterMessageClient(messageClient);
    }

    /**
     * Used by FlexClient to push messages to the endpoint.
     * This method will automatically notify a waiting endpoint, if one exists
     * and it acquires a lock on <code>pushNeeded</code>.
     * A null or empty list is ignored. The supplied list is copied, so the caller's
     * list is neither retained nor modified by this notifier.
     *
     * @param messagesToPush The messages to push to the client.
     */
    @SuppressWarnings({"unchecked", "rawtypes"}) // Raw parameter type is kept for signature compatibility.
    public void pushMessages(List messagesToPush)
    {
        if (messagesToPush == null || messagesToPush.isEmpty())
        {
            return;
        }

        synchronized (pushNeeded)
        {
            // Queue into a list owned by this notifier rather than adopting the caller's
            // list, which would later be mutated by addAll() and may not even be modifiable.
            if (messages == null)
            {
                messages = new ArrayList<AsyncMessage>(messagesToPush);
            }
            else
            {
                messages.addAll(messagesToPush);
            }

            // If the notifier isn't closing, notify; otherwise just add and the close will
            // notify once it completes.
            if (!closing)
            {
                pushNeeded.notifyAll();
            }
        }
    }

    /**
     * Registers a MessageClient subscription that depends on this notifier.
     * A null argument is ignored, and a MessageClient that is already registered
     * is not registered again.
     *
     * @param messageClient A MessageClient that depends on this notifier.
     */
    public void registerMessageClient(MessageClient messageClient)
    {
        if (messageClient == null)
        {
            return;
        }

        // Only start listening for destruction the first time the client is added.
        if (messageClients.addIfAbsent(messageClient))
        {
            messageClient.addMessageClientDestroyedListener(this);
        }
    }

    /**
     * Handle session creation events. This handler is a no-op because the notifier
     * is only concerned with its associated session's destruction.
     *
     * @param flexSession The newly created FlexSession.
     */
    public void sessionCreated(FlexSession flexSession)
    {
        // No-op.
    }

    /**
     * Handle session destruction events. This will be invoked when the notifier's
     * associated session is invalidated, and this will trigger the notifier to close.
     *
     * @param flexSession The FlexSession being invalidated.
     */
    public void sessionDestroyed(FlexSession flexSession)
    {
        if (Log.isInfo())
        {
            Log.getLogger(logCategory).info("Endpoint with id '" + endpoint.getId() + "' is closing"
                    + " a streaming connection for the FlexClient with id '" + flexClient.getId() + "'"
                    + " since its associated session has been destroyed.");
        }
        close(true /* disconnect client Channel */);
    }

    /**
     * Implements TimeoutCapable.
     * Inform the object that it has timed out; the notifier logs the event and closes,
     * attempting to disconnect the client Channel.
     */
    public void timeout()
    {
        if (Log.isInfo())
        {
            Log.getLogger(logCategory).info("Endpoint with id '" + endpoint.getId() + "' is timing out"
                    + " a streaming connection for the FlexClient with id '" + flexClient.getId() + "'");
        }
        close(true /* disconnect client Channel */);
    }

    /**
     * Unregisters a MessageClient subscription that depended on this notifier.
     * A null argument is ignored.
     *
     * @param messageClient A MessageClient that depended on this notifier.
     */
    public void unregisterMessageClient(MessageClient messageClient)
    {
        if (messageClient == null)
        {
            return;
        }

        messageClient.removeMessageClientDestroyedListener(this);
        messageClients.remove(messageClient);
    }
}