blob: 1422e26368cd402c5b0892c20121f0828ee8aaf0 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package flex.messaging.endpoints;
import flex.messaging.FlexContext;
import flex.messaging.FlexSession;
import flex.messaging.HttpFlexSession;
import flex.messaging.MessageException;
import flex.messaging.client.EndpointPushNotifier;
import flex.messaging.client.FlexClient;
import flex.messaging.client.FlushResult;
import flex.messaging.client.UserAgentSettings;
import flex.messaging.config.ConfigMap;
import flex.messaging.log.Log;
import flex.messaging.messages.AcknowledgeMessage;
import flex.messaging.messages.AsyncMessage;
import flex.messaging.messages.CommandMessage;
import flex.messaging.messages.Message;
import flex.messaging.messages.MessagePerformanceInfo;
import flex.messaging.messages.MessagePerformanceUtils;
import flex.messaging.util.TimeoutManager;
import flex.messaging.util.UserAgentManager;
import javax.servlet.ServletOutputStream;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
/**
* Base class for HTTP-based endpoints that support streaming HTTP connections to
* connected clients.
* Each streaming connection managed by this endpoint consumes one of the request
* handler threads provided by the servlet container, so it is not highly scalable
* but offers performance advantages over client polling for clients receiving
* a steady, rapid stream of pushed messages.
* This endpoint does not support polling clients and will fault any poll requests
* that are received. To support polling clients use subclasses of
* BaseHTTPEndpoint instead.
*/
public abstract class BaseStreamingHTTPEndpoint extends BaseHTTPEndpoint {
//--------------------------------------------------------------------------
//
// Private Static Constants
//
//--------------------------------------------------------------------------
/**
* The CR LF byte pair (byte values 13 and 10).
* This token is used in chunked HTTP responses frequently so initialize it statically for general use.
*/
private static final byte[] CRLF_BYTES = {(byte) 13, (byte) 10};
/**
* The ASCII character '0' (byte value 48).
* This token is used for the terminal chunk within a chunked response.
*/
private static final byte ZERO_BYTE = (byte) 48;
/**
* This token is used to signal that a chunk of data should be skipped by the client.
* Within this class it is the fill value for kick-start bytes and the single byte
* written as the server-to-client heartbeat.
*/
private static final byte NULL_BYTE = (byte) 0;
/**
* Parameter name for 'command' passed in a request for a new streaming connection.
* A request that carries this parameter is routed to the streaming logic by
* <code>service</code>; all other requests are handled as regular messaging requests.
*/
private static final String COMMAND_PARAM_NAME = "command";
/**
* This is the token at the end of the HTTP request line that indicates that it is
* a stream connection that should be held open to push data back to the client,
* as opposed to a regular request-response message.
* It is also used as the correlation id of the acknowledgement pushed to the client
* once the stream has been opened.
*/
private static final String OPEN_COMMAND = "open";
/**
* This is the token at the end of the HTTP request line that indicates that it is
* a stream connection that should be closed.
*/
private static final String CLOSE_COMMAND = "close";
/**
* Parameter name for the stream ID; passed with commands for an existing streaming connection.
*/
private static final String STREAM_ID_PARAM_NAME = "streamId";
/**
* Constant for HTTP/1.0.
*/
private static final String HTTP_1_0 = "HTTP/1.0";
/**
* Thread name suffix for request threads that are servicing a pinned open streaming connection.
* The suffix is appended when streaming starts and the original name is restored when it ends.
*/
private static final String STREAMING_THREAD_NAME_EXTENSION = "-in-streaming-mode";
/**
* Configuration constants: names of the endpoint properties read in <code>initialize</code>.
*/
// Idle timeout, in minutes, for streaming connections.
private static final String PROPERTY_CONNECTION_IDLE_TIMEOUT_MINUTES = "connection-idle-timeout-minutes";
// Legacy name for the idle timeout; consulted only when the current name yields 0.
private static final String PROPERTY_LEGACY_CONNECTION_IDLE_TIMEOUT_MINUTES = "idle-timeout-minutes";
// Maximum number of concurrently streaming clients for the endpoint.
private static final String MAX_STREAMING_CLIENTS = "max-streaming-clients";
// Interval, in milliseconds, between heartbeat bytes written on an idle stream.
private static final String SERVER_TO_CLIENT_HEARTBEAT_MILLIS = "server-to-client-heartbeat-millis";
// Whether the MessageClient is invalidated as soon as its streaming connection closes.
private static final String PROPERTY_INVALIDATE_MESSAGECLIENT_ON_STREAMING_CLOSE = "invalidate-messageclient-on-streaming-close";
/**
* Defaults used when the corresponding property is not configured.
*/
private static final boolean DEFAULT_INVALIDATE_MESSAGECLIENT_ON_STREAMING_CLOSE = false;
private static final int DEFAULT_SERVER_TO_CLIENT_HEARTBEAT_MILLIS = 5000;
private static final int DEFAULT_MAX_STREAMING_CLIENTS = 10;
/**
* Errors: the fault code and the numeric message identifier set on the
* <code>MessageException</code> thrown for poll requests, which this endpoint rejects.
*/
public static final String POLL_NOT_SUPPORTED_CODE = "Server.PollNotSupported";
public static final int POLL_NOT_SUPPORTED_MESSAGE = 10034;
//--------------------------------------------------------------------------
//
// Constructor
//
//--------------------------------------------------------------------------
/**
* Constructs an unmanaged <code>BaseStreamingHTTPEndpoint</code>.
* Delegates to {@link #BaseStreamingHTTPEndpoint(boolean)} with management disabled.
*/
public BaseStreamingHTTPEndpoint() {
this(false);
}
/**
* Constructs a <code>BaseStreamingHTTPEndpoint</code> with the indicated management.
* The flag is passed straight through to the superclass constructor.
*
* @param enableManagement <code>true</code> if the <code>BaseStreamingHTTPEndpoint</code>
* is manageable; <code>false</code> otherwise.
*/
public BaseStreamingHTTPEndpoint(boolean enableManagement) {
super(enableManagement);
}
//--------------------------------------------------------------------------
//
// Initialize, validate, start, and stop methods.
//
//--------------------------------------------------------------------------
/**
 * Configures this endpoint from its <code>ConfigMap</code>.
 * The superclass is initialized first. When no properties are supplied only the
 * default user-agent settings are installed and every other setting keeps its
 * default value. Subclasses that override this method must call
 * <code>super.initialize()</code>.
 *
 * @param id The ID of the <code>Endpoint</code>.
 * @param properties Properties for the <code>Endpoint</code>; may be <code>null</code> or empty.
 */
@Override
public void initialize(String id, ConfigMap properties) {
    super.initialize(id, properties);

    boolean hasProperties = properties != null && properties.size() != 0;
    if (!hasProperties) {
        // No configuration supplied: install the default user agent settings and stop here.
        UserAgentManager.setupUserAgentManager(null, userAgentManager);
        return;
    }

    // How often the server probes an idle stream to see whether the client is still there.
    this.serverToClientHeartbeatMillis = properties.getPropertyAsLong(SERVER_TO_CLIENT_HEARTBEAT_MILLIS, DEFAULT_SERVER_TO_CLIENT_HEARTBEAT_MILLIS);
    setServerToClientHeartbeatMillis(this.serverToClientHeartbeatMillis);

    boolean invalidateOnClose = properties.getPropertyAsBoolean(PROPERTY_INVALIDATE_MESSAGECLIENT_ON_STREAMING_CLOSE, DEFAULT_INVALIDATE_MESSAGECLIENT_ON_STREAMING_CLOSE);
    setInvalidateMessageClientOnStreamingClose(invalidateOnClose);

    // Minutes a client may stay idle before the server times the connection out.
    // The legacy property name is consulted only when the current one yields 0.
    int idleMinutes = properties.getPropertyAsInt(PROPERTY_CONNECTION_IDLE_TIMEOUT_MINUTES, getConnectionIdleTimeoutMinutes());
    if (idleMinutes == 0) {
        idleMinutes = properties.getPropertyAsInt(PROPERTY_LEGACY_CONNECTION_IDLE_TIMEOUT_MINUTES, getConnectionIdleTimeoutMinutes());
    }
    if (idleMinutes != 0) {
        setConnectionIdleTimeoutMinutes(idleMinutes);
    }

    // Per-user-agent kick-start bytes and max streaming connections per session.
    UserAgentManager.setupUserAgentManager(properties, userAgentManager);

    // Endpoint-wide cap on streaming clients; the canStream flag starts out
    // true only when that cap allows at least one stream.
    this.maxStreamingClients = properties.getPropertyAsInt(MAX_STREAMING_CLIENTS, DEFAULT_MAX_STREAMING_CLIENTS);
    this.canStream = this.maxStreamingClients > 0;
}
/**
 * Starts the endpoint if it is not already started.
 * After the superclass has started, a timeout manager for streaming connections
 * is created when an idle timeout is configured, and the registry of open
 * streaming requests is created.
 */
@Override
public void start() {
    if (!isStarted()) {
        super.start();

        if (connectionIdleTimeoutMinutes > 0) {
            // Names each timeout thread after this endpoint plus a running number.
            ThreadFactory timeoutThreadFactory = new ThreadFactory() {
                int counter = 1;

                public synchronized Thread newThread(Runnable runnable) {
                    Thread timeoutThread = new Thread(runnable);
                    timeoutThread.setName(getId() + "-StreamingConnectionTimeoutThread-" + counter++);
                    return timeoutThread;
                }
            };
            pushNotifierTimeoutManager = new TimeoutManager(timeoutThreadFactory);
        }

        currentStreamingRequests = new ConcurrentHashMap<String, EndpointPushNotifier>();
    }
}
/**
 * Stops the endpoint if it is started.
 * The streaming timeout manager is shut down, every open streaming notifier is
 * closed, the registry of open streaming requests is discarded and finally the
 * superclass is stopped.
 *
 * @see flex.messaging.endpoints.AbstractEndpoint#stop()
 */
@Override
public void stop() {
    if (isStarted()) {
        // Shut the timeout manager for streaming connections down cleanly.
        if (pushNotifierTimeoutManager != null) {
            pushNotifierTimeoutManager.shutdown();
            pushNotifierTimeoutManager = null;
        }

        // Close every streaming connection that is still open.
        Iterator<EndpointPushNotifier> openNotifiers = currentStreamingRequests.values().iterator();
        while (openNotifiers.hasNext()) {
            openNotifiers.next().close();
        }
        currentStreamingRequests = null;

        super.stop();
    }
}
//--------------------------------------------------------------------------
//
// Variables
//
//--------------------------------------------------------------------------
/**
* Used to synchronize sets and gets to the number of streaming clients.
* The streaming open handler updates <code>streamingClientsCount</code> and recalculates
* <code>canStream</code> while holding this lock.
*/
protected final Object lock = new Object();
/**
* Used to keep track of the mapping between user agent match strings and
* the bytes needed to kickstart their streaming connections.
* Populated in <code>initialize</code> and consulted when a streaming connection is opened.
*/
protected UserAgentManager userAgentManager = new UserAgentManager();
/**
* Whether the endpoint currently has room for another streaming connection.
* This flag is volatile to allow for consistent reads across thread without
* needing to pay the cost for a synchronized lock for each read.
*/
private volatile boolean canStream = true;
/**
* Manages timing out EndpointPushNotifier instances.
* Created in <code>start</code> only when a connection idle timeout is configured;
* shut down and set to <code>null</code> in <code>stop</code>.
*/
private volatile TimeoutManager pushNotifierTimeoutManager;
/**
* A map from notifier id to the <code>EndpointPushNotifier</code> of every currently
* open streaming connection for this endpoint.
* Used for clean shutdown. Created in <code>start</code> and set to <code>null</code>
* in <code>stop</code>.
*/
private ConcurrentHashMap<String, EndpointPushNotifier> currentStreamingRequests;
//--------------------------------------------------------------------------
//
// Properties
//
//--------------------------------------------------------------------------
//------------------------------------------
// invalidateMessageClientOnStreamingClose
//-----------------------------------------
private volatile boolean invalidateMessageClientOnStreamingClose = DEFAULT_INVALIDATE_MESSAGECLIENT_ON_STREAMING_CLOSE;

/**
 * Reports whether invalidate-messageclient-on-streaming-close is enabled.
 * See {@link BaseStreamingHTTPEndpoint#setInvalidateMessageClientOnStreamingClose(boolean)}
 * for what the setting means.
 *
 * @return <code>true</code> if invalidate-messageclient-on-streaming-close is enabled,
 * <code>false</code> otherwise.
 */
public boolean isInvalidateMessageClientOnStreamingClose() {
    return this.invalidateMessageClientOnStreamingClose;
}

/**
 * Sets the invalidate-messageclient-on-streaming-close property. If enabled,
 * the client's associated MessageClient on the server is invalidated immediately
 * when the streaming connection is closed for whatever reason (for example, the
 * client is gone).
 * This is useful when there is a constant stream of messages, the client is gone
 * and the streaming connection is closed, but the session has not timed out on the
 * server yet. Enabling the property then prevents messages from accumulating on
 * the session on behalf of a MessageClient that is going to be invalidated anyway.
 * <p>
 * Important: Do not enable this property when reliable messaging is used, otherwise
 * reliable reconnect attempts will not happen correctly.</p>
 *
 * @param value The property value.
 */
public void setInvalidateMessageClientOnStreamingClose(boolean value) {
    this.invalidateMessageClientOnStreamingClose = value;
}
//----------------------------------
// serverToClientHeartbeatMillis
//----------------------------------
private long serverToClientHeartbeatMillis = DEFAULT_SERVER_TO_CLIENT_HEARTBEAT_MILLIS;

/**
 * Returns how many milliseconds the server waits before writing a single
 * <code>null</code> byte to the streaming connection to check that the client
 * is still available.
 *
 * @return The server-to-client heartbeat time in milliseconds.
 */
public long getServerToClientHeartbeatMillis() {
    return this.serverToClientHeartbeatMillis;
}

/**
 * Sets how many milliseconds the server waits, when there are no new messages
 * for the client, before writing a single <code>null</code> byte to the streaming
 * connection to check that the client is still available.
 * Negative values are stored as zero. A non-positive value means the server waits
 * indefinitely for new messages and never writes the <code>null</code> byte.
 *
 * @param serverToClientHeartbeatMillis The server-to-client heartbeat time in milliseconds.
 */
public void setServerToClientHeartbeatMillis(long serverToClientHeartbeatMillis) {
    this.serverToClientHeartbeatMillis = Math.max(0L, serverToClientHeartbeatMillis);
}
//----------------------------------
// connectionIdleTimeoutMinutes
//----------------------------------
private int connectionIdleTimeoutMinutes = 0;

/**
 * Returns the number of minutes a client can remain idle before the server
 * times the connection out. The default is 0, meaning connections are never
 * timed out and must be closed by the client or the server, either explicitly
 * or by either process terminating.
 *
 * @return The number of minutes a client can remain idle before the server
 * times the connection out.
 */
public int getConnectionIdleTimeoutMinutes() {
    return this.connectionIdleTimeoutMinutes;
}

/**
 * Sets the number of minutes a client can remain idle before the server
 * times the connection out. Negative values are stored as 0; a value of 0
 * means connections are not timed out.
 *
 * @param value The number of minutes a client can remain idle
 * before the server times the connection out.
 */
public void setConnectionIdleTimeoutMinutes(int value) {
    this.connectionIdleTimeoutMinutes = Math.max(0, value);
}
/**
* Legacy accessor for the connection idle timeout; simply delegates to
* {@link BaseStreamingHTTPEndpoint#getConnectionIdleTimeoutMinutes()}.
*
* @return The number of minutes a client can remain idle before the server
* times the connection out.
* @deprecated Use {@link BaseStreamingHTTPEndpoint#getConnectionIdleTimeoutMinutes()} instead.
*/
public int getIdleTimeoutMinutes() {
return getConnectionIdleTimeoutMinutes();
}
/**
* Legacy mutator for the connection idle timeout; simply delegates to
* {@link BaseStreamingHTTPEndpoint#setConnectionIdleTimeoutMinutes(int)}.
*
* @param value The number of minutes a client can remain idle
* before the server times the connection out.
* @deprecated Use {@link BaseStreamingHTTPEndpoint#setConnectionIdleTimeoutMinutes(int)} instead.
*/
public void setIdleTimeoutMinutes(int value) {
setConnectionIdleTimeoutMinutes(value);
}
//----------------------------------
// maxStreamingClients
//----------------------------------
private int maxStreamingClients = DEFAULT_MAX_STREAMING_CLIENTS;

/**
 * Retrieves the maximum number of clients that will be allowed to establish
 * a streaming HTTP connection with the endpoint.
 *
 * @return The maximum number of clients that will be allowed to establish
 * a streaming HTTP connection with the endpoint.
 */
public int getMaxStreamingClients() {
    return maxStreamingClients;
}

/**
 * Sets the maximum number of clients that will be allowed to establish
 * a streaming HTTP connection with the server, and recalculates whether the
 * endpoint can currently accept another streaming connection.
 * <p>
 * The update is performed while holding <code>lock</code>, the same lock under
 * which the streaming open handler maintains <code>streamingClientsCount</code>
 * and <code>canStream</code>. Without it, a concurrent open or close could
 * interleave with this method and leave <code>canStream</code> out of step with
 * the actual count.</p>
 *
 * @param maxStreamingClients The maximum number of clients that will be allowed
 * to establish a streaming HTTP connection with the server.
 */
public void setMaxStreamingClients(int maxStreamingClients) {
    synchronized (lock) {
        this.maxStreamingClients = maxStreamingClients;
        canStream = (streamingClientsCount < maxStreamingClients);
    }
}
//----------------------------------
// streamingClientsCount
//----------------------------------
protected int streamingClientsCount;

/**
 * Returns the number of clients that are currently in the streaming state.
 * The field is read directly, without taking <code>lock</code>.
 *
 * @return The number of clients that are currently in the streaming state.
 */
public int getStreamingClientsCount() {
    return this.streamingClientsCount;
}
//--------------------------------------------------------------------------
//
// Public Methods
//
//--------------------------------------------------------------------------
/**
 * Handles HTTP requests targeting this endpoint.
 * Two types of requests are supported. A request that carries the
 * <code>command</code> parameter is treated as a streaming request and is handed
 * to the streaming logic of this endpoint. Any other request is a regular
 * request-response AMF/AMFX message and is handled by the base logic in
 * <code>BaseHTTPEndpoint.service</code>.
 *
 * @param req The original servlet request.
 * @param res The active servlet response.
 */
@Override
public void service(HttpServletRequest req, HttpServletResponse res) {
    boolean isStreamingCommand = req.getParameter(COMMAND_PARAM_NAME) != null;
    if (!isStreamingCommand) {
        // Let BaseHTTPEndpoint logic handle regular request-response messaging.
        super.service(req, res);
        return;
    }
    serviceStreamingRequest(req, res);
}
//--------------------------------------------------------------------------
//
// Protected Methods
//
//--------------------------------------------------------------------------
/**
 * If the message has MPI (message performance info) enabled, this method adds
 * all the needed performance headers to the message. Messages without
 * performance info are left untouched.
 * <p>
 * A clone of the original info is recorded as the pushed-causer info (MPIP), the
 * incoming info (MPII) is cleared, and a fresh outgoing info (MPIO) flagged as
 * pushed is attached. When size recording is enabled the message size is obtained
 * from {@link #getMessageSizeForPerformanceInfo(Message)}; when time recording is
 * also enabled, the time spent doing so is added to the overhead of both the
 * clone and the original info. Failures while recording are logged at debug level
 * and otherwise ignored so that performance bookkeeping never prevents a message
 * from being streamed.</p>
 *
 * @param message Message to add performance headers to.
 */
protected void addPerformanceInfo(Message message) {
    MessagePerformanceInfo mpiOriginal = getMPI(message);
    if (mpiOriginal == null)
        return;

    MessagePerformanceInfo mpip = (MessagePerformanceInfo) mpiOriginal.clone();
    try {
        // Set the original message info as the pushed causer info.
        MessagePerformanceUtils.setMPIP(message, mpip);
        MessagePerformanceUtils.setMPII(message, null);
    } catch (Exception e) {
        if (Log.isDebug())
            log.debug("MPI exception while streaming the message: " + e.toString());
    }

    // Overhead only used when MPI is enabled for sizing
    MessagePerformanceInfo mpio = new MessagePerformanceInfo();
    if (mpip.recordMessageTimes) {
        mpio.sendTime = System.currentTimeMillis();
        mpio.infoType = "OUT";
    }
    mpio.pushedFlag = true;
    MessagePerformanceUtils.setMPIO(message, mpio);

    // If MPI sizing information is enabled serialize again so that we know size
    if (mpip.recordMessageSizes) {
        try {
            // Each subclass serializes the message in their own format to
            // get the message size for the MPIO.
            long serializationOverhead = System.currentTimeMillis();
            mpio.messageSize = getMessageSizeForPerformanceInfo(message);

            // Set serialization overhead to the time calculated during serialization above
            if (mpip.recordMessageTimes) {
                serializationOverhead = System.currentTimeMillis() - serializationOverhead;
                mpip.addToOverhead(serializationOverhead);
                mpiOriginal.addToOverhead(serializationOverhead);
                mpio.sendTime = System.currentTimeMillis();
            }
        } catch (Exception e) {
            // Guarded like the handler above so the message string is only built
            // when debug logging is actually enabled.
            if (Log.isDebug())
                log.debug("MPI exception while streaming the message: " + e.toString());
        }
    }
}
/**
 * Converts a streamed push message to its small version when the current
 * session uses small messages.
 *
 * @param message The regular message.
 * @return The result of converting the message to its small form when a current
 * session exists and uses small messages; otherwise the regular message unchanged.
 */
protected Message convertPushMessageToSmall(Message message) {
    FlexSession currentSession = FlexContext.getFlexSession();
    boolean wantsSmallMessages = currentSession != null && currentSession.useSmallMessages();
    return wantsSmallMessages ? convertToSmallMessage(message) : message;
}
/**
 * Used internally for performance information gathering; not intended for
 * public use. This default implementation always reports zero.
 * Subclasses should override it if they want message sizes to be reported
 * accurately in performance information gathering.
 *
 * @param message Message to get the size for.
 * @return The size of the message after the message is serialized; zero here.
 */
protected long getMessageSizeForPerformanceInfo(Message message) {
    return 0L;
}
/**
 * Rejects poll requests: this streaming endpoint does not support polling clients.
 * The method never returns normally.
 *
 * @param flexClient The FlexClient that issued the poll request.
 * @param pollCommand The poll command from the client.
 * @return Nothing; a <code>MessageException</code> is always thrown.
 * @throws MessageException Always, carrying the
 * {@link #POLL_NOT_SUPPORTED_CODE} code and the
 * {@link #POLL_NOT_SUPPORTED_MESSAGE} message number.
 */
@Override
protected FlushResult handleFlexClientPoll(FlexClient flexClient, CommandMessage pollCommand) {
    MessageException pollNotSupported = new MessageException();
    pollNotSupported.setMessage(POLL_NOT_SUPPORTED_MESSAGE);
    pollNotSupported.setDetails(POLL_NOT_SUPPORTED_MESSAGE);
    pollNotSupported.setCode(POLL_NOT_SUPPORTED_CODE);
    throw pollNotSupported;
}
/**
* Handles streaming connection open command sent by the FlexClient.
*
* @param req The <code>HttpServletRequest</code> to service.
* @param res The <code>HttpServletResponse</code> to be used in case an error
* has to be sent back.
* @param flexClient FlexClient that requested the streaming connection.
*/
protected void handleFlexClientStreamingOpenRequest(HttpServletRequest req, HttpServletResponse res, FlexClient flexClient) {
FlexSession session = FlexContext.getFlexSession();
if (canStream && session.canStream) {
// If canStream/session.canStream is true it means we currently have
// less than the max number of allowed streaming threads, per endpoint/session.
// We need to protect writes/reads to the stream count with the endpoint's lock.
// Also, we have to be careful to handle the case where two threads get to this point when only
// one streaming spot remains; one thread will win and the other needs to fault.
boolean thisThreadCanStream;
synchronized (lock) {
++streamingClientsCount;
if (streamingClientsCount == maxStreamingClients) {
thisThreadCanStream = true; // This thread got the last spot.
canStream = false;
} else if (streamingClientsCount > maxStreamingClients) {
thisThreadCanStream = false; // This thread was beaten out for the last spot.
--streamingClientsCount; // Decrement the count because we're not going to grant the streaming right to the client.
} else {
// We haven't hit the limit yet, allow this thread to stream.
thisThreadCanStream = true;
}
}
// If the thread cannot wait due to endpoint streaming connection
// limit, inform the client and return.
if (!thisThreadCanStream) {
String errorMessage = "Endpoint with id '" + getId() + "' cannot grant streaming connection to FlexClient with id '"
+ flexClient.getId() + "' because " + MAX_STREAMING_CLIENTS + " limit of '"
+ maxStreamingClients + "' has been reached.";
if (Log.isError())
log.error(errorMessage);
try {
errorMessage = "Endpoint with id '" + getId() + "' cannot grant streaming connection to FlexClient with id '"
+ flexClient.getId() + "' because " + MAX_STREAMING_CLIENTS + " limit has been reached.";
res.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, errorMessage);
} catch (IOException ignore) {
}
return;
}
// Setup for specific user agents.
byte[] kickStartBytesToStream = null;
String userAgentValue = req.getHeader(UserAgentManager.USER_AGENT_HEADER_NAME);
UserAgentSettings agentSettings = userAgentManager.match(userAgentValue);
if (agentSettings != null) {
synchronized (session) {
session.maxConnectionsPerSession = agentSettings.getMaxPersistentConnectionsPerSession();
}
int kickStartBytes = agentSettings.getKickstartBytes();
if (kickStartBytes > 0) {
// Determine the minimum number of actual bytes that need to be sent to
// kickstart, taking into account transfer-encoding overhead.
try {
int chunkLengthHeaderSize = Integer.toHexString(kickStartBytes).getBytes("ASCII").length;
int chunkOverhead = chunkLengthHeaderSize + 4; // 4 for the 2 wrapping CRLF tokens.
int minimumKickstartBytes = kickStartBytes - chunkOverhead;
kickStartBytesToStream = new byte[(minimumKickstartBytes > 0) ? minimumKickstartBytes :
kickStartBytes];
} catch (UnsupportedEncodingException ignore) {
kickStartBytesToStream = new byte[kickStartBytes];
}
Arrays.fill(kickStartBytesToStream, NULL_BYTE);
}
}
// Now, check with the session before granting the streaming connection.
synchronized (session) {
++session.streamingConnectionsCount;
if (session.maxConnectionsPerSession == FlexSession.MAX_CONNECTIONS_PER_SESSION_UNLIMITED) {
thisThreadCanStream = true;
} else if (session.streamingConnectionsCount == session.maxConnectionsPerSession) {
thisThreadCanStream = true; // This thread got the last spot in the session.
session.canStream = false;
} else if (session.streamingConnectionsCount > session.maxConnectionsPerSession) {
thisThreadCanStream = false; // This thread was beaten out for the last spot.
--session.streamingConnectionsCount;
synchronized (lock) {
// Decrement the endpoint count because we're not going to grant the streaming right to the client.
--streamingClientsCount;
}
} else {
// We haven't hit the limit yet, allow this thread to stream.
thisThreadCanStream = true;
}
}
// If the thread cannot wait due to session streaming connection
// limit, inform the client and return.
if (!thisThreadCanStream) {
if (Log.isError())
log.error("Endpoint with id '" + getId() + "' cannot grant streaming connection to FlexClient with id '"
+ flexClient.getId() + "' because " + UserAgentManager.MAX_PERSISTENT_CONNECTIONS_PER_SESSION + " limit of '" + session.maxConnectionsPerSession
+ ((agentSettings != null) ? "' for user-agent '" + agentSettings.getMatchOn() + "'" : "") + " has been reached.");
try {
// Return an HTTP status code 400.
String errorMessage = "The server cannot grant streaming connection to this client because limit has been reached.";
res.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, errorMessage);
} catch (IOException ignore) {
// NOWARN
}
return;
}
Thread currentThread = Thread.currentThread();
String threadName = currentThread.getName();
EndpointPushNotifier notifier = null;
boolean suppressIOExceptionLogging = false; // Used to suppress logging for IO exception.
try {
currentThread.setName(threadName + STREAMING_THREAD_NAME_EXTENSION);
// Open and commit response headers and get output stream.
if (addNoCacheHeaders)
addNoCacheHeaders(req, res);
res.setContentType(getResponseContentType());
res.setHeader("Transfer-Encoding", "chunked");
res.setHeader("Connection", "close");
ServletOutputStream os = res.getOutputStream();
res.flushBuffer();
// If kickstart-bytes are specified, stream them.
if (kickStartBytesToStream != null) {
if (Log.isDebug())
log.debug("Endpoint with id '" + getId() + "' is streaming " + kickStartBytesToStream.length
+ " bytes (not counting chunk encoding overhead) to kick-start the streaming connection for FlexClient with id '"
+ flexClient.getId() + "'.");
streamChunk(kickStartBytesToStream, os, res);
}
// Setup serialization and type marshalling contexts
setThreadLocals();
// Activate streaming helper for this connection.
// Watch out for duplicate stream issues.
try {
notifier = new EndpointPushNotifier(this, flexClient);
} catch (MessageException me) {
if (me.getNumber() == 10033) // It's a duplicate stream request from the same FlexClient. Leave the current stream in place and fault this.
{
if (Log.isWarn())
log.warn("Endpoint with id '" + getId() + "' received a duplicate streaming connection request from, FlexClient with id '"
+ flexClient.getId() + "'. Faulting request.");
// Rollback counters and send an error response.
synchronized (lock) {
--streamingClientsCount;
canStream = (streamingClientsCount < maxStreamingClients);
synchronized (session) {
--session.streamingConnectionsCount;
session.canStream = (session.maxConnectionsPerSession == FlexSession.MAX_CONNECTIONS_PER_SESSION_UNLIMITED
|| session.streamingConnectionsCount < session.maxConnectionsPerSession);
}
}
try {
res.sendError(HttpServletResponse.SC_BAD_REQUEST);
} catch (IOException ignore) {
// NOWARN
}
return; // Exit early.
}
}
if (connectionIdleTimeoutMinutes > 0)
notifier.setIdleTimeoutMinutes(connectionIdleTimeoutMinutes);
notifier.setLogCategory(getLogCategory());
monitorTimeout(notifier);
currentStreamingRequests.put(notifier.getNotifierId(), notifier);
// Push down an acknowledgement for the 'connect' request containing the unique id for this specific stream.
AcknowledgeMessage connectAck = new AcknowledgeMessage();
connectAck.setBody(notifier.getNotifierId());
connectAck.setCorrelationId(BaseStreamingHTTPEndpoint.OPEN_COMMAND);
ArrayList toPush = new ArrayList(1);
toPush.add(connectAck);
streamMessages(toPush, os, res);
// Output session level streaming count.
if (Log.isDebug())
Log.getLogger(FlexSession.FLEX_SESSION_LOG_CATEGORY).info("Number of streaming clients for FlexSession with id '" + session.getId() + "' is " + session.streamingConnectionsCount + ".");
// Output endpoint level streaming count.
if (Log.isDebug())
log.debug("Number of streaming clients for endpoint with id '" + getId() + "' is " + streamingClientsCount + ".");
// And cycle in a wait-notify loop with the aid of the helper until it
// is closed, we're interrupted or the act of streaming data to the client fails.
while (!notifier.isClosed()) {
try {
// Drain any messages that might have been accumulated
// while the previous drain was being processed.
List<AsyncMessage> messages = null;
synchronized (notifier.pushNeeded) {
messages = notifier.drainMessages();
}
streamMessages(messages, os, res);
synchronized (notifier.pushNeeded) {
notifier.pushNeeded.wait(serverToClientHeartbeatMillis);
messages = notifier.drainMessages();
}
// If there are no messages to send to the client, send an null
// byte as a heartbeat to make sure the client is still valid.
if (messages == null && serverToClientHeartbeatMillis > 0) {
try {
os.write(NULL_BYTE);
res.flushBuffer();
} catch (IOException e) {
if (Log.isWarn())
log.warn("Endpoint with id '" + getId() + "' is closing the streaming connection to FlexClient with id '"
+ flexClient.getId() + "' because endpoint encountered a socket write error" +
", possibly due to an unresponsive FlexClient.", e);
break; // Exit the wait loop.
}
}
// Otherwise stream the messages to the client.
else {
// Update the last time notifier was used to drain messages.
// Important for idle timeout detection.
notifier.updateLastUse();
streamMessages(messages, os, res);
}
} catch (InterruptedException e) {
if (Log.isWarn())
log.warn("Streaming thread '" + threadName + "' for endpoint with id '" + getId() + "' has been interrupted and the streaming connection will be closed.");
os.close();
break; // Exit the wait loop.
}
// Update the FlexClient last use time to prevent FlexClient from
// timing out when the client is still subscribed. It is important
// to do this outside synchronized(notifier.pushNeeded) to avoid
// thread deadlock!
flexClient.updateLastUse();
}
if (Log.isDebug())
log.debug("Streaming thread '" + threadName + "' for endpoint with id '" + getId() + "' is releasing connection and returning to the request handler pool.");
suppressIOExceptionLogging = true;
// Terminate the response.
streamChunk(null, os, res);
} catch (IOException e) {
if (Log.isWarn() && !suppressIOExceptionLogging)
log.warn("Streaming thread '" + threadName + "' for endpoint with id '" + getId() + "' is closing connection due to an IO error.", e);
} finally {
currentThread.setName(threadName);
// We're done so decrement the counts for streaming threads,
// and update the canStream flag if necessary.
synchronized (lock) {
--streamingClientsCount;
canStream = (streamingClientsCount < maxStreamingClients);
synchronized (session) {
--session.streamingConnectionsCount;
session.canStream = (session.maxConnectionsPerSession == FlexSession.MAX_CONNECTIONS_PER_SESSION_UNLIMITED
|| session.streamingConnectionsCount < session.maxConnectionsPerSession);
}
}
if (notifier != null && currentStreamingRequests != null) {
currentStreamingRequests.remove(notifier.getNotifierId());
notifier.close();
}
// Output session level streaming count.
if (Log.isDebug())
Log.getLogger(FlexSession.FLEX_SESSION_LOG_CATEGORY).info("Number of streaming clients for FlexSession with id '" + session.getId() + "' is " + session.streamingConnectionsCount + ".");
// Output endpoint level streaming count.
if (Log.isDebug())
log.debug("Number of streaming clients for endpoint with id '" + getId() + "' is " + streamingClientsCount + ".");
}
}
// Otherwise, client's streaming connection open request could not be granted.
else {
if (Log.isError()) {
String logString = null;
if (!canStream) {
logString = "Endpoint with id '" + getId() + "' cannot grant streaming connection to FlexClient with id '"
+ flexClient.getId() + "' because " + MAX_STREAMING_CLIENTS + " limit of '"
+ maxStreamingClients + "' has been reached.";
} else if (!session.canStream) {
logString = "Endpoint with id '" + getId() + "' cannot grant streaming connection to FlexClient with id '"
+ flexClient.getId() + "' because " + UserAgentManager.MAX_STREAMING_CONNECTIONS_PER_SESSION + " limit of '"
+ session.maxConnectionsPerSession + "' has been reached.";
}
if (logString != null)
log.error(logString);
}
try {
// Return an HTTP status code 503 (Service Unavailable) to indicate that the streaming connection limit has been reached.
String errorMessage = null;
if (!canStream) {
errorMessage = "Endpoint with id '" + getId() + "' cannot grant streaming connection to FlexClient with id '"
+ flexClient.getId() + "' because " + MAX_STREAMING_CLIENTS + " limit has been reached.";
} else if (!session.canStream) {
errorMessage = "Endpoint with id '" + getId() + "' cannot grant streaming connection to FlexClient with id '"
+ flexClient.getId() + "' because " + UserAgentManager.MAX_STREAMING_CONNECTIONS_PER_SESSION + " limit has been reached.";
}
res.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, errorMessage);
} catch (IOException ignore) {
}
}
}
/**
 * Processes a streaming connection close command issued by a FlexClient.
 * The push notifier registered on the FlexClient for this endpoint is closed
 * only when its notifier id equals the supplied stream id; in every other
 * case the call is a no-op.
 *
 * @param req The <code>HttpServletRequest</code> to service.
 * @param res The <code>HttpServletResponse</code> to be used in case an error
 * has to be sent back.
 * @param flexClient FlexClient that requested the streaming connection.
 * @param streamId The id for the stream to close; when null nothing is closed.
 */
