blob: bc70f70b1a5171e85c7c6081cb0b5aad71ab0f6e [file] [log] [blame]
////////////////////////////////////////////////////////////////////////////////
//
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
////////////////////////////////////////////////////////////////////////////////
package mx.messaging
{
import flash.utils.Dictionary;
import mx.core.mx_internal;
import mx.logging.Log;
import mx.messaging.events.MessageEvent;
use namespace mx_internal;
/**
* @private
*
* Helper class that listens for MessageEvents dispatched by ChannelSets that Consumers are subscribed over.
* This class is necessary because the server maintains queues of messages to push to this Flex client on a
* per-endpoint basis but the client may create more than one Channel that connects to a single server endpoint.
* In this scenario, messages can be pushed/polled to the client over a different channel instance than the one
* that the target Consumer subscribed over. The server isn't aware of this difference because both channels are
* pointed at the same endpoint. Here's a diagram to illustrate.
*
* Client:
* Consumer 1 Consumer 2 Consumer 3
* | | /
* ChannelSet 1 ChannelSet 2
* | |
* Channel 1 Channel 2 <- The endpoint URIs for these two channels are identical
* | |
* \_______________________/
* Server: |
* |
* Endpoint (that the two channels point to)
* |
* FlexClientOutboundQueue (for this endpoint for this FlexClient)
* \-- Outbound messages for the three Consumer subscriptions
*
* When the endpoint receives a poll request from Channel 1 it will return queued messages for all three subscriptions
* but back on the client when Channel 1 dispatches message events for Consumer 2 and 3's subscriptions they won't see
* them because they're directly connected to the separate Channel2/ChannelSet2.
* This helper class keeps track of Consumer subscriptions and watches all ChannelSets for message events to
* ensure they're dispatched to the proper Consumer even when the client has been manually (miss)configured as the
* diagram illustrates.
*
* This class is a singleton that maintains a table of all subscribed Consumers and ref-counts the number of active
* subscriptions per ChannelSet to determine whether it needs to be listening for message events from a given
* ChannelSet or not; it dispatches message events from these ChannelSets to the proper Consumer instance
* by invoking the Consumer's messageHandler() method directly.
*/
public class ConsumerMessageDispatcher
{
//--------------------------------------------------------------------------
//
// Class variables
//
//--------------------------------------------------------------------------
/**
* @private
* The sole instance of this singleton class.
*/
private static var _instance:ConsumerMessageDispatcher;
//--------------------------------------------------------------------------
//
// Class methods
//
//--------------------------------------------------------------------------
/**
* Returns the sole instance of this singleton class,
* creating it if it does not already exist.
*
* @langversion 3.0
* @playerversion Flash 9
* @playerversion AIR 1.1
* @productversion BlazeDS 4
* @productversion LCDS 3
*/
public static function getInstance():ConsumerMessageDispatcher
{
if (!_instance)
_instance = new ConsumerMessageDispatcher();
return _instance;
}
//--------------------------------------------------------------------------
//
// Constructor
//
//--------------------------------------------------------------------------
/**
* Constructor.
* Use getInstance() instead of "new" to create.
*
* @langversion 3.0
* @playerversion Flash 9
* @playerversion AIR 1.1
* @productversion BlazeDS 4
* @productversion LCDS 3
*/
public function ConsumerMessageDispatcher()
{
super();
}
//--------------------------------------------------------------------------
//
// Variables
//
//--------------------------------------------------------------------------
/**
* Lookup table for subscribed Consumer instances; Object<Consumer clientId, Consumer>
* This is used to dispatch pushed/polled messages to the proper Consumer instance.
*
* @langversion 3.0
* @playerversion Flash 9
* @playerversion AIR 1.1
* @productversion BlazeDS 4
* @productversion LCDS 3
*/
private const _consumers:Object = {};
/**
* Table of ref-counts per ChannelSet that subscribed Consumer instances are using; Dictionary<ChannelSet, ref-count> (non-weak keys).
* The ref-count is the number of subscribed Consumers for the ChannelSet.
* When we add a new ChannelSet we need to start listening on it for MessageEvents to redispatch to subscribed Consumers.
* When the ref-count drops to zero we need to stop listening on it for MessageEvents and remove it from the table.
*
* @langversion 3.0
* @playerversion Flash 9
* @playerversion AIR 1.1
* @productversion BlazeDS 4
* @productversion LCDS 3
*/
private const _channelSetRefCounts:Dictionary = new Dictionary(/* strong keys */);
/**
* Table used to prevent duplicate delivery of messages to a Consumer when multiple ChannelSets are
* connected to the same server endpoint over a single, underlying shared Channel.
*
* @langversion 3.0
* @playerversion Flash 9
* @playerversion AIR 1.1
* @productversion BlazeDS 4
* @productversion LCDS 3
*/
private const _consumerDuplicateMessageBarrier:Object = {};
//--------------------------------------------------------------------------
//
// Public Methods
//
//--------------------------------------------------------------------------
/**
* Determines whether any subscriptions are using the specified channel.
*
* @langversion 3.0
* @playerversion Flash 9
* @playerversion AIR 1.1
* @productversion BlazeDS 4
* @productversion LCDS 3
*/
public function isChannelUsedForSubscriptions(channel:Channel):Boolean
{
var memberOfChannelSets:Array = channel.channelSets;
var cs:ChannelSet = null;
var n:int = memberOfChannelSets.length;
for (var i:int = 0; i < n; i++)
{
cs = memberOfChannelSets[i];
if ((_channelSetRefCounts[cs] != null) && (cs.currentChannel == channel))
return true;
}
return false;
}
/**
* Registers a Consumer subscription.
* This will cause the ConsumerMessageDispatcher to start listening for MessageEvents
* from the underlying ChannelSet used to subscribe and redispatch messages to Consumers.
*
* @langversion 3.0
* @playerversion Flash 9
* @playerversion AIR 1.1
* @productversion BlazeDS 4
* @productversion LCDS 3
*/
public function registerSubscription(consumer:AbstractConsumer):void
{
_consumers[consumer.clientId] = consumer;
if (_channelSetRefCounts[consumer.channelSet] == null)
{
// If this is the first time we've seen this ChannelSet start listening for message events
// and initialize its ref-count.
consumer.channelSet.addEventListener(MessageEvent.MESSAGE, messageHandler);
_channelSetRefCounts[consumer.channelSet] = 1;
}
else
{
// We're already listening for message events; just increment the ref-count.
_channelSetRefCounts[consumer.channelSet]++;
}
}
/**
* Unregisters a Consumer subscription.
* The ConsumerMessageDispatcher will stop monitoring underlying channels for messages for
* this Consumer.
*
* @langversion 3.0
* @playerversion Flash 9
* @playerversion AIR 1.1
* @productversion BlazeDS 4
* @productversion LCDS 3
*/
public function unregisterSubscription(consumer:AbstractConsumer):void
{
delete _consumers[consumer.clientId];
var refCount:int = _channelSetRefCounts[consumer.channelSet];
if (--refCount == 0)
{
// If this was the last Consumer using this ChannelSet stop listening for message events
// and blow away the ref-count.
consumer.channelSet.removeEventListener(MessageEvent.MESSAGE, messageHandler);
delete _channelSetRefCounts[consumer.channelSet];
// And clean up the duplicate message delivery barrier if necessary.
if (_consumerDuplicateMessageBarrier[consumer.id] != null)
delete _consumerDuplicateMessageBarrier[consumer.id];
}
else
{
// Save the decremented ref-count.
_channelSetRefCounts[consumer.channelSet] = refCount;
}
}
//--------------------------------------------------------------------------
//
// Private Methods
//
//--------------------------------------------------------------------------
/**
* Handles message events from ChannelSets that Consumers are subscribed over.
* We just need to redirect the event to the proper Consumer instance.
*
* @langversion 3.0
* @playerversion Flash 9
* @playerversion AIR 1.1
* @productversion BlazeDS 4
* @productversion LCDS 3
*/
private function messageHandler(event:MessageEvent):void
{
var consumer:AbstractConsumer = _consumers[event.message.clientId];
if (consumer == null)
{
if (Log.isDebug())
Log.getLogger("mx.messaging.Consumer").debug("'{0}' received pushed message for consumer but no longer subscribed: {1}", event.message.clientId, event.message);
return;
}
// Determine how many of these will actually redispatch the same event from the shared underlying channel.
if (event.target.currentChannel.channelSets.length > 1)
{
var count:int = 0;
for each (var cs:ChannelSet in event.target.currentChannel.channelSets)
{
if (_channelSetRefCounts[cs] != null)
++count;
}
if (count > 1)
{
// We need to dispatch this message to the target Consumer only once and filter out
// the duplicate events.
if (_consumerDuplicateMessageBarrier[consumer.id] == null)
{
// Record the number of times we will receive a message event for this message.
_consumerDuplicateMessageBarrier[consumer.id] = [event.messageId, count];
// Dispatch once - only the first time we see this message for this Consumer.
consumer.messageHandler(event);
}
// Cleanup.
var duplicateDispatchGuard:Array = _consumerDuplicateMessageBarrier[consumer.id];
if (duplicateDispatchGuard[0] == event.messageId)
{
if (--duplicateDispatchGuard[1] == 0)
delete _consumerDuplicateMessageBarrier[consumer.id];
}
return; // Exit early.
}
}
// Only one ChannelSet so we don't need to worry about this.
consumer.messageHandler(event);
}
}
}