blob: 38ce5c54d817e42ffaa9314a32d5c9e63283f93e [file] [log] [blame]
////////////////////////////////////////////////////////////////////////////////
//
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////////
package mx.messaging.channels
{
import flash.events.Event;
import flash.events.EventDispatcher;
import flash.events.HTTPStatusEvent;
import flash.events.IOErrorEvent;
import flash.events.ProgressEvent;
import flash.events.SecurityErrorEvent;
import flash.events.StatusEvent;
import flash.net.ObjectEncoding;
import flash.net.URLRequest;
import flash.net.URLRequestMethod;
import flash.net.URLStream;
import flash.net.URLVariables;
import flash.utils.ByteArray;
import mx.core.mx_internal;
import mx.logging.ILogger;
import mx.logging.Log;
import mx.messaging.Channel;
import mx.messaging.FlexClient;
import mx.messaging.events.MessageEvent;
import mx.messaging.messages.AbstractMessage;
import mx.messaging.messages.AcknowledgeMessage;
import mx.messaging.messages.CommandMessage;
import mx.messaging.messages.IMessage;
use namespace mx_internal;
/**
* Dispatched when the StreamingConnectionHandler receives a status command from the server.
*
* @eventType flash.events.StatusEvent
*
* @langversion 3.0
* @playerversion Flash 9
* @playerversion AIR 1.1
* @productversion BlazeDS 4
* @productversion LCDS 3
*/
[Event(name="status", type="flash.events.StatusEvent")]
/**
* A helper class that is used by the streaming channels to open an internal
* HTTP connection to the server that is held open to allow the server to
* stream data down to the client with no poll overhead.
*
* @langversion 3.0
* @playerversion Flash 9
* @playerversion AIR 1.1
* @productversion BlazeDS 4
* @productversion LCDS 3
*/
public class StreamingConnectionHandler extends EventDispatcher
{
//--------------------------------------------------------------------------
//
// Public Static Constants
//
//--------------------------------------------------------------------------
/**
* The code for the StatusEvent dispatched by this handler when a disconnect
* command is received from the server.
*
* @langversion 3.0
* @playerversion Flash 9
* @playerversion AIR 1.1
* @productversion BlazeDS 4
* @productversion LCDS 3
*/
public static const DISCONNECT_CODE:String = "disconnect";
//--------------------------------------------------------------------------
//
// Private Static Constants
//
//--------------------------------------------------------------------------
/**
* Parameter name for the command passed in the request for a new streaming connection.
*
* @langversion 3.0
* @playerversion Flash 9
* @playerversion AIR 1.1
* @productversion BlazeDS 4
* @productversion LCDS 3
*/
private static const COMMAND_PARAM_NAME:String = "command";
/**
* A request to open a streaming connection passes this 'command' in the request URI to the
* remote endpoint.
*
* @langversion 3.0
* @playerversion Flash 9
* @playerversion AIR 1.1
* @productversion BlazeDS 4
* @productversion LCDS 3
*/
private static const OPEN_COMMAND:String = "open";
/**
* A request to close a streaming connection passes this 'command' in the request URI to the
* remote endpoint.
*
* @langversion 3.0
* @playerversion Flash 9
* @playerversion AIR 1.1
* @productversion BlazeDS 4
* @productversion LCDS 3
*/
private static const CLOSE_COMMAND:String = "close";
/**
* Parameter name for the stream id; passed with commands for an existing streaming connection.
*
* @langversion 3.0
* @playerversion Flash 9
* @playerversion AIR 1.1
* @productversion BlazeDS 4
* @productversion LCDS 3
*/
private static const STREAM_ID_PARAM_NAME:String = "streamId";
/**
* Parameter name for the version param passed in the request for a new streaming connection.
*
* @langversion 3.0
* @playerversion Flash 9
* @playerversion AIR 1.1
* @productversion BlazeDS 4
* @productversion LCDS 3
*/
private static const VERSION_PARAM_NAME:String = "version";
/**
* Indicates the stream version used for this channel's stream connection.
* Currently just version 1. If the protocol over the wire needs to change in the future
* this gives us a way to indicate the change.
*
* @langversion 3.0
* @playerversion Flash 9
* @playerversion AIR 1.1
* @productversion BlazeDS 4
* @productversion LCDS 3
*/
private static const VERSION_1:String = "1";
// Constants for bytes used in parsing response chunks.
private static const CR_BYTE:int = 13;
private static const LF_BYTE:int = 10;
private static const NULL_BYTE:int = 0;
// Parse states; streamProgressHandler() uses these states to parse incoming HTTP response chunks.
private static const INIT_STATE:int = 0;
private static const CR_STATE:int = 1;
private static const LF_STATE:int = 2;
private static const SIZE_STATE:int = 3;
private static const TEST_CHUNK_STATE:int = 4;
private static const SKIP_STATE:int = 5;
private static const DATA_STATE:int = 6;
private static const RESET_BUFFER_STATE:int = 7;
private static const CLOSED_STATE:int = 8;
//--------------------------------------------------------------------------
//
// Constructor
//
//--------------------------------------------------------------------------
/**
* Constructor.
*
* @param channel The Channel that uses this class.
* @param log Reference to the logger for the associated Channel.
*
* @langversion 3.0
* @playerversion Flash 9
* @playerversion AIR 1.1
* @productversion BlazeDS 4
* @productversion LCDS 3
*/
public function StreamingConnectionHandler(channel:Channel, log:ILogger)
{
super();
this.channel = channel;
this._log = log;
}
//--------------------------------------------------------------------------
//
// Variables
//
//--------------------------------------------------------------------------
/**
* The Channel that uses this class.
*
* @langversion 3.0
* @playerversion Flash 9
* @playerversion AIR 1.1
* @productversion BlazeDS 4
* @productversion LCDS 3
*/
protected var channel:Channel;
/**
* Byte buffer used to store the current chunk from the remote endpoint.
* Once a full chunk has been buffered, a message instance encoded in binary
* AMF format can be read from the chunk and dispatched.
*
* @langversion 3.0
* @playerversion Flash 9
* @playerversion AIR 1.1
* @productversion BlazeDS 4
* @productversion LCDS 3
*/
protected var chunkBuffer:ByteArray;
/**
* Counter that keeps track of how many data bytes remain to be read for the current chunk.
* A sentinal value of -1 indicates an initial state (either waiting for the first chunk or
* just finished parsing the previous chunk).
*
* @langversion 3.0
* @playerversion Flash 9
* @playerversion AIR 1.1
* @productversion BlazeDS 4
* @productversion LCDS 3
*/
protected var dataBytesToRead:int = -1;
/**
* Index into the chunk buffer pointing to the first byte of chunk data.
*
* @langversion 3.0
* @playerversion Flash 9
* @playerversion AIR 1.1
* @productversion BlazeDS 4
* @productversion LCDS 3
*/
protected var dataOffset:int;
/**
* @private
* Reference to the logger for the associated Channel.
*/
protected var _log:ILogger;
/**
* @private
* The server-assigned id for the streaming connection.
*/
protected var streamId:String;
/**
* @private
* Optional token to append to request URLs; generally contains a session token.
*/
private var _appendToURL:String;
/**
* Storage for the hex-format chunk size value from the byte stream.
*
* @langversion 3.0
* @playerversion Flash 9
* @playerversion AIR 1.1
* @productversion BlazeDS 4
* @productversion LCDS 3
*/
private var hexChunkSize:String;
/**
* Current parse state on the streaming connection.
*
* @langversion 3.0
* @playerversion Flash 9
* @playerversion AIR 1.1
* @productversion BlazeDS 4
* @productversion LCDS 3
*/
private var state:int = INIT_STATE;
/**
* URLStream used to open a streaming connection from the server to
* the client over HTTP.
*
* @langversion 3.0
* @playerversion Flash 9
* @playerversion AIR 1.1
* @productversion BlazeDS 4
* @productversion LCDS 3
*/
private var streamingConnection:URLStream;
/**
* URLStream used to close the original streaming connection opened from
* the server to the client over HTTP.
*
* @langversion 3.0
* @playerversion Flash 9
* @playerversion AIR 1.1
* @productversion BlazeDS 4
* @productversion LCDS 3
*/
private var streamingConnectionCloser:URLStream;
//--------------------------------------------------------------------------
//
// Public Methods
//
//--------------------------------------------------------------------------
/**
* Used by the streaming channels to set up the streaming connection if
* necessary and issue the open request to the server.
*
* @param appendToURL The string to append such as session id to the endpoint
* url while making the streaming connection request.
*
* @langversion 3.0
* @playerversion Flash 9
* @playerversion AIR 1.1
* @productversion BlazeDS 4
* @productversion LCDS 3
*/
public function openStreamingConnection(appendToURL:String=null):void
{
state = INIT_STATE;
_appendToURL = appendToURL;
// Construct the streaming connection if needed.
if (streamingConnection == null)
{
streamingConnection = new URLStream();
streamingConnection.addEventListener(Event.OPEN, streamOpenHandler);
streamingConnection.addEventListener(ProgressEvent.PROGRESS, streamProgressHandler);
streamingConnection.addEventListener(Event.COMPLETE, streamCompleteHandler);
streamingConnection.addEventListener(HTTPStatusEvent.HTTP_STATUS, streamHttpStatusHandler);
streamingConnection.addEventListener(IOErrorEvent.IO_ERROR, streamIoErrorHandler);
streamingConnection.addEventListener(SecurityErrorEvent.SECURITY_ERROR, streamSecurityErrorHandler);
}
// Open the streaming connection, only if not already requested to open.
if (!streamingConnection.connected)
{
var request:URLRequest = new URLRequest();
var url:String = channel.endpoint;
if (url.charAt(url.length - 1) == '/')
url = url.substring(0, url.length -1);
if (_appendToURL != null)
url += _appendToURL;
request.url = url + "?" + COMMAND_PARAM_NAME + "=" + OPEN_COMMAND + "&" + VERSION_PARAM_NAME + "=" + VERSION_1;
request.method = URLRequestMethod.POST;
var postParams:URLVariables = new URLVariables();
postParams[AbstractMessage.FLEX_CLIENT_ID_HEADER] = FlexClient.getInstance().id
request.data = postParams;
streamingConnection.load(request);
}
}
/**
* Used by the streaming channels to shut down the streaming connection.
*
* @langversion 3.0
* @playerversion Flash 9
* @playerversion AIR 1.1
* @productversion BlazeDS 4
* @productversion LCDS 3
*/
public function closeStreamingConnection():void
{
state = CLOSED_STATE;
chunkBuffer = null;
hexChunkSize = null;
dataBytesToRead = -1;
dataOffset = 0;
// First, close the existing connection.
if (streamingConnection != null)
{
if (streamingConnection.connected)
{
try
{
streamingConnection.close();
}
catch(ignore:Error)
{
}
}
}
// Then, let the server know that streaming connection can be cleaned up.
if (streamId != null)
{
if (streamingConnectionCloser == null)
{
var process:Function = function(event:Event):void
{
if (streamingConnectionCloser.connected)
{
try
{
streamId = null;
streamingConnectionCloser.close();
}
catch (ignore:Error)
{
}
}
}
var ignore:Function = function(event:Event):void
{
// Ignore.
}
streamingConnectionCloser = new URLStream();
streamingConnectionCloser.addEventListener(Event.COMPLETE, process);
streamingConnectionCloser.addEventListener(IOErrorEvent.IO_ERROR, process);
// Ignore the following events.
streamingConnectionCloser.addEventListener(HTTPStatusEvent.HTTP_STATUS, ignore);
streamingConnectionCloser.addEventListener(SecurityErrorEvent.SECURITY_ERROR, ignore);
}
// Request the streaming connection close, only if not already requested to close.
if (!streamingConnectionCloser.connected)
{
var request:URLRequest = new URLRequest();
var url:String = channel.endpoint;
if (_appendToURL != null)
url += _appendToURL;
request.url = url + "?" + COMMAND_PARAM_NAME + "=" + CLOSE_COMMAND + "&"
+ STREAM_ID_PARAM_NAME + "=" + streamId + "&" + VERSION_PARAM_NAME + "=" + VERSION_1;
request.method = URLRequestMethod.POST;
var postParams:URLVariables = new URLVariables();
postParams[AbstractMessage.FLEX_CLIENT_ID_HEADER] = FlexClient.getInstance().id
request.data = postParams;
streamingConnectionCloser.load(request);
}
}
}
//--------------------------------------------------------------------------
//
// Protected Methods
//
//--------------------------------------------------------------------------
/**
* Used by the streamProgressHandler to read a message. Default implementation
* returns null and subclasses must override this method.
*
* @return Returns the message that was read.
*
* @langversion 3.0
* @playerversion Flash 9
* @playerversion AIR 1.1
* @productversion BlazeDS 4
* @productversion LCDS 3
*/
protected function readMessage():IMessage
{
return null;
}
//--------------------------------------------------------------------------
//
// Private Methods
//
//--------------------------------------------------------------------------
/**
/**
* Handles a complete event that indicates that the streaming connection
* has been closed by the server by re-dispatching the event for the channel.
*
* @param event The COMPLETE Event.
*
* @langversion 3.0
* @playerversion Flash 9
* @playerversion AIR 1.1
* @productversion BlazeDS 4
* @productversion LCDS 3
*/
private function streamCompleteHandler(event:Event):void
{
dispatchEvent(event);
}
/**
* Handles HTTP status events dispatched by the streaming connection by
* re-dispatching the event for the channel.
*
* @param event The HTTPStatusEvent.
*
* @langversion 3.0
* @playerversion Flash 9
* @playerversion AIR 1.1
* @productversion BlazeDS 4
* @productversion LCDS 3
*/
private function streamHttpStatusHandler(event:HTTPStatusEvent):void
{
dispatchEvent(event);
}
/**
* Handles IO error events dispatched by the streaming connection by
* re-dispatching the event for the channel.
*
* @param event The IOErrorEvent.
*
* @langversion 3.0
* @playerversion Flash 9
* @playerversion AIR 1.1
* @productversion BlazeDS 4
* @productversion LCDS 3
*/
private function streamIoErrorHandler(event:IOErrorEvent):void
{
dispatchEvent(event);
}
/**
* The open event is dispatched when the streaming connection has been established
* to the server, but it may still fail or be rejected so ignore this event and do
* not advance the channel to a connected state yet.
*
* @param event The OPEN Event.
*
* @langversion 3.0
* @playerversion Flash 9
* @playerversion AIR 1.1
* @productversion BlazeDS 4
* @productversion LCDS 3
*/
private function streamOpenHandler(event:Event):void
{
// Ignore.
}
/**
* The arrival of data from the remote endpoint triggers progress events.
* The format of the data stream is HTTP Transfer-Encoding chunked, and each chunk contains either an object
* encoded in binary AMF format or a block of bytes to read off the network but skip any processing of.
*
* @param event The ProgressEvent.
*
* @langversion 3.0
* @playerversion Flash 9
* @playerversion AIR 1.1
* @productversion BlazeDS 4
* @productversion LCDS 3
*/
private function streamProgressHandler(event:ProgressEvent):void
{
if (chunkBuffer == null)
{
chunkBuffer = new ByteArray();
chunkBuffer.objectEncoding = ObjectEncoding.AMF3;
}
var n:int = streamingConnection.bytesAvailable;
if (n > 0)
{
// Move available bytes to chunk buffer.
streamingConnection.readBytes(chunkBuffer, chunkBuffer.length, n);
// Parse chunk buffer. Currently no validation is done of the chunk format.
// There's little need for it with our known server but validation could be added.
var value:int; // Storage for current byte read.
while (chunkBuffer.bytesAvailable > 0)
{
if (state == INIT_STATE)
{
value = chunkBuffer.readByte();
if (value == NULL_BYTE) // Ignore the null heartbeat byte.
continue;
dataBytesToRead = -1;
hexChunkSize = "";
chunkBuffer.position--; // Push back byte read.
state = CR_STATE;
}
if (state == CR_STATE)
{
value = chunkBuffer.readByte();
// This state skips past CRLF pairs but may also be hit at the start of a stream
// in which case we need to advance to the size state to finish reading the size header.
if (value == CR_BYTE)
{
state = LF_STATE;
}
else
{
// We seem to get into this state when a high number of messages
// are being streamed in a very short amount of time.
chunkBuffer.position--; // Push back byte read.
value = chunkBuffer.readByte();
if (value == LF_BYTE)
{
state = (dataBytesToRead == -1) ? SIZE_STATE : // Parsing a new chunk, read size next.
TEST_CHUNK_STATE; // Done with the size, now read the data.
}
else
{
chunkBuffer.position--; // Push back byte read.
state = SIZE_STATE;
}
}
if (chunkBuffer.bytesAvailable == 0)
break;
}
if (state == LF_STATE)
{
value = chunkBuffer.readByte();
if (value == LF_BYTE)
state = (dataBytesToRead == -1) ? SIZE_STATE : // Parsing a new chunk, read size next.
TEST_CHUNK_STATE; // Done with the size, now read the data.
if (chunkBuffer.bytesAvailable == 0)
break;
}
if (state == SIZE_STATE)
{
value = chunkBuffer.readByte();
if (value == NULL_BYTE) // Ignore the null heartbeat byte.
continue;
// CR indicates that we've finished reading the size.
if (value == CR_BYTE)
{
dataBytesToRead = parseInt(hexChunkSize, 16);
state = LF_STATE;
}
else // Hex digit.
{
hexChunkSize += String.fromCharCode(value);
}
if (chunkBuffer.bytesAvailable == 0)
break;
}
if (state == TEST_CHUNK_STATE)
{
// A leading NULL byte in a chunk body indicates that the chunk should be read
// off the network but not processed further.
value = chunkBuffer.readByte();
dataOffset = chunkBuffer.position;
state = (value == NULL_BYTE) ? SKIP_STATE :
DATA_STATE;
chunkBuffer.position--;
if (chunkBuffer.bytesAvailable == 0)
break;
}
if (state == SKIP_STATE)
{
if (chunkBuffer.bytesAvailable >= dataBytesToRead)
{
chunkBuffer.position += dataBytesToRead; // Skip over all bytes for the chunk.
state = RESET_BUFFER_STATE;
}
else // Wait for the rest of the chunk to arrive.
{
break;
}
}
if (state == DATA_STATE)
{
if (chunkBuffer.bytesAvailable >= dataBytesToRead)
{
var message:IMessage = readMessage();
// Dispatch a message event from the channel.
// Prepare for the next chunk.
if (message != null)
{
if (Log.isDebug())
_log.debug("'{0}' channel got message\n{1}\n", channel.id, message.toString());
if ((message is AcknowledgeMessage) && (AcknowledgeMessage(message).correlationId == OPEN_COMMAND))
{
// Store the server-assigned stream id for use during disconnect.
streamId = String(message.body);
// Move the channel to a connected state.
var openEvent:Event = new Event(Event.OPEN);
dispatchEvent(openEvent);
}
else if ((message is CommandMessage) && (CommandMessage(message).operation == CommandMessage.DISCONNECT_OPERATION))
{
// Watch for stream disconnect commands from the server.
// When one is received, do not dispatch it, and instead notify the channel to
// shut down and not attempt to reconnect.
var statusEvent:StatusEvent = new StatusEvent(StatusEvent.STATUS, false, false, DISCONNECT_CODE, "status");
dispatchEvent(statusEvent);
// Exit parse loop - channel shut down will reset parser state.
return;
}
else // Regular message; dispatch it.
{
channel.dispatchEvent(MessageEvent.createEvent(MessageEvent.MESSAGE, message));
}
// We've just finsihed the one point in the parse loop where we step outside of local control.
// The connection may now be closed, so check for this case here.
if (state == CLOSED_STATE)
return;
}
state = RESET_BUFFER_STATE;
}
else // Wait for the rest of the chunk to arrive.
{
break;
}
}
if (state == RESET_BUFFER_STATE)
{
if (chunkBuffer.bytesAvailable > 0) // Copy unparsed bytes for the next chunk over into a new buffer.
{
for (var j:int = 0; j < chunkBuffer.bytesAvailable; j++)
var x:int = chunkBuffer[j];
var tempBuffer:ByteArray = new ByteArray();
tempBuffer.objectEncoding = ObjectEncoding.AMF3;
chunkBuffer.readBytes(tempBuffer, 0, chunkBuffer.bytesAvailable);
chunkBuffer = tempBuffer;
}
else // Nothing left in the buffer; let the GC collect the bytes for the old chunk.
{
chunkBuffer = new ByteArray();
chunkBuffer.objectEncoding = ObjectEncoding.AMF3;
}
state = INIT_STATE;
if (chunkBuffer.bytesAvailable == 0)
break;
}
if (state == CLOSED_STATE)
{
return; // Exit parse loop.
}
}
}
}
/**
* Handles security error events dispatched by the streaming connection by
* re-dispatching the event for the channel.
*
* @param event The SecurityErrorEvent.
*
* @langversion 3.0
* @playerversion Flash 9
* @playerversion AIR 1.1
* @productversion BlazeDS 4
* @productversion LCDS 3
*/
private function streamSecurityErrorHandler(event:SecurityErrorEvent):void
{
dispatchEvent(event);
}
}
}