| //////////////////////////////////////////////////////////////////////////////// |
| // |
| // Licensed to the Apache Software Foundation (ASF) under one or more |
| // contributor license agreements. See the NOTICE file distributed with |
| // this work for additional information regarding copyright ownership. |
| // The ASF licenses this file to You under the Apache License, Version 2.0 |
| // (the "License"); you may not use this file except in compliance with |
| // the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| // |
| //////////////////////////////////////////////////////////////////////////////// |
| |
| package mx.messaging |
| { |
| |
| import flash.errors.IllegalOperationError; |
| import flash.events.TimerEvent; |
| import flash.utils.Timer; |
| |
| import mx.core.mx_internal; |
| import mx.events.PropertyChangeEvent; |
| import mx.logging.Log; |
| import mx.messaging.config.ConfigMap; |
| import mx.messaging.config.ServerConfig; |
| import mx.messaging.errors.InvalidDestinationError; |
| import mx.messaging.errors.MessagingError; |
| import mx.messaging.events.ChannelEvent; |
| import mx.messaging.events.ChannelFaultEvent; |
| import mx.messaging.events.MessageAckEvent; |
| import mx.messaging.events.MessageEvent; |
| import mx.messaging.events.MessageFaultEvent; |
| import mx.messaging.messages.AbstractMessage; |
| import mx.messaging.messages.AcknowledgeMessage; |
| import mx.messaging.messages.AsyncMessage; |
| import mx.messaging.messages.CommandMessage; |
| import mx.messaging.messages.ErrorMessage; |
| import mx.messaging.messages.IMessage; |
| import mx.resources.IResourceManager; |
| import mx.resources.ResourceManager; |
| |
| use namespace mx_internal; |
| |
| [ResourceBundle("messaging")] |
| |
| /** |
| * The AbstractProducer is the base class for the Producer and |
| * MultiTopicProducer classes. |
| * You use these classes to push messages to the server. |
| * |
| * @langversion 3.0 |
| * @playerversion Flash 9 |
| * @playerversion AIR 1.1 |
| * @productversion BlazeDS 4 |
| * @productversion LCDS 3 |
| */ |
| public class AbstractProducer extends MessageAgent |
| { |
| //-------------------------------------------------------------------------- |
| // |
| // Constructor |
| // |
| //-------------------------------------------------------------------------- |
| |
| /** |
| * @private |
| * Constructor. Performs no work of its own beyond invoking the |
| * superclass constructor. |
| */ |
| public function AbstractProducer() |
| { |
| super(); |
| } |
| |
| //-------------------------------------------------------------------------- |
| // |
| // Variables |
| // |
| //-------------------------------------------------------------------------- |
| |
| /** |
| * @private |
| * A connect message to use for (re)connect attempts which allows the underlying |
| * ChannelSet to de-dupe if multiple reconnects queue up at the channel layer. |
| * It is created lazily by connect() and reconnect() through |
| * buildConnectMessage() and the same instance is reused for later attempts. |
| */ |
| private var _connectMsg:CommandMessage; |
| |
| /** |
| * @private |
| * This is the current number of reconnect attempts that we've done. |
| * Reset to 0 by startReconnectTimer() and incremented by reconnect() each |
| * time a connect message is sent. |
| */ |
| private var _currentAttempt:int; |
| |
| /** |
| * @private |
| * The timer used for reconnect attempts. |
| * Non-null only between startReconnectTimer() and stopReconnectTimer(); |
| * other code in this class tests it against null to tell whether a |
| * reconnect cycle is in progress. |
| */ |
| private var _reconnectTimer:Timer; |
| |
| /** |
| * @private |
| * Indicates whether this agent should be connected or not. |
| * Set to true by connect() and by send() when autoConnect is enabled; |
| * set to false by disconnect(), by send() when it refuses to connect, and |
| * when a connect attempt finally faults. |
| */ |
| protected var _shouldBeConnected:Boolean; |
| |
| /** |
| * @private |
| * Resource manager used to look up the localized strings from the |
| * "messaging" bundle for the errors and faults raised by this class. |
| */ |
| private var resourceManager:IResourceManager = |
| ResourceManager.getInstance(); |
| |
| //-------------------------------------------------------------------------- |
| // |
| // Properties |
| // |
| //-------------------------------------------------------------------------- |
| |
| //---------------------------------- |
| // autoConnect |
| //---------------------------------- |
| |
| /** |
|  * @private |
|  * Backing field for the <code>autoConnect</code> property. |
|  */ |
| private var _autoConnect:Boolean = true; |
| |
| [Bindable(event="propertyChange")] |
| |
| /** |
|  * Controls whether the Producer may connect implicitly. |
|  * When <code>true</code> (the default) the Producer connects to its destination |
|  * automatically the first time the <code>send()</code> method is called. |
|  * When <code>false</code> the <code>connect()</code> method must be called explicitly |
|  * before sending; applications that need to operate in an offline mode can use |
|  * this to stop <code>send()</code> from connecting on their behalf. |
|  * |
|  * @langversion 3.0 |
|  * @playerversion Flash 9 |
|  * @playerversion AIR 1.1 |
|  * @productversion BlazeDS 4 |
|  * @productversion LCDS 3 |
|  */ |
| public function get autoConnect():Boolean |
| { |
|     return _autoConnect; |
| } |
| |
| /** |
|  * @private |
|  * Stores the new value and dispatches a property change event, but only |
|  * when the value actually differs from the current one. |
|  */ |
| public function set autoConnect(value:Boolean):void |
| { |
|     if (_autoConnect == value) |
|         return; |
| |
|     var previous:Boolean = _autoConnect; |
|     _autoConnect = value; |
|     dispatchEvent(PropertyChangeEvent.createUpdateEvent(this, "autoConnect", previous, value)); |
| } |
| |
| //---------------------------------- |
| // defaultHeaders |
| //---------------------------------- |
| |
| /** |
|  * @private |
|  * Backing field for the <code>defaultHeaders</code> property. |
|  */ |
| private var _defaultHeaders:Object; |
| |
| [Bindable(event="propertyChange")] |
| |
| /** |
|  * Headers that the Producer adds to every message it sends. |
|  * A default header is only copied onto a message that does not already |
|  * carry a header with the same name; when the message defines the header |
|  * itself, the message's value wins and the default is ignored. |
|  * |
|  * @langversion 3.0 |
|  * @playerversion Flash 9 |
|  * @playerversion AIR 1.1 |
|  * @productversion BlazeDS 4 |
|  * @productversion LCDS 3 |
|  */ |
| public function get defaultHeaders():Object |
| { |
|     return _defaultHeaders; |
| } |
| |
| /** |
|  * @private |
|  * Stores the new headers object and dispatches a property change event, |
|  * but only when the assigned value differs from the current one. |
|  */ |
| public function set defaultHeaders(value:Object):void |
| { |
|     if (_defaultHeaders == value) |
|         return; |
| |
|     var previous:Object = _defaultHeaders; |
|     _defaultHeaders = value; |
|     dispatchEvent(PropertyChangeEvent.createUpdateEvent(this, "defaultHeaders", previous, value)); |
| } |
| |
| //---------------------------------- |
| // priority |
| //---------------------------------- |
| |
| /** |
|  * @private |
|  * Backing field for the <code>priority</code> property; -1 means "not set". |
|  */ |
| private var _priority:int = -1; |
| |
| [Bindable(event="propertyChange")] |
| |
| /** |
|  * The default message priority for the messages sent by the Producer. The |
|  * valid values are 0 to 9 (0 being lowest) and -1 means that the Producer |
|  * does not have a priority set. Note that if the message already has a |
|  * priority defined, that takes precedence over Producer's priority. |
|  * |
|  * <p>Assigned values are clamped into the range 0 to 9: anything below 0 |
|  * is stored as 0 and anything above 9 is stored as 9. As a consequence the |
|  * initial -1 ("not set") state cannot be restored by assignment once a |
|  * priority has been set.</p> |
|  * |
|  * @langversion 3.0 |
|  * @playerversion Flash 9 |
|  * @playerversion AIR 1.1 |
|  * @productversion BlazeDS 4 |
|  * @productversion LCDS 3 |
|  */ |
| public function get priority():int |
| { |
|     return _priority; |
| } |
| |
| /** |
|  * @private |
|  * Clamps the assigned value into 0..9 and, if the effective value changed, |
|  * stores it and dispatches a property change event. |
|  */ |
| public function set priority(value:int):void |
| { |
|     // Assigning the current raw value (including the initial -1) is a no-op. |
|     if (_priority == value) |
|         return; |
| |
|     if (value < 0) |
|         value = 0; |
|     else if (value > 9) |
|         value = 9; |
| |
|     // Re-check after clamping: an out-of-range request may collapse onto the |
|     // value already stored, in which case nothing changed and no event with |
|     // identical old and new values should be dispatched. |
|     if (_priority == value) |
|         return; |
| |
|     var event:PropertyChangeEvent = PropertyChangeEvent.createUpdateEvent(this, "priority", _priority, value); |
|     _priority = value; |
|     dispatchEvent(event); |
| } |
| |
| //---------------------------------- |
| // reconnectAttempts |
| //---------------------------------- |
| |
| /** |
|  * @private |
|  * Backing field for the <code>reconnectAttempts</code> property. |
|  */ |
| private var _reconnectAttempts:int; |
| |
| [Bindable(event="propertyChange")] |
| |
| /** |
|  * How many times the Producer tries to reconnect when the destination is |
|  * unavailable or the connection to the destination closes. |
|  * Use -1 for an unlimited number of attempts and zero to turn reconnect |
|  * attempts off. |
|  * |
|  * <p>Attempts are spaced at a constant rate given by the reconnect interval. |
|  * When an attempt is made and the Producer's underlying channel is neither |
|  * connected nor already connecting, the channel starts a connect attempt. |
|  * Further Producer reconnect attempts that fire while that channel connect |
|  * attempt is still outstanding are effectively ignored until it succeeds |
|  * or fails.</p> |
|  * |
|  * @see mx.messaging.Producer#reconnectInterval |
|  * |
|  * @langversion 3.0 |
|  * @playerversion Flash 9 |
|  * @playerversion AIR 1.1 |
|  * @productversion BlazeDS 4 |
|  * @productversion LCDS 3 |
|  */ |
| public function get reconnectAttempts():int |
| { |
|     return _reconnectAttempts; |
| } |
| |
| /** |
|  * @private |
|  * Stores the new limit and dispatches a property change event when the |
|  * value differs from the current one. Assigning zero also stops any |
|  * reconnect timer that is currently running. |
|  */ |
| public function set reconnectAttempts(value:int):void |
| { |
|     if (_reconnectAttempts == value) |
|         return; |
| |
|     // Zero disables reconnecting, so cancel a cycle that is in flight. |
|     if (value == 0) |
|         stopReconnectTimer(); |
| |
|     var previous:int = _reconnectAttempts; |
|     _reconnectAttempts = value; |
|     dispatchEvent(PropertyChangeEvent.createUpdateEvent(this, "reconnectAttempts", previous, value)); |
| } |
| |
| //---------------------------------- |
| // reconnectInterval |
| //---------------------------------- |
| |
| /** |
|  * @private |
|  * Backing field for the <code>reconnectInterval</code> property. |
|  */ |
| private var _reconnectInterval:int; |
| |
| [Bindable(event="propertyChange")] |
| |
| /** |
|  * The delay, in milliseconds, between reconnect attempts. |
|  * When a connect attempt is not acknowledged, the Producer waits this long |
|  * before trying again. |
|  * A value of zero disables reconnect attempts. |
|  * |
|  * <p>Attempts are made at a constant rate given by this value. |
|  * When an attempt is made and the Producer's underlying channel is neither |
|  * connected nor already connecting, the channel starts a connect attempt. |
|  * Further Producer reconnect attempts that fire while that channel connect |
|  * attempt is still outstanding are effectively ignored until it succeeds |
|  * or fails.</p> |
|  * |
|  * @see mx.messaging.Producer#reconnectAttempts |
|  * |
|  * @throws ArgumentError If the assigned value is negative. |
|  * |
|  * @langversion 3.0 |
|  * @playerversion Flash 9 |
|  * @playerversion AIR 1.1 |
|  * @productversion BlazeDS 4 |
|  * @productversion LCDS 3 |
|  */ |
| public function get reconnectInterval():int |
| { |
|     return _reconnectInterval; |
| } |
| |
| /** |
|  * @private |
|  * Validates and stores the new interval, then dispatches a property change |
|  * event. Nothing happens when the value equals the current one. |
|  * A negative value is rejected with an ArgumentError before any state is |
|  * touched; zero stops a running reconnect timer; any other value is pushed |
|  * to the running reconnect timer, if there is one. |
|  */ |
| public function set reconnectInterval(value:int):void |
| { |
|     if (_reconnectInterval == value) |
|         return; |
| |
|     if (value < 0) |
|     { |
|         throw new ArgumentError( |
|             resourceManager.getString("messaging", "reconnectIntervalNegative")); |
|     } |
| |
|     if (value == 0) |
|         stopReconnectTimer(); |
|     else if (_reconnectTimer != null) |
|         _reconnectTimer.delay = value; |
| |
|     var previous:int = _reconnectInterval; |
|     _reconnectInterval = value; |
|     dispatchEvent(PropertyChangeEvent.createUpdateEvent(this, "reconnectInterval", previous, value)); |
| } |
| |
| //-------------------------------------------------------------------------- |
| // |
| // Overridden Public Methods |
| // |
| //-------------------------------------------------------------------------- |
| |
| /** |
|  * @private |
|  * Handles message acknowledgments, with extra handling for the |
|  * acknowledgment of a (re)connect message: once a connect is acknowledged |
|  * the reconnect timer is stopped. |
|  * Acknowledgments that arrive while the disconnect barrier is set are |
|  * dropped without any processing. |
|  * |
|  * @param ackMsg The AcknowledgeMessage. |
|  * |
|  * @param msg The original message. |
|  */ |
| override public function acknowledge(ackMsg:AcknowledgeMessage, msg:IMessage):void |
| { |
|     // Acks for messages still outstanding when disconnect() was invoked are ignored. |
|     if (!_disconnectBarrier) |
|     { |
|         super.acknowledge(ackMsg, msg); |
| |
|         var command:CommandMessage = msg as CommandMessage; |
|         if (command != null && command.operation == CommandMessage.TRIGGER_CONNECT_OPERATION) |
|             stopReconnectTimer(); |
|     } |
| } |
| |
| /** |
| * @private |
| * The Producer suppresses ErrorMessage processing if the fault is for a connect |
| * attempt that is being retried. |
| * This override only forwards both arguments to internalFault(), leaving |
| * that method's optional parameters at their default values. |
| * |
| * @param errMsg The ErrorMessage describing the fault. |
| * |
| * @param msg The original message. |
| */ |
| override public function fault(errMsg:ErrorMessage, msg:IMessage):void |
| { |
| internalFault(errMsg, msg); |
| } |
| |
| /** |
|  * @private |
|  * After the superclass has processed the disconnect, starts the reconnect |
|  * timer when this agent is still meant to be connected and the event is |
|  * not flagged as rejected. |
|  * |
|  * @param event The ChannelEvent. |
|  */ |
| override public function channelDisconnectHandler(event:ChannelEvent):void |
| { |
|     super.channelDisconnectHandler(event); |
| |
|     if (!_shouldBeConnected || event.rejected) |
|         return; |
| |
|     startReconnectTimer(); |
| } |
| |
| /** |
|  * @private |
|  * After the superclass has processed the channel fault, starts the |
|  * reconnect timer when this agent is still meant to be connected, the event |
|  * is not flagged as rejected and the faulting channel reports that it is |
|  * not connected. |
|  * |
|  * @param event The ChannelFaultEvent. |
|  */ |
| override public function channelFaultHandler(event:ChannelFaultEvent):void |
| { |
|     super.channelFaultHandler(event); |
| |
|     var reconnectWanted:Boolean = _shouldBeConnected && !event.rejected; |
|     // The channel is only inspected once the cheaper checks have passed. |
|     if (reconnectWanted && !event.channel.connected) |
|         startReconnectTimer(); |
| } |
| |
| /** |
| * Disconnects the Producer from its remote destination. |
| * This method does not wait for outstanding network operations to complete. |
| * After invoking <code>disconnect()</code>, the Producer will report that it is not |
| * connected and it will not receive any outstanding message acknowledgements or faults. |
| * Disconnecting stops automatic reconnect attempts if they are running. |
| * |
| * <p>The "should be connected" flag is cleared and the reconnect timer is |
| * stopped before the superclass <code>disconnect()</code> is invoked.</p> |
| * |
| * @langversion 3.0 |
| * @playerversion Flash 9 |
| * @playerversion AIR 1.1 |
| * @productversion BlazeDS 4 |
| * @productversion LCDS 3 |
| */ |
| override public function disconnect():void |
| { |
| _shouldBeConnected = false; // Prevent reconnect attempts. |
| |
| stopReconnectTimer(); |
| |
| super.disconnect(); |
| } |
| |
| //-------------------------------------------------------------------------- |
| // |
| // Public Methods |
| // |
| //-------------------------------------------------------------------------- |
| |
| /** |
|  * Connects the Producer to its target destination. |
|  * Calling this method while already connected does nothing. |
|  * Once a connection is established the bindable <code>connected</code> property |
|  * changes to <code>true</code> and a <code>PropertyChangeEvent</code> is generated. |
|  * The internal TRIGGER_CONNECT_OPERATION CommandMessage that is sent results |
|  * in an acknowledge or fault event depending upon whether the underlying channel |
|  * establishes its connection. |
|  * |
|  * @throws mx.messaging.errors.InvalidDestinationError If no destination is set. |
|  * |
|  * @example |
|  * <pre> |
|  * var producer:Producer = new Producer(); |
|  * producer.destination = "TestTopic"; |
|  * producer.addEventListener(PropertyChangeEvent.PROPERTY_CHANGE, handleConnect); |
|  * producer.connect(); |
|  * </pre> |
|  * |
|  * @langversion 3.0 |
|  * @playerversion Flash 9 |
|  * @playerversion AIR 1.1 |
|  * @productversion BlazeDS 4 |
|  * @productversion LCDS 3 |
|  */ |
| public function connect():void |
| { |
|     if (connected) |
|         return; |
| |
|     _shouldBeConnected = true; |
| |
|     // The connect message is built once and reused for later attempts. |
|     if (_connectMsg == null) |
|     { |
|         _connectMsg = buildConnectMessage(); |
|     } |
| |
|     internalSend(_connectMsg, false); |
| } |
| |
| //-------------------------------------------------------------------------- |
| // |
| // Methods |
| // |
| //-------------------------------------------------------------------------- |
| |
| /** |
|  * Sends the specified message to its destination. |
|  * When the producer is used for publish/subscribe messaging, only messages |
|  * of type AsyncMessage should be sent, unless a custom message type is in use |
|  * and the message destination on the server has been configured to process |
|  * that custom type. |
|  * |
|  * <p>Default headers are copied onto the message first, without overwriting |
|  * headers the message already defines. If the Producer is not connected and |
|  * <code>autoConnect</code> is <code>false</code>, the message is not sent; it is |
|  * faulted locally with the fault code <code>Client.Error.MessageSend</code>.</p> |
|  * |
|  * @param message The Message to send. |
|  * |
|  * @throws mx.messaging.errors.InvalidDestinationError If no destination is set. |
|  * |
|  * @example |
|  * <pre> |
|  * var producer:Producer = new Producer(); |
|  * producer.destination = "TestTopic"; |
|  * var msg:AsyncMessage = new AsyncMessage(); |
|  * msg.body = "test message"; |
|  * producer.send(msg); |
|  * </pre> |
|  * |
|  * @langversion 3.0 |
|  * @playerversion Flash 9 |
|  * @playerversion AIR 1.1 |
|  * @productversion BlazeDS 4 |
|  * @productversion LCDS 3 |
|  */ |
| public function send(message:IMessage):void |
| { |
|     // An implicit connect is permitted, so record that we want a connection. |
|     if (!connected && autoConnect) |
|         _shouldBeConnected = true; |
| |
|     // Fill in default headers that the message does not define itself. |
|     var defaults:Object = defaultHeaders; |
|     if (defaults != null) |
|     { |
|         for (var key:String in defaults) |
|         { |
|             if (message.headers.hasOwnProperty(key)) |
|                 continue; |
|             message.headers[key] = defaults[key]; |
|         } |
|     } |
| |
|     if (connected || autoConnect) |
|     { |
|         if (Log.isInfo()) |
|             _log.info("'{0}' {1} sending message '{2}'", id, _agentType, message.messageId); |
|         internalSend(message); |
|         return; |
|     } |
| |
|     // Not connected and not allowed to connect implicitly: fault the message |
|     // locally. The disconnect barrier is bypassed so the caller is notified. |
|     _shouldBeConnected = false; |
|     var sendFault:ErrorMessage = new ErrorMessage(); |
|     sendFault.faultCode = "Client.Error.MessageSend"; |
|     sendFault.faultString = resourceManager.getString("messaging", "producerSendError"); |
|     sendFault.faultDetail = resourceManager.getString("messaging", "producerSendErrorDetails"); |
|     sendFault.correlationId = message.messageId; |
|     internalFault(sendFault, message, false, true); |
| } |
| |
| //-------------------------------------------------------------------------- |
| // |
| // Internal Methods |
| // |
| //-------------------------------------------------------------------------- |
| |
| /** |
|  * @private |
|  * Fault processing shared by <code>fault()</code> and <code>send()</code>. |
|  * A fault for a connect message is swallowed while the reconnect timer is |
|  * running, because the timer raises its own fault once the allowed attempts |
|  * are used up. A fault that correlates to the current connect message is |
|  * replaced by a clearer connect error that keeps the original root cause. |
|  * Every other fault is passed to the superclass unchanged. |
|  * |
|  * @param errMsg The ErrorMessage describing the fault. |
|  * |
|  * @param msg The original message. |
|  * |
|  * @param routeToStore Currently not used. Previously a flag indicating |
|  * whether the faulted message should be stored offline to retry. |
|  * |
|  * @param ignoreDisconnectBarrier If true the message is faulted regardless |
|  * of whether disconnect() has been invoked. Generally a disconnect() will |
|  * suppress pending acks and faults. |
|  */ |
| mx_internal function internalFault(errMsg:ErrorMessage, msg:IMessage, |
|                                    routeToStore:Boolean = true, |
|                                    ignoreDisconnectBarrier:Boolean = false):void |
| { |
|     // Faults for messages still outstanding when disconnect() was invoked are ignored. |
|     if (_disconnectBarrier && !ignoreDisconnectBarrier) |
|         return; |
| |
|     var command:CommandMessage = msg as CommandMessage; |
|     var isConnectFault:Boolean = |
|         command != null && command.operation == CommandMessage.TRIGGER_CONNECT_OPERATION; |
| |
|     if (!isConnectFault) |
|     { |
|         super.fault(errMsg, msg); |
|         return; |
|     } |
| |
|     // A reconnect cycle is running; it reports the failure itself when the |
|     // allowed attempts are exhausted, so this fault is not dispatched. |
|     if (_reconnectTimer != null) |
|         return; |
| |
|     if ((_connectMsg == null) || (errMsg.correlationId != _connectMsg.messageId)) |
|     { |
|         super.fault(errMsg, msg); |
|         return; |
|     } |
| |
|     // The fault belongs to our current connect message, so we should no |
|     // longer be connected; report it with a more helpful error. |
|     _shouldBeConnected = false; |
|     var connectFault:ErrorMessage = buildConnectErrorMessage(); |
|     connectFault.rootCause = errMsg.rootCause; |
|     super.fault(connectFault, msg); |
| } |
| |
| //-------------------------------------------------------------------------- |
| // |
| // Protected Methods |
| // |
| //-------------------------------------------------------------------------- |
| |
| /** |
|  * @private |
|  * Attempt to reconnect. This can be called directly or |
|  * from a Timer's event handler. |
|  * |
|  * <p>When the configured number of attempts has been used up (and the limit |
|  * is not -1), the reconnect timer is stopped and a connect fault is raised |
|  * instead of sending another connect message. Otherwise the attempt counter |
|  * is incremented and the shared connect message is sent.</p> |
|  * |
|  * @param event The timer event for reconnect attempts. It is not read by |
|  * this method. |
|  */ |
| protected function reconnect(event:TimerEvent):void |
| { |
|     // Make sure the shared connect message exists before either path uses |
|     // it: the fault path below needs its messageId for correlation, and it |
|     // can be reached before any connect message was built (for example on a |
|     // direct call, or when reconnectAttempts is below -1). |
|     if (_connectMsg == null) |
|         _connectMsg = buildConnectMessage(); |
| |
|     // If we're past our limit of attempts, fault out. |
|     if ((_reconnectAttempts != -1) && |
|         (_currentAttempt >= _reconnectAttempts)) |
|     { |
|         stopReconnectTimer(); |
|         _shouldBeConnected = false; |
|         fault(buildConnectErrorMessage(), _connectMsg); |
|         return; |
|     } |
| |
|     if (Log.isDebug()) |
|         _log.debug("'{0}' {1} trying to reconnect.", id, _agentType); |
| |
|     // The timer starts with a 1 ms delay; switch it to the configured |
|     // interval. There is no timer when this method is called directly. |
|     if (_reconnectTimer != null) |
|         _reconnectTimer.delay = _reconnectInterval; |
| |
|     _currentAttempt++; |
|     internalSend(_connectMsg, false); |
| } |
| |
| /** |
|  * @private |
|  * Starts the timer that drives periodic reconnect attempts. |
|  * Nothing happens unless this agent should be connected, no reconnect timer |
|  * exists yet, and reconnecting is enabled (a non-zero number of attempts |
|  * and a positive interval). |
|  */ |
| protected function startReconnectTimer():void |
| { |
|     if (!_shouldBeConnected || (_reconnectTimer != null)) |
|         return; |
| |
|     // Reconnecting is switched off by zero attempts or a non-positive interval. |
|     if ((_reconnectAttempts == 0) || (_reconnectInterval <= 0)) |
|         return; |
| |
|     if (Log.isDebug()) |
|         _log.debug("'{0}' {1} starting reconnect timer.", id, _agentType); |
| |
|     // Initially, the timeout is set to 1 so we try to reconnect immediately |
|     // (perhaps to a different channel). After that, reconnect() switches the |
|     // timer to the configured interval. |
|     var timer:Timer = new Timer(1); |
|     timer.addEventListener(TimerEvent.TIMER, reconnect); |
|     _reconnectTimer = timer; |
|     _currentAttempt = 0; |
|     timer.start(); |
| } |
| |
| /** |
|  * @private |
|  * Stops and discards the reconnect timer. Does nothing when no reconnect |
|  * timer exists. |
|  */ |
| protected function stopReconnectTimer():void |
| { |
|     if (_reconnectTimer == null) |
|         return; |
| |
|     if (Log.isDebug()) |
|         _log.debug("'{0}' {1} stopping reconnect timer.", id, _agentType); |
| |
|     var timer:Timer = _reconnectTimer; |
|     timer.removeEventListener(TimerEvent.TIMER, reconnect); |
|     timer.reset(); |
|     // A null timer is how the rest of the class sees "not reconnecting". |
|     _reconnectTimer = null; |
| } |
| |
| //-------------------------------------------------------------------------- |
| // |
| // Private Methods |
| // |
| //-------------------------------------------------------------------------- |
| |
| /** |
|  * @private |
|  * Builds an ErrorMessage for a failed connect attempt, using the localized |
|  * strings from the "messaging" resource bundle. |
|  * The error is correlated to the current connect message when one exists; |
|  * when no connect message has been built yet the correlation id is left |
|  * at its default value. |
|  * |
|  * @return The ErrorMessage. |
|  */ |
| private function buildConnectErrorMessage():ErrorMessage |
| { |
|     var errMsg:ErrorMessage = new ErrorMessage(); |
|     errMsg.faultCode = "Client.Error.Connect"; |
|     errMsg.faultString = resourceManager.getString( |
|         "messaging", "producerConnectError"); |
|     errMsg.faultDetail = resourceManager.getString( |
|         "messaging", "failedToConnect"); |
|     // The connect message is created lazily, so guard against it being |
|     // absent rather than failing with a null reference error. |
|     if (_connectMsg != null) |
|         errMsg.correlationId = _connectMsg.messageId; |
|     return errMsg; |
| } |
| |
| /** |
|  * @private |
|  * Creates the CommandMessage used for connect attempts: a |
|  * TRIGGER_CONNECT_OPERATION command carrying this agent's current |
|  * <code>clientId</code> and <code>destination</code>. |
|  * |
|  * @return The 'connect' CommandMessage. |
|  */ |
| private function buildConnectMessage():CommandMessage |
| { |
|     var connectCommand:CommandMessage = new CommandMessage(); |
| |
|     connectCommand.operation = CommandMessage.TRIGGER_CONNECT_OPERATION; |
|     connectCommand.clientId = clientId; |
|     connectCommand.destination = destination; |
| |
|     return connectCommand; |
| } |
| |
| } |
| |
| } |