protected void handleFlexClientStreamingCloseRequest(HttpServletRequest req, HttpServletResponse res, FlexClient flexClient, String streamId) {
    // Without a stream id there is nothing to identify the connection to close.
    if (streamId == null)
        return;

    EndpointPushNotifier pushNotifier = (EndpointPushNotifier) flexClient.getEndpointPushHandler(getId());
    if (pushNotifier == null)
        return;

    // Only close the notifier that actually backs the requested stream.
    if (pushNotifier.getNotifierId().equals(streamId))
        pushNotifier.close();
}
/**
 * Service streaming connection commands.
 * <p>
 * The request is validated in this order: HTTP protocol version, command,
 * FlexClient id parameter, and (for open commands only) that the FlexClient id
 * identifies a valid FlexClient of the current FlexSession. Every validation
 * failure is logged and answered with an HTTP 400 status. A valid request is
 * then dispatched to the open or close handler.
 *
 * @param req The <code>HttpServletRequest</code> to service.
 * @param res The <code>HttpServletResponse</code> to be used in case an error
 * has to be sent back.
 */
protected void serviceStreamingRequest(HttpServletRequest req, HttpServletResponse res) {
    // If this is a request for a streaming connection, make sure it's for a valid FlexClient
    // and that the FlexSession doesn't already have a streaming connection open.
    // Streaming requests are POSTs (to help prevent the possibility of caching) that carry the
    // following parameters:
    // command - Indicating a custom command for the endpoint; currently 'open' to request a new
    //           streaming connection be opened, and 'close' to request the streaming connection
    //           to close.
    // version - Indicates the streaming connection 'version' to use; it's here for backward comp. support
    //           if we need to change how commands are handled in a future product release.
    // DSId    - The FlexClient id value that uniquely identifies the swf making the request.
    String command = req.getParameter(COMMAND_PARAM_NAME);

    // Only HTTP 1.1 is supported, disallow HTTP 1.0.
    if (req.getProtocol().equals(HTTP_1_0)) {
        if (Log.isError())
            log.error("Endpoint with id '" + getId() + "' cannot service the streaming request made with " +
                    " HTTP 1.0. Only HTTP 1.1 is supported.");
        // Return an HTTP status code 400 because the protocol version is not supported.
        rejectStreamingRequestAsBad(res);
        return; // Abort further server processing.
    }

    // The command parameter may be absent (null); comparing constant-first treats
    // a missing command as an invalid one instead of throwing a NullPointerException.
    boolean openRequested = OPEN_COMMAND.equals(command);
    boolean closeRequested = CLOSE_COMMAND.equals(command);
    if (!openRequested && !closeRequested) {
        if (Log.isError())
            log.error("Endpoint with id '" + getId() + "' cannot service the streaming request as the supplied command '"
                    + command + "' is invalid.");
        // Return an HTTP status code 400 to indicate that the client's request was syntactically invalid (bad command).
        rejectStreamingRequestAsBad(res);
        return; // Abort further server processing.
    }

    String flexClientId = req.getParameter(Message.FLEX_CLIENT_ID_HEADER);
    if (flexClientId == null) {
        if (Log.isError())
            log.error("Endpoint with id '" + getId() + "' cannot service the streaming request as no FlexClient id"
                    + " has been supplied in the request.");
        // Return an HTTP status code 400 to indicate that the client's request was syntactically invalid (missing id).
        rejectStreamingRequestAsBad(res);
        return; // Abort further server processing.
    }

    // Validate that the provided FlexClient id exists and is associated with the current session.
    // We don't do this validation with CLOSE_COMMAND because CLOSE_COMMAND can come in on a
    // different session. For example, when the session expires due to timeout, the streaming client
    // using that session sends a CLOSE_COMMAND on a new session to let the server know to clean client's
    // corresponding server constructs. In that case, server already knows that session has expired so
    // we can simply omit this validation.
    List<FlexClient> flexClients = FlexContext.getFlexSession().getFlexClients();
    // Stays null unless a valid FlexClient with the requested id is found, so that a
    // request is never dispatched against an unrelated FlexClient of the session.
    FlexClient flexClient = null;
    for (FlexClient candidate : flexClients) {
        if (candidate.getId().equals(flexClientId) && candidate.isValid()) {
            flexClient = candidate;
            break;
        }
    }

    if (openRequested && flexClient == null) {
        if (Log.isError())
            log.error("Endpoint with id '" + getId() + "' cannot service the streaming request as either the supplied"
                    + " FlexClient id '" + flexClientId + " is not valid, or the FlexClient with that id is not valid.");
        // Return an HTTP status code 400 to indicate that the client's request was syntactically invalid (invalid id).
        rejectStreamingRequestAsBad(res);
        return; // Abort further server processing.
    }

    // If a close command is received and we don't have any flex clients registered simply invalidate
    // the Flex Session. This will take care of the Flex Session that got created when the MB servlet
    // was processing the CLOSE request.
    if (closeRequested && flexClients.isEmpty()) {
        FlexSession flexSession = FlexContext.getFlexSession();
        if (flexSession instanceof HttpFlexSession) {
            ((HttpFlexSession) flexSession).invalidate(false);
        }
        return;
    }

    // A close command for a FlexClient that is not part of this session has nothing to clean up here.
    if (flexClient == null)
        return;

    if (openRequested)
        handleFlexClientStreamingOpenRequest(req, res, flexClient);
    else
        handleFlexClientStreamingCloseRequest(req, res, flexClient, req.getParameter(STREAM_ID_PARAM_NAME));
}

