| //////////////////////////////////////////////////////////////////////////////// |
| // |
| // Licensed to the Apache Software Foundation (ASF) under one or more |
| // contributor license agreements. See the NOTICE file distributed with |
| // this work for additional information regarding copyright ownership. |
| // The ASF licenses this file to You under the Apache License, Version 2.0 |
| // (the "License"); you may not use this file except in compliance with |
| // the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| // |
| //////////////////////////////////////////////////////////////////////////////// |
| |
| package mx.messaging |
| { |
| |
| import org.apache.royale.events.Event; |
| import org.apache.royale.utils.Timer; |
| |
| import mx.core.mx_internal; |
| import mx.events.PropertyChangeEvent; |
| import mx.logging.Log; |
| import mx.messaging.channels.PollingChannel; |
| import mx.messaging.errors.ArgumentError; |
| import mx.messaging.events.ChannelEvent; |
| import mx.messaging.events.ChannelFaultEvent; |
| import mx.messaging.events.MessageEvent; |
| import mx.messaging.events.MessageFaultEvent; |
| import mx.messaging.messages.AcknowledgeMessage; |
| import mx.messaging.messages.CommandMessage; |
| import mx.messaging.messages.ErrorMessage; |
| import mx.messaging.messages.IMessage; |
| import mx.resources.IResourceManager; |
| import mx.resources.ResourceManager; |
| |
| use namespace mx_internal; |
| |
| /** |
| * Dispatched when a message is received by the Consumer. |
| * |
| * @eventType mx.messaging.events.MessageEvent.MESSAGE |
| * |
| * @langversion 3.0 |
| * @playerversion Flash 9 |
| * @playerversion AIR 1.1 |
| * @productversion BlazeDS 4 |
| * @productversion LCDS 3 |
| */ |
| [Event(name="message", type="mx.messaging.events.MessageEvent")] |
| |
| //[ResourceBundle("messaging")] |
| |
| /** |
| * The AbstractConsumer is the base class for both the Consumer and |
| * MultiTopicConsumer classes. You use those classes to receive pushed |
| * messages from the server. |
| * |
| * @langversion 3.0 |
| * @playerversion Flash 9 |
| * @playerversion AIR 1.1 |
| * @productversion BlazeDS 4 |
| * @productversion LCDS 3 |
| */ |
| public class AbstractConsumer extends MessageAgent |
| { |
| //-------------------------------------------------------------------------- |
| // |
| // Constructor |
| // |
| //-------------------------------------------------------------------------- |
| |
| /** |
| * Constructs a Consumer. |
| * |
| * |
| * @example |
| * <listing version="3.0"> |
| * function initConsumer():void |
| * { |
| * var consumer:Consumer = new Consumer(); |
| * consumer.destination = "NASDAQ"; |
| * consumer.selector = "operation IN ('Bid','Ask')"; |
| * consumer.addEventListener(MessageEvent.MESSAGE, messageHandler); |
| * consumer.subscribe(); |
| * } |
| * |
| * function messageHandler(event:MessageEvent):void |
| * { |
| * var msg:IMessage = event.message; |
| * var info:Object = msg.body; |
| * trace("-App recieved message: " + msg.toString()); |
| * } |
| * </listing> |
| * |
| * @langversion 3.0 |
| * @playerversion Flash 9 |
| * @playerversion AIR 1.1 |
| * @productversion BlazeDS 4 |
| * @productversion LCDS 3 |
| */ |
| public function AbstractConsumer() |
| { |
| super(); |
| _log = Log.getLogger("mx.messaging.Consumer"); |
| _agentType = "consumer"; |
| } |
| |
| //-------------------------------------------------------------------------- |
| // |
| // Variables |
| // |
| //-------------------------------------------------------------------------- |
| |
| /** |
| * @private |
| * This is the current number of resubscribe attempts that we've done. |
| */ |
| private var _currentAttempt:int; |
| |
| /** |
| * @private |
| * The timer used for resubscribe attempts. |
| */ |
| private var _resubscribeTimer:Timer; |
| |
| /** |
| * Flag indicating whether this consumer should be subscribed or not. |
| * |
| * @langversion 3.0 |
| * @playerversion Flash 9 |
| * @playerversion AIR 1.1 |
| * @productversion BlazeDS 4 |
| * @productversion LCDS 3 |
| */ |
| protected var _shouldBeSubscribed:Boolean; |
| |
| /** |
| * @private |
| * Current subscribe message - used for resubscribe attempts. |
| */ |
| private var _subscribeMsg:CommandMessage; |
| |
| /** |
| * @private |
| */ |
| private var resourceManager:IResourceManager = |
| ResourceManager.getInstance(); |
| |
| //-------------------------------------------------------------------------- |
| // |
| // Properties |
| // |
| //-------------------------------------------------------------------------- |
| |
| //---------------------------------- |
| // clientId |
| //---------------------------------- |
| |
| /** |
| * @private |
| * If our clientId has changed we may need to unsubscribe() using the |
| * current clientId and then resubscribe using the new clientId. |
| * // TODO - remove this? |
| * |
| * @param value The clientId value. |
| */ |
| override mx_internal function setClientId(value:String):void |
| { |
| if (super.clientId != value) |
| { |
| var resetSubscription:Boolean = false; |
| if (subscribed) |
| { |
| unsubscribe(); |
| resetSubscription = true; |
| } |
| |
| super.setClientId(value); |
| |
| if (resetSubscription) |
| subscribe(value); |
| } |
| } |
| |
| //---------------------------------- |
| // destination |
| //---------------------------------- |
| |
| /** |
| * @private |
| * Updates the destination for this Consumer and resubscribes if the |
| * Consumer is currently subscribed. |
| */ |
| override public function set destination(value:String):void |
| { |
| if (destination != value) |
| { |
| var resetSubscription:Boolean = false; |
| if (subscribed) |
| { |
| unsubscribe(); |
| resetSubscription = true; |
| } |
| |
| super.destination = value; |
| |
| if (resetSubscription) |
| subscribe(); |
| } |
| } |
| |
| //---------------------------------- |
| // maxFrequency |
| //---------------------------------- |
| |
| /** |
| * @private |
| */ |
| private var _maxFrequency:uint = 0; |
| |
| [Bindable(event="propertyChange")] |
| /** |
| * Determines the maximum number of messages per second the Consumer wants |
| * to receive. A server that understands this value will use it as an input |
| * while it determines how fast to send messages to the Consumer. Default is 0 |
| * which means Consumer does not have a preference for the message rate. |
| * Note that this property should be set before the Consumer subscribes and |
| * any changes after Consumer subscription will not have any effect until |
| * Consumer unsubscribes and resubscribes. |
| * |
| * @langversion 3.0 |
| * @playerversion Flash 9 |
| * @playerversion AIR 1.1 |
| * @productversion BlazeDS 4 |
| * @productversion LCDS 3 |
| */ |
| public function get maxFrequency():uint |
| { |
| return _maxFrequency; |
| } |
| |
| /** |
| * @private |
| */ |
| public function set maxFrequency(value:uint):void |
| { |
| var event:PropertyChangeEvent = PropertyChangeEvent.createUpdateEvent(this, "maxFrequency", _maxFrequency, value); |
| _maxFrequency = value; |
| dispatchEvent(event); |
| } |
| |
| //---------------------------------- |
| // resubscribeAttempts |
| //---------------------------------- |
| |
| /** |
| * @private |
| */ |
| private var _resubscribeAttempts:int = 5; |
| |
| [Bindable(event="propertyChange")] |
| |
| /** |
| * The number of resubscribe attempts that the Consumer makes in the event |
| * that the destination is unavailable or the connection to the destination fails. |
| * A value of -1 enables infinite attempts. |
| * A value of zero disables resubscribe attempts. |
| * <p> |
| * Resubscribe attempts are made at a constant rate according to the resubscribe interval |
| * value. When a resubscribe attempt is made if the underlying channel for the Consumer is not |
| * connected or attempting to connect the channel will start a connect attempt. |
| * Subsequent Consumer resubscribe attempts that occur while the underlying |
| * channel connect attempt is outstanding are effectively ignored until |
| * the outstanding channel connect attempt succeeds or fails. |
| * </p> |
| * |
| * @see mx.messaging.Consumer#resubscribeInterval |
| * |
| * @langversion 3.0 |
| * @playerversion Flash 9 |
| * @playerversion AIR 1.1 |
| * @productversion BlazeDS 4 |
| * @productversion LCDS 3 |
| */ |
| public function get resubscribeAttempts():int |
| { |
| return _resubscribeAttempts; |
| } |
| |
| /** |
| * @private |
| */ |
| public function set resubscribeAttempts(value:int):void |
| { |
| if (_resubscribeAttempts != value) |
| { |
| if (value == 0) |
| stopResubscribeTimer(); |
| |
| var event:PropertyChangeEvent = PropertyChangeEvent.createUpdateEvent(this, "resubscribeAttempts", _resubscribeAttempts, value); |
| _resubscribeAttempts = value; |
| dispatchEvent(event); |
| } |
| } |
| |
| //---------------------------------- |
| // resubscribeInterval |
| //---------------------------------- |
| |
| /** |
| * @private |
| */ |
| private var _resubscribeInterval:int = 5000; |
| |
| [Bindable(event="propertyChange")] |
| |
| /** |
| * The number of milliseconds between resubscribe attempts. |
| * If a Consumer doesn't receive an acknowledgement for a subscription |
| * request, it will wait the specified number of milliseconds before |
| * attempting to resubscribe. |
| * Setting the value to zero disables resubscriptions. |
| * <p> |
| * Resubscribe attempts are made at a constant rate according to this |
| * value. When a resubscribe attempt is made if the underlying channel for the Consumer is not |
| * connected or attempting to connect the channel will start a connect attempt. |
| * Subsequent Consumer resubscribe attempts that occur while the underlying |
| * channel connect attempt is outstanding are effectively ignored until |
| * the outstanding channel connect attempt succeeds or fails. |
| * </p> |
| * |
| * @see mx.messaging.Consumer#resubscribeInterval |
| * |
| * @throws ArgumentError If the assigned value is negative. |
| * |
| * @langversion 3.0 |
| * @playerversion Flash 9 |
| * @playerversion AIR 1.1 |
| * @productversion BlazeDS 4 |
| * @productversion LCDS 3 |
| */ |
| public function get resubscribeInterval():int |
| { |
| return _resubscribeInterval; |
| } |
| |
| /** |
| * @private |
| */ |
| public function set resubscribeInterval(value:int):void |
| { |
| if (_resubscribeInterval != value) |
| { |
| if (value < 0) |
| { |
| var message:String = resourceManager.getString( |
| "messaging", "resubscribeIntervalNegative"); |
| throw new ArgumentError(message); |
| } |
| else if (value == 0) |
| { |
| stopResubscribeTimer(); |
| } |
| else if (_resubscribeTimer != null) |
| { |
| _resubscribeTimer.delay = value; |
| } |
| |
| var event:PropertyChangeEvent = PropertyChangeEvent.createUpdateEvent(this, "resubscribeInterval", _resubscribeInterval, value); |
| _resubscribeInterval = value; |
| dispatchEvent(event); |
| } |
| } |
| |
| //---------------------------------- |
| // subscribed |
| //---------------------------------- |
| |
| /** |
| * @private |
| */ |
| private var _subscribed:Boolean; |
| |
| [Bindable(event="propertyChange")] |
| |
| /** |
| * Indicates whether the Consumer is currently subscribed. The <code>propertyChange</code> |
| * event is dispatched when this property changes. |
| * |
| * @langversion 3.0 |
| * @playerversion Flash 9 |
| * @playerversion AIR 1.1 |
| * @productversion BlazeDS 4 |
| * @productversion LCDS 3 |
| */ |
| public function get subscribed():Boolean |
| { |
| return _subscribed; |
| } |
| |
| /** |
| * @private |
| */ |
| protected function setSubscribed(value:Boolean):void |
| { |
| if (_subscribed != value) |
| { |
| var event:PropertyChangeEvent = PropertyChangeEvent.createUpdateEvent(this, "subscribed", _subscribed, value); |
| _subscribed = value; |
| |
| // Register or unregister our subscription state with the ConsumerMessageDispatcher. |
| // This allows the singleton ConsumerMessageDispatcher to start or stop listening for |
| // messages on our behalf. |
| if (_subscribed) |
| { |
| ConsumerMessageDispatcher.getInstance().registerSubscription(this); |
| if (channelSet != null && channelSet.currentChannel != null && channelSet.currentChannel is PollingChannel) |
| PollingChannel(channelSet.currentChannel).enablePolling(); |
| } |
| else |
| { |
| ConsumerMessageDispatcher.getInstance().unregisterSubscription(this); |
| if (channelSet != null && channelSet.currentChannel != null && channelSet.currentChannel is PollingChannel) |
| PollingChannel(channelSet.currentChannel).disablePolling(); |
| } |
| |
| dispatchEvent(event); |
| } |
| } |
| |
| //---------------------------------- |
| // timestamp |
| //---------------------------------- |
| |
| /** |
| * @private |
| */ |
| private var _timestamp:Number = -1; |
| |
| [Bindable(event="propertyChange")] |
| |
| /** |
| * Contains the timestamp of the most recent message this Consumer |
| * has received. |
| * This value is passed to the destination in a <code>receive()</code> call |
| * to request that it deliver messages for the Consumer from the timestamp |
| * forward. |
| * All messages with a timestamp value greater than the |
| * <code>timestamp</code> value will be returned during a poll operation. |
| * Setting this value to -1 will retrieve all cached messages from the |
| * destination. |
| * |
| * @langversion 3.0 |
| * @playerversion Flash 9 |
| * @playerversion AIR 1.1 |
| * @productversion BlazeDS 4 |
| * @productversion LCDS 3 |
| */ |
| public function get timestamp():Number |
| { |
| return _timestamp; |
| } |
| |
| /** |
| * @private |
| */ |
| public function set timestamp(value:Number):void |
| { |
| if (_timestamp != value) |
| { |
| var event:PropertyChangeEvent = PropertyChangeEvent.createUpdateEvent(this, "timestamp", _timestamp, value); |
| _timestamp = value; |
| dispatchEvent(event); |
| } |
| } |
| |
| //-------------------------------------------------------------------------- |
| // |
| // Overridden Methods |
| // |
| //-------------------------------------------------------------------------- |
| |
| /** |
| * @private |
| * Custom processing for subscribe, unsubscribe and poll message |
| * acknowledgments. |
| * |
| * @param ackMsg The AcknowledgeMessage. |
| * |
| * @param msg The original subscribe, unsubscribe or poll message. |
| */ |
| override public function acknowledge(ackMsg:AcknowledgeMessage, msg:IMessage):void |
| { |
| // Ignore acks for any outstanding messages that return after disconnect() is invoked. |
| if (_disconnectBarrier) |
| return; |
| |
| // Only run Consumer processing if this isn't an error. |
| if (!ackMsg.headers[AcknowledgeMessage.ERROR_HINT_HEADER] && (msg is CommandMessage)) |
| { |
| var command:CommandMessage = msg as CommandMessage; |
| |
| var op:int = command.operation; |
| |
| // For MultiTopicConsumers, the message gets marked if this is the |
| // message completely unsubscribes the client. |
| if (op == CommandMessage.MULTI_SUBSCRIBE_OPERATION) |
| { |
| if (msg.headers.DSlastUnsub != null) |
| op = CommandMessage.UNSUBSCRIBE_OPERATION; |
| else |
| op = CommandMessage.SUBSCRIBE_OPERATION; |
| } |
| |
| switch (op) |
| { |
| case CommandMessage.UNSUBSCRIBE_OPERATION: |
| if (Log.isInfo()) |
| _log.info("'{0}' {1} acknowledge for unsubscribe.", id, _agentType); |
| super.setClientId(null); |
| setSubscribed(false); // Stop listening for messages. |
| ackMsg.clientId = null; // Force the ack's clientId to null as well before ack'ing it. |
| super.acknowledge(ackMsg, msg); |
| break; |
| |
| case CommandMessage.SUBSCRIBE_OPERATION: |
| stopResubscribeTimer(); |
| // NOTE: the -1 in the timestamp assignment below. |
| // This works around a bug where if a Producer sends |
| // a message in the same batch as the subscribe, |
| // it will end up with (likely) the same timestamp |
| // as the consumer. Because the message is sent |
| // by the client after the subscribe though, it |
| // should still be delivered. |
| // TODO: Improve solution here. |
| if (ackMsg.timestamp > _timestamp) |
| _timestamp = ackMsg.timestamp - 1; |
| |
| if (Log.isInfo()) |
| _log.info("'{0}' {1} acknowledge for subscribe. Client id '{2}' new timestamp {3}", |
| id, _agentType, ackMsg.clientId, _timestamp); |
| super.setClientId(ackMsg.clientId); |
| setSubscribed(true); |
| super.acknowledge(ackMsg, msg); |
| break; |
| |
| // Handle the result of a receive() invocation (a Consumer instance-specific poll request). |
| case CommandMessage.POLL_OPERATION: |
| if ((ackMsg.body != null) && (ackMsg.body is Array)) |
| { |
| var messageList:Array = ackMsg.body as Array; |
| for each (var message:IMessage in messageList) |
| messageHandler(MessageEvent.createEvent(MessageEvent.MESSAGE, message)); |
| } |
| super.acknowledge(ackMsg, msg); |
| break; |
| } |
| } |
| else |
| { |
| super.acknowledge(ackMsg, msg); |
| } |
| } |
| |
| /** |
| * Disconnects the Consumer from its remote destination. |
| * This method should be invoked on a Consumer that is no longer |
| * needed by an application after unsubscribing. |
| * This method does not wait for outstanding network operations to complete |
| * and does not send an unsubscribe message to the server. |
| * After invoking disconnect(), the Consumer will report that it is in an |
| * disconnected, unsubscribed state because it will not receive any more |
| * messages until it has reconnected and resubscribed. |
| * Disconnecting stops automatic resubscription attempts if they are running. |
| * |
| * @langversion 3.0 |
| * @playerversion Flash 9 |
| * @playerversion AIR 1.1 |
| * @productversion BlazeDS 4 |
| * @productversion LCDS 3 |
| */ |
| override public function disconnect():void |
| { |
| // We don't invoke unsubscribe() in this case because a Consumer subscribed to a |
| // JMS destination durably will blow away the durable subscription. |
| _shouldBeSubscribed = false; // Prevent resubscribe attempts. |
| stopResubscribeTimer(); |
| setSubscribed(false); |
| |
| super.disconnect(); |
| } |
| |
| /** |
| * @private |
| * The Consumer supresses ErrorMessage processing if the error is |
| * retryable and it is configured to resubscribe. |
| * |
| * @param errMsg The ErrorMessage describing the fault. |
| * |
| * @param msg The original message (generally a subscribe). |
| */ |
| override public function fault(errMsg:ErrorMessage, msg:IMessage):void |
| { |
| // Ignore faults for any outstanding messages that return after disconnect() is invoked. |
| if (_disconnectBarrier) |
| return; |
| |
| if (errMsg.headers[ErrorMessage.RETRYABLE_HINT_HEADER]) |
| { |
| if (_resubscribeTimer == null) |
| { |
| // If this error correlates to our current subscribe message, |
| // we should no longer be subscribed. |
| if ((_subscribeMsg != null) && (errMsg.correlationId == _subscribeMsg.messageId)) |
| _shouldBeSubscribed = false; |
| super.fault(errMsg, msg); |
| } |
| // Else, suppress the fault dispatch because the resubscribe |
| // timer is running and will generate a fault when it runs out of |
| // allowed resubscribe attempts. |
| } |
| else |
| { |
| super.fault(errMsg, msg); |
| } |
| } |
| |
| /** |
| * @private |
| * Custom processing to warn the user if the consumer is connected over |
| * a non-real channel. |
| * |
| * @param event The ChannelEvent. |
| */ |
| override public function channelConnectHandler(event:ChannelEvent):void |
| { |
| super.channelConnectHandler(event); |
| |
| if (connected && channelSet != null && channelSet.currentChannel != null |
| && !channelSet.currentChannel.realtime && Log.isWarn()) |
| { |
| _log.warn("'{0}' {1} connected over a non-realtime channel '{2}'" |
| + " which means channel is not automatically receiving updates via polling or server push." |
| , id, _agentType, channelSet.currentChannel.id); |
| } |
| } |
| |
| /** |
| * @private |
| * Custom processing to start up a resubscribe timer if our channel is |
| * disconnected when we should be subscribed. |
| * |
| * @param event The ChannelEvent. |
| */ |
| override public function channelDisconnectHandler(event:ChannelEvent):void |
| { |
| setSubscribed(false); |
| |
| super.channelDisconnectHandler(event); |
| |
| if (_shouldBeSubscribed && !event.rejected) |
| startResubscribeTimer(); |
| } |
| |
| /** |
| * @private |
| * Custom processing to start up a resubscribe timer if our channel faults |
| * when we should be subscribed. |
| * |
| * @param event The ChannelFaultEvent. |
| */ |
| override public function channelFaultHandler(event:ChannelFaultEvent):void |
| { |
| if (!event.channel.connected) |
| setSubscribed(false); |
| |
| super.channelFaultHandler(event); |
| |
| if (_shouldBeSubscribed && !event.rejected && !event.channel.connected) |
| startResubscribeTimer(); |
| } |
| |
| //-------------------------------------------------------------------------- |
| // |
| // Methods |
| // |
| //-------------------------------------------------------------------------- |
| |
| /** |
| * Requests any messages that are queued for this Consumer on the server. |
| * This method should only be used for Consumers that subscribe over non-realtime, |
| * non-polling channels. |
| * This method is a no-op if the Consumer is not subscribed. |
| * |
| * @param timestamp This argument is deprecated and is ignored. |
| * |
| * @langversion 3.0 |
| * @playerversion Flash 9 |
| * @playerversion AIR 1.1 |
| * @productversion BlazeDS 4 |
| * @productversion LCDS 3 |
| */ |
| public function receive(timestamp:Number = 0):void |
| { |
| if (clientId != null) // We need a clientId to distinguish this from a generic poll request sent by a polling channel. |
| { |
| var msg:CommandMessage = new CommandMessage(); |
| msg.operation = CommandMessage.POLL_OPERATION; |
| msg.destination = destination; |
| internalSend(msg); |
| } |
| } |
| |
| /** |
| * Subscribes to the remote destination. |
| * |
| * @param clientId The client id to subscribe with. Use null for non-durable Consumers. If the subscription is durable, a consistent |
| * value must be supplied every time the Consumer subscribes in order |
| * to reconnect to the correct durable subscription in the remote destination. |
| * |
| * @throws mx.messaging.errors.InvalidDestinationError If no destination is set. |
| * |
| * @langversion 3.0 |
| * @playerversion Flash 9 |
| * @playerversion AIR 1.1 |
| * @productversion BlazeDS 4 |
| * @productversion LCDS 3 |
| */ |
| public function subscribe(clientId:String = null):void |
| { |
| // Set a flag to determine whether the passed clientId differs from the |
| // current value and should be assigned. |
| var resetClientId:Boolean = ((clientId != null) && |
| (super.clientId != clientId)) ? true : false; |
| |
| if (subscribed && resetClientId) |
| { |
| // We're already subscribed, but we need to resubscribe under |
| // the new clientId. |
| unsubscribe(); |
| } |
| |
| // Make sure any resubscribe timer is stopped. |
| stopResubscribeTimer(); |
| |
| _shouldBeSubscribed = true; |
| if (resetClientId) |
| super.setClientId(clientId); |
| if (Log.isInfo()) |
| _log.info("'{0}' {1} subscribe.", id, _agentType); |
| _subscribeMsg = buildSubscribeMessage(); |
| |
| internalSend(_subscribeMsg); |
| } |
| |
| /** |
| * Unsubscribes from the remote destination. In the case of durable JMS |
| * subscriptions, this will destroy the durable subscription on the JMS server. |
| * |
| * @param preserveDurable - when true, durable JMS subscriptions are not destroyed |
| * allowing consumers to later resubscribe and receive missed messages |
| * |
| * @langversion 3.0 |
| * @playerversion Flash 9 |
| * @playerversion AIR 1.1 |
| * @productversion BlazeDS 4 |
| * @productversion LCDS 3 |
| */ |
| public function unsubscribe(preserveDurable:Boolean = false):void |
| { |
| _shouldBeSubscribed = false; |
| if (subscribed) |
| { |
| // Stop listening now for any messages as we could be set to a new |
| // channel before the ack comes back, and once the ack returns we |
| // will no longer have a valid client id. |
| if (channelSet != null) |
| channelSet.removeEventListener(destination, messageHandler); |
| |
| if (Log.isInfo()) |
| _log.info("'{0}' {1} unsubscribe.", id, _agentType); |
| |
| internalSend(buildUnsubscribeMessage(preserveDurable)); |
| } |
| else |
| { |
| stopResubscribeTimer(); |
| } |
| } |
| |
| //-------------------------------------------------------------------------- |
| // |
| // Internal Methods |
| // |
| //-------------------------------------------------------------------------- |
| |
| /** |
| * @private |
| * Consumers subscribe for messages from a destination and this is the handler |
| * method that is invoked when a message for this Consumer is pushed or polled |
| * from the server. |
| * |
| * @param event The MessageEvent. |
| */ |
| mx_internal function messageHandler(event:MessageEvent):void |
| { |
| // NOTE: This method is invoked directly by the ConsumerMessageDispatcher. |
| // The event flow for a pushed message is: |
| // 1. Channel receives a pushed/polled message and dispatches a message event |
| // 2. Any ChannelSets connected to the Channel will handle these events in ChannelSet.messageHandler(); |
| // simply redispatching them. |
| // 3. Consumers that subscribe to a destination trigger the internal use of a shared ConsumerMessageDispatcher |
| // that listens for message events from any ChannelSets that Consumers have subscribed over and this helper routes pushed messages to the proper Consumer instances. |
| var message:IMessage = event.message; |
| if (message is CommandMessage) |
| { |
| var command:CommandMessage = message as CommandMessage; |
| switch (command.operation) |
| { |
| case CommandMessage.SUBSCRIPTION_INVALIDATE_OPERATION: |
| // We've been unsubscribed but it wasn't the result of an unsubscribe |
| // message this agent sent. Set unsubscribe to false which will inform |
| // the polling channel to stop polling if a polling channel is being used. |
| setSubscribed(false); |
| break; |
| default: |
| if (Log.isWarn()) |
| _log.warn("'{0}' received a CommandMessage '{1}' that could not be handled.", id, CommandMessage.getOperationAsString(command.operation)); |
| } |
| /* |
| * Command messages are handled internally by the Consumer and |
| * are not dispatched to message listeners via MessageEvents. |
| */ |
| return; |
| } |
| |
| if (message.timestamp > _timestamp) |
| _timestamp = message.timestamp; |
| |
| // Server might push out error messages (eg. during MessageClient.invalidate) |
| // that need to be dispatched as message fault events. |
| if (message is ErrorMessage) |
| dispatchEvent(MessageFaultEvent.createEvent(ErrorMessage(message))); |
| else |
| dispatchEvent(MessageEvent.createEvent(MessageEvent.MESSAGE, message)); |
| } |
| |
| //-------------------------------------------------------------------------- |
| // |
| // Protected Methods |
| // |
| //-------------------------------------------------------------------------- |
| |
| /** |
| * Returns a subscribe message. |
| * This method should be overridden by subclasses if they need custom |
| * subscribe messages. |
| * |
| * @return The subscribe CommandMessage. |
| * |
| * @langversion 3.0 |
| * @playerversion Flash 9 |
| * @playerversion AIR 1.1 |
| * @productversion BlazeDS 4 |
| * @productversion LCDS 3 |
| */ |
| protected function buildSubscribeMessage():CommandMessage |
| { |
| var msg:CommandMessage = new CommandMessage(); |
| msg.operation = CommandMessage.SUBSCRIBE_OPERATION; |
| msg.clientId = clientId; |
| msg.destination = destination; |
| if (maxFrequency > 0) |
| msg.headers[CommandMessage.MAX_FREQUENCY_HEADER] = maxFrequency; |
| return msg; |
| } |
| |
| /** |
| * Returns an unsubscribe message. |
| * This method should be overridden by subclasses if they need custom |
| * unsubscribe messages. |
| * |
| * @param preserveDurable - when true, durable JMS subscriptions are not destroyed |
| * allowing consumers to later resubscribe and receive missed messages |
| * |
| * @return The unsubscribe CommandMessage. |
| * |
| * @langversion 3.0 |
| * @playerversion Flash 9 |
| * @playerversion AIR 1.1 |
| * @productversion BlazeDS 4 |
| * @productversion LCDS 3 |
| */ |
| protected function buildUnsubscribeMessage(preserveDurable:Boolean):CommandMessage |
| { |
| var msg:CommandMessage = new CommandMessage(); |
| msg.operation = CommandMessage.UNSUBSCRIBE_OPERATION; |
| msg.clientId = clientId; |
| msg.destination = destination; |
| |
| // only include the PRESERVE_DURABLE_HEADER param in the message if |
| // its value is true |
| if (preserveDurable) |
| msg.headers[CommandMessage.PRESERVE_DURABLE_HEADER] = preserveDurable; |
| |
| return msg; |
| } |
| |
| /** |
| * @private |
| * Attempt to resubscribe. |
| * This can be called directly or from a Timer's event handler. |
| * |
| * @param event The timer event for resubscribe attempts. |
| */ |
| protected function resubscribe(event:Event):void |
| { |
| // If we're past our limit of attempts, fault out. |
| if ((_resubscribeAttempts != -1) && |
| (_currentAttempt >= _resubscribeAttempts)) |
| { |
| stopResubscribeTimer(); |
| _shouldBeSubscribed = false; |
| var errMsg:ErrorMessage = new ErrorMessage(); |
| errMsg.faultCode = "Client.Error.Subscribe"; |
| errMsg.faultString = resourceManager.getString( |
| "messaging", "consumerSubscribeError"); |
| errMsg.faultDetail = resourceManager.getString( |
| "messaging", "failedToSubscribe"); |
| errMsg.correlationId = _subscribeMsg.messageId; |
| fault(errMsg, _subscribeMsg); |
| return; |
| } |
| |
| if (Log.isDebug()) |
| _log.debug("'{0}' {1} trying to resubscribe.", id, _agentType); |
| |
| _resubscribeTimer.delay = _resubscribeInterval; |
| _currentAttempt++; |
| // Send the resubscribe message, skipping the MessageAgent's queue that blocks |
| // messages until the clientId is set. |
| internalSend(_subscribeMsg, false); |
| } |
| |
| /** |
| * @private |
| * This method will start a timer which attempts to resubscribe |
| * periodically. |
| */ |
| protected function startResubscribeTimer():void |
| { |
| if (_shouldBeSubscribed && (_resubscribeTimer == null)) |
| { |
| // If we're configured for resubscribe start up the timer. |
| if ((_resubscribeAttempts != 0) && (_resubscribeInterval > 0)) |
| { |
| if (Log.isDebug()) |
| _log.debug("'{0}' {1} starting resubscribe timer.", id, _agentType); |
| /* |
| * Initially, the timeout is set to 1 so we try to |
| * reconnect immediately (perhaps to a different channel). |
| * after that, it will poll at the configured time interval. |
| * |
| */ |
| _resubscribeTimer = new Timer(1); |
| _resubscribeTimer.addEventListener(Timer.TIMER, resubscribe); |
| _resubscribeTimer.start(); |
| _currentAttempt = 0; |
| } |
| } |
| } |
| |
| /** |
| * @private |
| * Stops a resubscribe timer if one is running. |
| */ |
| protected function stopResubscribeTimer():void |
| { |
| if (_resubscribeTimer != null) |
| { |
| if (Log.isDebug()) |
| _log.debug("'{0}' {1} stopping resubscribe timer.", id, _agentType); |
| |
| _resubscribeTimer.removeEventListener(Timer.TIMER, resubscribe); |
| _resubscribeTimer.reset(); |
| _resubscribeTimer = null; |
| } |
| } |
| } |
| |
| } |