blob: 3c29fae909653c0b2e12e334c28b00921cd17617 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package flex.messaging.endpoints;
import flex.messaging.FlexContext;
import flex.messaging.FlexSession;
import flex.messaging.client.FlexClient;
import flex.messaging.client.FlushResult;
import flex.messaging.client.PollFlushResult;
import flex.messaging.client.PollWaitListener;
import flex.messaging.client.UserAgentSettings;
import flex.messaging.config.ConfigMap;
import flex.messaging.config.ConfigurationConstants;
import flex.messaging.log.Log;
import flex.messaging.messages.CommandMessage;
import flex.messaging.util.UserAgentManager;
import java.util.concurrent.ConcurrentHashMap;
/**
* Base class for HTTP-based endpoints that support regular polling and long polling,
* which means placing request threads that are polling for messages into a wait
* state until messages are available for delivery or the configurable wait interval
* is reached.
*/
public abstract class BasePollingHTTPEndpoint extends BaseHTTPEndpoint implements PollWaitListener
{
//--------------------------------------------------------------------------
//
// Private Static Constants
//
//--------------------------------------------------------------------------
private static final String POLLING_ENABLED = "polling-enabled";
private static final String POLLING_INTERVAL_MILLIS = "polling-interval-millis";
private static final String POLLING_INTERVAL_SECONDS = "polling-interval-seconds"; // Deprecated configuration option.
private static final String MAX_WAITING_POLL_REQUESTS = "max-waiting-poll-requests";
private static final String WAIT_INTERVAL_MILLIS = "wait-interval-millis";
private static final String CLIENT_WAIT_INTERVAL_MILLIS = "client-wait-interval-millis";
// Force clients that exceed the long-poll limit to wait at least this long between poll requests.
// This matches the default polling interval defined in the client PollingChannel.
private static final int DEFAULT_WAIT_FOR_EXCESS_POLL_WAIT_CLIENTS = 3000;
// User Agent based settings manager
private UserAgentManager userAgentManager = new UserAgentManager();
//--------------------------------------------------------------------------
//
// Constructor
//
//--------------------------------------------------------------------------
/**
* Constructs an unmanaged <code>BasePollingHTTPEndpoint</code>.
*/
public BasePollingHTTPEndpoint()
{
this(false);
}
/**
* Constructs an <code>BasePollingHTTPEndpoint</code> with the indicated management.
*
* @param enableManagement <code>true</code> if the <code>BasePollingHTTPEndpoint</code>
* is manageable; <code>false</code> otherwise.
*/
public BasePollingHTTPEndpoint(boolean enableManagement)
{
super(enableManagement);
}
//--------------------------------------------------------------------------
//
// Initialize, validate, start, and stop methods.
//
//--------------------------------------------------------------------------
/**
* Initializes the <code>Endpoint</code> with the properties.
* If subclasses override this method, they must call <code>super.initialize()</code>.
*
* @param id The ID of the <code>Endpoint</code>.
* @param properties Properties for the <code>Endpoint</code>.
*/
@Override
public void initialize(String id, ConfigMap properties)
{
super.initialize(id, properties);
if (properties == null || properties.size() == 0)
{
// Initialize default user agent manager settings.
UserAgentManager.setupUserAgentManager(null, userAgentManager);
return; // Nothing else to initialize.
}
// General poll props.
pollingEnabled = properties.getPropertyAsBoolean(POLLING_ENABLED, false);
pollingIntervalMillis = properties.getPropertyAsLong(POLLING_INTERVAL_MILLIS, -1);
long pollingIntervalSeconds = properties.getPropertyAsLong(POLLING_INTERVAL_SECONDS, -1); // Deprecated
if (pollingIntervalSeconds > -1)
pollingIntervalMillis = pollingIntervalSeconds * 1000;
// Piggybacking props.
piggybackingEnabled = properties.getPropertyAsBoolean(ConfigurationConstants.PIGGYBACKING_ENABLED_ELEMENT, false);
// HTTP poll wait props.
maxWaitingPollRequests = properties.getPropertyAsInt(MAX_WAITING_POLL_REQUESTS, 0);
waitInterval = properties.getPropertyAsLong(WAIT_INTERVAL_MILLIS, 0);
clientWaitInterval = properties.getPropertyAsInt(CLIENT_WAIT_INTERVAL_MILLIS, 0);
// User Agent props.
UserAgentManager.setupUserAgentManager(properties, userAgentManager);
// Set initial state for the canWait flag based on whether we allow waits or not.
if (maxWaitingPollRequests > 0 && (waitInterval == -1 || waitInterval > 0))
{
waitEnabled = true;
canWait = true;
}
}
//--------------------------------------------------------------------------
//
// Variables
//
//--------------------------------------------------------------------------
/**
* This flag is volatile to allow for consistent reads across thread without
* needing to pay the cost for a synchronized lock for each read.
*/
private volatile boolean canWait;
/**
* Used to synchronize sets and gets to the number of waiting clients.
*/
protected final Object lock = new Object();
/**
* Set when properties are handled; used as a shortcut for logging to determine whether this
* instance attempts to put request threads in a wait state or not.
*/
private boolean waitEnabled;
/**
* A count of the number of request threads that are currently in the wait state (including
* those on their way into or out of it).
*/
protected int waitingPollRequestsCount;
/**
* A Map(notification Object for a waited request thread, Boolean.TRUE).
*/
private ConcurrentHashMap currentWaitedRequests;
//--------------------------------------------------------------------------
//
// Properties
//
//--------------------------------------------------------------------------
//----------------------------------
// clientWaitInterval
//----------------------------------
protected int clientWaitInterval = 0;
/**
* Retrieves the number of milliseconds the client will wait after receiving a response for
* a poll with server wait before it issues its next poll request.
* A value of zero or less causes the client to use its default polling interval (based on the
* channel's polling-interval-millis configuration) and this value is ignored.
* A value greater than zero will cause the client to wait for the specified interval before
* issuing its next poll request with a value of 1 triggering an immediate poll from the client
* as soon as a waited poll response is received.
* @return The client wait interval.
*/
public int getClientWaitInterval()
{
return clientWaitInterval;
}
/**
* Sets the number of milliseconds a client will wait after receiving a response for a poll
* with server wait before it issues its next poll request.
* A value of zero or less causes the client to use its default polling interval (based on the
* channel's polling-interval-millis configuration) and this value is ignored.
* A value greater than zero will cause the client to wait for the specified interval before
* issuing its next poll request with a value of 1 triggering an immediate poll from the client
* as soon as a waited poll response is received.
* This property does not effect polling clients that poll the server without a server wait.
*
* @param value The number of milliseconds a client will wait before issuing its next poll when the
* server is configured to wait.
*/
public void setClientWaitInterval(int value)
{
clientWaitInterval = value;
}
//----------------------------------
// maxWaitingPollRequests
//----------------------------------
protected int maxWaitingPollRequests = 0;
/**
* Retrieves the maximum number of server poll response threads that will be
* waiting for messages to arrive for clients.
* @return The maximum number of waiting poll requests.
*/
public int getMaxWaitingPollRequests()
{
return maxWaitingPollRequests;
}
/**
* Sets the maximum number of server poll response threads that will be
* waiting for messages to arrive for clients.
*
* @param maxWaitingPollRequests The maximum number of server poll response threads
* that will be waiting for messages to arrive for the client.
*/
public void setMaxWaitingPollRequests(int maxWaitingPollRequests)
{
this.maxWaitingPollRequests = maxWaitingPollRequests;
if (maxWaitingPollRequests > 0 && (waitInterval == -1 || waitInterval > 0))
{
waitEnabled = true;
canWait = (waitingPollRequestsCount < maxWaitingPollRequests);
}
}
//----------------------------------
// pollingEnabled
//----------------------------------
/**
*
* This is a property used on the client.
*/
protected boolean piggybackingEnabled;
//----------------------------------
// pollingEnabled
//----------------------------------
/**
*
* This is a property used on the client.
*/
protected boolean pollingEnabled;
//----------------------------------
// pollingIntervalMillis
//----------------------------------
/**
*
* This is a property used on the client.
*/
protected long pollingIntervalMillis = -1;
//----------------------------------
// waitInterval
//----------------------------------
protected long waitInterval = 0;
/**
* Retrieves the number of milliseconds the server poll response thread will be
* waiting for messages to arrive for the client.
* @return The wait interval.
*/
public long getWaitInterval()
{
return waitInterval;
}
/**
* Sets the number of milliseconds the server poll response thread will be
* waiting for messages to arrive for the client.
*
* @param waitInterval The number of milliseconds the server poll response thread will be
* waiting for messages to arrive for the client.
*/
public void setWaitInterval(long waitInterval)
{
this.waitInterval = waitInterval;
if (maxWaitingPollRequests > 0 && (waitInterval == -1 || waitInterval > 0))
{
waitEnabled = true;
canWait = (waitingPollRequestsCount < maxWaitingPollRequests);
}
}
//----------------------------------
// waitingPollRequestsCount
//----------------------------------
/**
* Retrieves the count of the number of request threads that are currently in the wait state
* (including those on their way into or out of it).
*
* @return The count of the number of request threads that are currently in the wait state.
*/
public int getWaitingPollRequestsCount()
{
return waitingPollRequestsCount;
}
//--------------------------------------------------------------------------
//
// Public Methods
//
//--------------------------------------------------------------------------
/**
*
* Returns a <code>ConfigMap</code> of endpoint properties that the client
* needs. This includes properties from <code>super.describeEndpoint</code>
* and additional <code>BaseHTTPEndpoint</code> specific properties under
* "properties" key.
*/
@Override
public ConfigMap describeEndpoint()
{
ConfigMap endpointConfig = super.describeEndpoint();
boolean createdProperties = false;
ConfigMap properties = endpointConfig.getPropertyAsMap(PROPERTIES_ELEMENT, null);
if (properties == null)
{
properties = new ConfigMap();
createdProperties = true;
}
if (pollingEnabled)
{
ConfigMap pollingEnabled = new ConfigMap();
// Adding as a value rather than attribute to the parent
pollingEnabled.addProperty(EMPTY_STRING, TRUE_STRING);
properties.addProperty(POLLING_ENABLED, pollingEnabled);
}
if (pollingIntervalMillis > -1)
{
ConfigMap pollingInterval = new ConfigMap();
// Adding as a value rather than attribute to the parent
pollingInterval.addProperty(EMPTY_STRING, String.valueOf(pollingIntervalMillis));
properties.addProperty(POLLING_INTERVAL_MILLIS, pollingInterval);
}
if (piggybackingEnabled)
{
ConfigMap piggybackingEnabled = new ConfigMap();
// Adding as a value rather than attribute to the parent
piggybackingEnabled.addProperty(EMPTY_STRING, String.valueOf(piggybackingEnabled));
properties.addProperty(ConfigurationConstants.PIGGYBACKING_ENABLED_ELEMENT, piggybackingEnabled);
}
if (createdProperties && properties.size() > 0)
endpointConfig.addProperty(ConfigurationConstants.PROPERTIES_ELEMENT, properties);
return endpointConfig;
}
/**
* Sets up monitoring of waited poll requests so they can be notified and exit when the
* endpoint stops.
*
* @see flex.messaging.endpoints.AbstractEndpoint#start()
*/
@Override
public void start()
{
if (isStarted())
return;
super.start();
currentWaitedRequests = new ConcurrentHashMap();
}
/**
* Ensures that no poll requests in a wait state are left un-notified when the endpoint stops.
*
* @see flex.messaging.endpoints.AbstractEndpoint#stop()
*/
@Override
public void stop()
{
if (!isStarted())
return;
// Notify any currently waiting polls.
for (Object notifier : currentWaitedRequests.keySet())
{
synchronized (notifier)
{
notifier.notifyAll(); // Break any current waits.
}
}
currentWaitedRequests = null;
super.stop();
}
/**
* (non-Javaodc)
* @see flex.messaging.client.PollWaitListener#waitStart(Object)
*/
public void waitStart(Object notifier)
{
currentWaitedRequests.put(notifier, Boolean.TRUE);
}
/**
* (non-Javaodc)
* @see flex.messaging.client.PollWaitListener#waitEnd(Object)
*/
public void waitEnd(Object notifier)
{
if (currentWaitedRequests != null)
currentWaitedRequests.remove(notifier);
}
//--------------------------------------------------------------------------
//
// Protected Methods
//
//--------------------------------------------------------------------------
/**
* Overrides the base poll handling to support optionally putting Http request handling threads
* into a wait state until messages are available to be delivered in the poll response or a timeout is reached.
* The number of threads that may be put in a wait state is bounded by <code>max-waiting-poll-requests</code>
* and waits will only be attempted if the canWait flag that is based on the <code>max-waiting-poll-requests</code>
* and the specified <code>wait-interval</code> is true.
*
* @param flexClient The FlexClient that issued the poll request.
* @param pollCommand The poll command from the client.
* @return The flush info used to build the poll response.
*/
@Override
protected FlushResult handleFlexClientPoll(FlexClient flexClient, CommandMessage pollCommand)
{
FlushResult flushResult = null;
if (canWait && !pollCommand.headerExists(CommandMessage.SUPPRESS_POLL_WAIT_HEADER))
{
FlexSession session = FlexContext.getFlexSession();
// If canWait is true it means we currently have less than the max number of allowed waiting threads.
// We need to protect writes/reads to the wait count with the endpoint's lock.
// Also, we have to be careful to handle the case where two threads get to this point when only
// one wait spot remains; one thread will win and the other needs to revert to a non-waitable poll.
boolean thisThreadCanWait;
synchronized (lock)
{
++waitingPollRequestsCount;
if (waitingPollRequestsCount == maxWaitingPollRequests)
{
thisThreadCanWait = true; // This thread got the last wait spot.
canWait = false;
}
else if (waitingPollRequestsCount > maxWaitingPollRequests)
{
thisThreadCanWait = false; // This thread was beaten out for the last spot.
--waitingPollRequestsCount; // Decrement the count because we're not going to try a poll with wait.
canWait = false; // All the wait spots are currently occupied so prevent further attempts for now.
}
else
{
// We haven't hit the limit yet, allow this thread to wait.
thisThreadCanWait = true;
}
}
// Check the max waiting connections per session count
if (thisThreadCanWait)
{
String userAgentValue = FlexContext.getHttpRequest().getHeader(UserAgentManager.USER_AGENT_HEADER_NAME);
UserAgentSettings agentSettings = userAgentManager.match(userAgentValue);
synchronized(session)
{
if (agentSettings != null)
session.maxConnectionsPerSession = agentSettings.getMaxPersistentConnectionsPerSession();
++session.streamingConnectionsCount;
if (session.maxConnectionsPerSession == FlexSession.MAX_CONNECTIONS_PER_SESSION_UNLIMITED
|| session.streamingConnectionsCount <= session.maxConnectionsPerSession)
{
thisThreadCanWait = true; // We haven't hit the limit yet, allow the wait.
}
else // (session.streamingConnectionsCount > session.maxConnectionsPerSession)
{
thisThreadCanWait = false; // no more from this client
--session.streamingConnectionsCount;
}
}
if (!thisThreadCanWait)
{
// Decrement the waiting poll count, since this poll isn't going to wait.
synchronized (lock)
{
--waitingPollRequestsCount;
if (waitingPollRequestsCount < maxWaitingPollRequests)
canWait = true;
}
if (Log.isDebug())
{
log.debug("Max long-polling requests per session limit (" + session.maxConnectionsPerSession + ") has been reached, this poll won't wait.");
}
}
}
if (thisThreadCanWait)
{
if (Log.isDebug())
log.debug("Number of waiting threads for endpoint with id '"+ getId() +"' is " + waitingPollRequestsCount + ".");
try
{
flushResult = flexClient.pollWithWait(getId(), FlexContext.getFlexSession(), this, waitInterval);
if (flushResult != null)
{
// Prevent busy-polling due to multiple clients sharing a session and swapping each other out too quickly.
if ((flushResult instanceof PollFlushResult) && ((PollFlushResult)flushResult).isAvoidBusyPolling() && (flushResult.getNextFlushWaitTimeMillis() < DEFAULT_WAIT_FOR_EXCESS_POLL_WAIT_CLIENTS))
{
// Force the client polling interval to match the default defined in the client PollingChannel.
flushResult.setNextFlushWaitTimeMillis(DEFAULT_WAIT_FOR_EXCESS_POLL_WAIT_CLIENTS);
}
else if ((clientWaitInterval > 0) && (flushResult.getNextFlushWaitTimeMillis() == 0))
{
// If the FlushResult doesn't specify it's own flush wait time, use the configured clientWaitInterval if defined.
flushResult.setNextFlushWaitTimeMillis(clientWaitInterval);
}
}
}
finally
{
// We're done waiting so decrement the count of waiting threads and update the canWait flag if necessary
synchronized (lock)
{
--waitingPollRequestsCount;
if (waitingPollRequestsCount < maxWaitingPollRequests)
canWait = true;
}
synchronized (session)
{
--session.streamingConnectionsCount;
}
if (Log.isDebug())
log.debug("Number of waiting threads for endpoint with id '"+ getId() +"' is " + waitingPollRequestsCount + ".");
}
}
}
else if (Log.isDebug() && waitEnabled)
{
if (pollCommand.headerExists(CommandMessage.SUPPRESS_POLL_WAIT_HEADER))
log.debug("Suppressing poll wait for this request because it is part of a batch of messages to process.");
else
log.debug("Max waiting poll requests limit '" + maxWaitingPollRequests + "' has been reached for endpoint '" + getId() + "'. FlexClient with id '"+ flexClient.getId() + "' will poll with no wait.");
}
// If we weren't able to do a poll with wait above for any reason just run the base poll handling logic.
if (flushResult == null)
{
flushResult = super.handleFlexClientPoll(flexClient, pollCommand);
// If this is an excess poll request that we couldn't wait on, make sure the client doesn't poll the endpoint too aggressively.
// In this case, force a client wait to match the default polling interval defined in the client PollingChannel.
if ( waitEnabled && (pollingIntervalMillis < DEFAULT_WAIT_FOR_EXCESS_POLL_WAIT_CLIENTS))
{
if (flushResult == null)
{
flushResult = new FlushResult();
}
flushResult.setNextFlushWaitTimeMillis(DEFAULT_WAIT_FOR_EXCESS_POLL_WAIT_CLIENTS);
}
}
return flushResult;
}
}