/**
 * Answers a streaming request with an HTTP 400 (Bad Request) status.
 * This is best effort: if the status cannot be sent, the failure is ignored
 * because there is no other way left to report it to the client.
 *
 * @param res The <code>HttpServletResponse</code> used to send the error status.
 */
private void rejectStreamingRequestAsBad(HttpServletResponse res) {
    try {
        res.sendError(HttpServletResponse.SC_BAD_REQUEST);
    } catch (IOException ignored) {
        // Intentionally ignored; see the method comment.
    }
}
/**
 * Writes one chunk to the output stream in the HTTP
 * "Transfer-Encoding: chunked" format and flushes the response immediately
 * afterwards (no buffering).
 * A non-empty array is written as its length in hexadecimal, CRLF, the bytes
 * themselves and a closing CRLF. A null or empty array causes the terminal
 * chunk (ZERO_BYTE followed by CRLF_BYTES) to be written instead, signalling
 * the end of the response.
 *
 * @param bytes The array of bytes to write as a chunk in the response; or if null or empty, the signal to write the final chunk to complete the response.
 * @param os The output stream the chunk will be written to.
 * @param response The HttpServletResponse, used to flush the chunk to the client.
 * @throws IOException if writing the chunk to the output stream fails.
 */
protected void streamChunk(byte[] bytes, ServletOutputStream os, HttpServletResponse response) throws IOException {
    boolean hasPayload = bytes != null && bytes.length != 0;
    if (hasPayload) {
        // Chunk header: the payload size in hexadecimal, terminated by CRLF.
        String hexSize = Integer.toHexString(bytes.length);
        os.write(hexSize.getBytes("ASCII"));
        os.write(CRLF_BYTES);
        // Chunk data, terminated by CRLF as well.
        os.write(bytes);
        os.write(CRLF_BYTES);
    } else {
        // Send final 'EOF' chunk for the response.
        os.write(ZERO_BYTE);
        os.write(CRLF_BYTES);
    }
    // Push whatever was written out to the client right away.
    response.flushBuffer();
}
/**
 * Helper method invoked by the endpoint request handler thread cycling in wait-notify.
 * Serializes messages and streams each to the client as a response chunk using streamChunk().
 * <p>
 * This method is abstract; the serialization itself is supplied by subclasses.
 *
 * @param messages The messages to serialize and push to the client. The list is declared
 * as a raw type here; presumably it holds Message instances - TODO confirm against callers.
 * @param os The output stream the chunk will be written to.
 * @param response The HttpServletResponse, used to flush the chunk to the client.
 * @throws IOException declared so that implementations can propagate failures that
 * occur while writing to the output stream.
 */
protected abstract void streamMessages(List messages, ServletOutputStream os, HttpServletResponse response) throws IOException;
/**
 * Looks up the MessagePerformanceInfo of a message, provided that message
 * performance gathering (recording of message sizes and/or message times)
 * is enabled on this endpoint.
 *
 * @param message The message.
 * @return MessagePerformanceInfo if the message performance gathering is enabled,
 * null otherwise.
 */
protected MessagePerformanceInfo getMPI(Message message) {
    // Neither sizes nor times are recorded: performance gathering is off.
    if (!isRecordMessageSizes() && !isRecordMessageTimes())
        return null;

    return MessagePerformanceUtils.getMPII(message);
}
//--------------------------------------------------------------------------
//
// Private methods.
//
//--------------------------------------------------------------------------
/**
 * Utility method used at EndpointPushNotifier construction to register the
 * notifier with the push notifier timeout manager. Does nothing when no
 * timeout manager is set.
 *
 * @param notifier The EndpointPushNotifier to monitor.
 */
private void monitorTimeout(EndpointPushNotifier notifier) {
    if (pushNotifierTimeoutManager == null)
        return;

    pushNotifierTimeoutManager.scheduleTimeout(notifier);
}
}