| /* |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| using System; |
| using System.Collections; |
| using System.Text.RegularExpressions; |
| using System.Threading; |
| using log4net; |
| using Apache.Qpid.Buffer; |
| using Apache.Qpid.Client.Message; |
| using Apache.Qpid.Client.Util; |
| using Apache.Qpid.Collections; |
| using Apache.Qpid.Framing; |
| using Apache.Qpid.Messaging; |
| using Apache.Qpid.Protocol; |
| |
| namespace Apache.Qpid.Client |
| { |
| /// <summary> |
| /// <p/><table id="crc"><caption>CRC Card</caption> |
| /// <tr><th> Responsibilities <th> Collaborations |
| /// <tr><td> Declare queues. |
| /// <tr><td> Declare exchanges. |
| /// <tr><td> Bind queues to exchanges. |
| /// <tr><td> Create messages. |
| /// <tr><td> Set up message consumers on the channel. |
| /// <tr><td> Set up message producers on the channel. |
| /// <tr><td> Commit the current transaction. |
| /// <tr><td> Roll-back the current transaction. |
| /// <tr><td> Close the channel. |
| /// </table> |
| /// </summary> |
| public class AmqChannel : Closeable, IChannel |
| { |
| private static readonly ILog _logger = LogManager.GetLogger(typeof(AmqChannel)); |
| |
| internal const int BASIC_CONTENT_TYPE = 60; |
| |
| public const int DEFAULT_PREFETCH_HIGH_MARK = 5000; |
| |
| public const int DEFAULT_PREFETCH_LOW_MARK = 2500; |
| |
| private static int _nextSessionNumber = 0; |
| |
| private AMQConnection _connection; |
| |
| private int _sessionNumber; |
| |
| private bool _suspended; |
| |
| private object _suspensionLock = new object(); |
| |
| // Used in the consume method. We generate the consume tag on the client so that we can use the nowait feature. |
| private int _nextConsumerNumber = 1; |
| |
| private bool _transacted; |
| |
| private AcknowledgeMode _acknowledgeMode; |
| |
| private ushort _channelId; |
| |
| private int _defaultPrefetchHighMark = DEFAULT_PREFETCH_HIGH_MARK; |
| |
| private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK; |
| |
| private FlowControlQueue _queue; |
| |
| private Dispatcher _dispatcher; |
| |
| private MessageFactoryRegistry _messageFactoryRegistry; |
| |
| /// <summary> Holds all of the producers created by this channel. </summary> |
| private Hashtable _producers = Hashtable.Synchronized(new Hashtable()); |
| |
| /// <summary> Holds all of the consumers created by this channel. </summary> |
| private Hashtable _consumers = Hashtable.Synchronized(new Hashtable()); |
| |
| private ArrayList _replayFrames = new ArrayList(); |
| |
| /// <summary> |
| /// The counter of the _next producer id. This id is generated by the session and used only to allow the |
| /// producer to identify itself to the session when deregistering itself. |
| /// |
| /// Access to this id does not require to be synchronized since according to the JMS specification only one |
| /// thread of control is allowed to create producers for any given session instance. |
| /// </summary> |
| private long _nextProducerId; |
| |
| /// <summary> |
| /// Initializes a new instance of the <see cref="AmqChannel"/> class. |
| /// </summary> |
| /// <param name="con">The connection.</param> |
| /// <param name="channelId">The channel id.</param> |
| /// <param name="transacted">if set to <c>true</c> [transacted].</param> |
| /// <param name="acknowledgeMode">The acknowledge mode.</param> |
| /// <param name="defaultPrefetchHigh">Default prefetch high value</param> |
| /// <param name="defaultPrefetchLow">Default prefetch low value</param> |
| internal AmqChannel(AMQConnection con, ushort channelId, bool transacted, AcknowledgeMode acknowledgeMode, |
| int defaultPrefetchHigh, int defaultPrefetchLow) |
| : this() |
| { |
| _sessionNumber = Interlocked.Increment(ref _nextSessionNumber); |
| _connection = con; |
| _transacted = transacted; |
| |
| if ( transacted ) |
| { |
| _acknowledgeMode = AcknowledgeMode.SessionTransacted; |
| } |
| else |
| { |
| _acknowledgeMode = acknowledgeMode; |
| } |
| |
| _channelId = channelId; |
| _defaultPrefetchHighMark = defaultPrefetchHigh; |
| _defaultPrefetchLowMark = defaultPrefetchLow; |
| |
| if ( _acknowledgeMode == AcknowledgeMode.NoAcknowledge ) |
| { |
| _queue = new FlowControlQueue(_defaultPrefetchLowMark, _defaultPrefetchHighMark, |
| new ThresholdMethod(OnPrefetchLowMark), |
| new ThresholdMethod(OnPrefetchHighMark)); |
| } |
| else |
| { |
| // low and upper are the same |
| _queue = new FlowControlQueue(_defaultPrefetchHighMark, _defaultPrefetchHighMark, |
| null, null); |
| } |
| } |
| |
| private AmqChannel() |
| { |
| _messageFactoryRegistry = MessageFactoryRegistry.NewDefaultRegistry(); |
| } |
| |
| /// <summary> |
| /// Acknowledge mode for messages received. |
| /// </summary> |
| public AcknowledgeMode AcknowledgeMode |
| { |
| get |
| { |
| CheckNotClosed(); |
| return _acknowledgeMode; |
| } |
| } |
| |
| /// <summary> |
| /// True if the channel should use transactions. |
| /// </summary> |
| public bool Transacted |
| { |
| get |
| { |
| CheckNotClosed(); |
| return _transacted; |
| } |
| } |
| |
| /// <summary> |
| /// Prefetch value to be used as the default for |
| /// consumers created on this channel. |
| /// </summary> |
| public int DefaultPrefetch |
| { |
| get { return DefaultPrefetchHigh; } |
| } |
| |
| /// <summary> |
| /// Prefetch low value to be used as the default for |
| /// consumers created on this channel. |
| /// </summary> |
| public int DefaultPrefetchLow |
| { |
| get { return _defaultPrefetchLowMark; } |
| } |
| |
| /// <summary> |
| /// Prefetch high value to be used as the default for |
| /// consumers created on this channel. |
| /// </summary> |
| public int DefaultPrefetchHigh |
| { |
| get { return _defaultPrefetchHighMark; } |
| } |
| |
| /// <summary> Indicates whether or not this channel is currently suspended. </summary> |
| public bool IsSuspended |
| { |
| get { return _suspended; } |
| } |
| |
| /// <summary> Provides the channels number within the the connection. </summary> |
| public ushort ChannelId |
| { |
| get { return _channelId; } |
| } |
| |
| /// <summary> Provides the connection that this channel runs over. </summary> |
| public AMQConnection Connection |
| { |
| get { return _connection; } |
| } |
| |
| /// <summary> |
| /// Declare a new exchange. |
| /// </summary> |
| /// <param name="exchangeName">Name of the exchange</param> |
| /// <param name="exchangeClass">Class of the exchange, from <see cref="ExchangeClassConstants"/></param> |
| public void DeclareExchange(String exchangeName, String exchangeClass) |
| { |
| _logger.Debug(string.Format("DeclareExchange vame={0} exchangeClass={1}", exchangeName, exchangeClass)); |
| |
| DeclareExchange(_channelId, 0, exchangeName, exchangeClass, false, false, false, false, true, null); |
| } |
| |
| /// <summary> |
| /// Declare a new exchange using the default exchange class. |
| /// </summary> |
| /// <param name="exchangeName">Name of the exchange</param> |
| public void DeleteExchange(string exchangeName) |
| { |
| throw new NotImplementedException(); |
| } |
| |
| /// <summary> |
| /// Declare a new queue with the specified set of arguments. |
| /// </summary> |
| /// <param name="queueName">Name of the queue</param> |
| /// <param name="isDurable">True if the queue should be durable</param> |
| /// <param name="isExclusive">True if the queue should be exclusive to this channel</param> |
| /// <param name="isAutoDelete">True if the queue should be deleted when the channel closes</param> |
| public void DeclareQueue(string queueName, bool isDurable, bool isExclusive, bool isAutoDelete) |
| { |
| DoQueueDeclare(queueName, isDurable, isExclusive, isAutoDelete); |
| } |
| |
| /// <summary> |
| /// Delete a queue with the specifies arguments. |
| /// </summary> |
| /// <param name="queueName">Name of the queue to delete</param> |
| /// <param name="ifUnused">If true, the queue will not deleted if it has no consumers</param> |
| /// <param name="ifEmpty">If true, the queue will not deleted if it has no messages</param> |
| /// <param name="noWait">If true, the server will not respond to the method</param> |
| public void DeleteQueue(string queueName, bool ifUnused, bool ifEmpty, bool noWait) |
| { |
| DoDeleteQueue(queueName, ifUnused, ifEmpty, noWait); |
| } |
| |
| /// <summary> |
| /// Generate a new Unique name to use for a queue. |
| /// </summary> |
| /// <returns>A unique name to this channel</returns> |
| public string GenerateUniqueName() |
| { |
| string result = _connection.ProtocolSession.GenerateQueueName(); |
| return Regex.Replace(result, "[^a-z0-9_]", "_"); |
| } |
| |
| /// <summary> |
| /// Removes all messages from a queue. |
| /// </summary> |
| /// <param name="queueName">Name of the queue to delete</param> |
| /// <param name="noWait">If true, the server will not respond to the method</param> |
| public void PurgeQueue(string queueName, bool noWait) |
| { |
| DoPurgeQueue(queueName, noWait); |
| } |
| |
| /// <summary> |
| /// Bind a queue to the specified exchange. |
| /// </summary> |
| /// <param name="queueName">Name of queue to bind</param> |
| /// <param name="exchangeName">Name of exchange to bind to</param> |
| /// <param name="routingKey">Routing key</param> |
| public void Bind(string queueName, string exchangeName, string routingKey) |
| { |
| DoBind(queueName, exchangeName, routingKey, new FieldTable()); |
| } |
| |
| /// <summary> |
| /// Bind a queue to the specified exchange. |
| /// </summary> |
| /// <param name="queueName">Name of queue to bind</param> |
| /// <param name="exchangeName">Name of exchange to bind to</param> |
| /// <param name="routingKey">Routing key</param> |
| /// <param name="args">Table of arguments for the binding. Used to bind with a Headers Exchange</param> |
| public void Bind(string queueName, string exchangeName, string routingKey, IFieldTable args) |
| { |
| DoBind(queueName, exchangeName, routingKey, (FieldTable)args); |
| } |
| |
| /// <summary> |
| /// Create a new empty message with no body. |
| /// </summary> |
| /// <returns>The new message</returns> |
| public IMessage CreateMessage() |
| { |
| return (IBytesMessage)_messageFactoryRegistry.CreateMessage("application/octet-stream"); |
| } |
| |
| /// <summary> |
| /// Create a new message of the specified MIME type. |
| /// </summary> |
| /// <param name="mimeType">The mime type to create</param> |
| /// <returns>The new message</returns> |
| public IMessage CreateMessage(string mimeType) |
| { |
| return _messageFactoryRegistry.CreateMessage(mimeType); |
| } |
| |
| /// <summary> |
| /// Creates a new message for bytes (application/octet-stream). |
| /// </summary> |
| /// <returns>The new message</returns> |
| public IBytesMessage CreateBytesMessage() |
| { |
| return (IBytesMessage)_messageFactoryRegistry.CreateMessage("application/octet-stream"); |
| } |
| |
| /// <summary> |
| /// Creates a new text message (text/plain) with empty content. |
| /// </summary> |
| /// <returns>The new message</returns> |
| public ITextMessage CreateTextMessage() |
| { |
| return CreateTextMessage(String.Empty); |
| } |
| |
| /// <summary> |
| /// Creates a new text message (text/plain) with a body. |
| /// </summary> |
| /// <param name="text">Initial body of the message</param> |
| /// <returns>The new message</returns> |
| public ITextMessage CreateTextMessage(string text) |
| { |
| ITextMessage msg = (ITextMessage)_messageFactoryRegistry.CreateMessage("text/plain"); |
| msg.Text = text; |
| return msg; |
| } |
| |
| /// <summary> |
| /// Creates a new Consumer using the builder pattern. |
| /// </summary> |
| /// <param name="queueName">Name of queue to receive messages from</param> |
| /// <returns>The builder object</returns> |
| public MessageConsumerBuilder CreateConsumerBuilder(string queueName) |
| { |
| return new MessageConsumerBuilder(this, queueName); |
| } |
| |
| /// <summary> |
| /// Creates a new consumer. |
| /// </summary> |
| /// <param name="queueName">Name of queue to receive messages from</param> |
| /// <param name="prefetchLow">Low prefetch value</param> |
| /// <param name="prefetchHigh">High prefetch value</param> |
| /// <param name="noLocal">If true, messages sent on this channel will not be received by this consumer</param> |
| /// <param name="exclusive">If true, the consumer opens the queue in exclusive mode</param> |
| /// <returns>The new consumer</returns> |
| public IMessageConsumer CreateConsumer(string queueName, |
| int prefetchLow, |
| int prefetchHigh, |
| bool noLocal, |
| bool exclusive) |
| { |
| _logger.Debug(String.Format("CreateConsumer queueName={0} prefetchLow={1} prefetchHigh={2} noLocal={3} exclusive={4} ", |
| queueName, prefetchLow, prefetchHigh, noLocal, exclusive)); |
| |
| return CreateConsumerImpl(queueName, prefetchLow, prefetchHigh, noLocal, exclusive); |
| } |
| |
| /// <summary> |
| /// Unsubscribe from a queue. |
| /// </summary> |
| /// <param name="subscriptionName">Subscription name</param> |
| public void Unsubscribe(String name) |
| { |
| throw new NotImplementedException(); |
| } |
| |
| /// <summary> |
| /// Create a new message publisher using the builder pattern. |
| /// </summary> |
| /// <returns>The builder object</returns> |
| public MessagePublisherBuilder CreatePublisherBuilder() |
| { |
| return new MessagePublisherBuilder(this); |
| } |
| |
| /// <summary> |
| /// Create a new message publisher. |
| /// </summary> |
| /// <param name="exchangeName">Name of exchange to publish to</param> |
| /// <param name="routingKey">Routing key</param> |
| /// <param name="deliveryMode">Default delivery mode</param> |
| /// <param name="timeToLive">Default TTL time of messages</param> |
| /// <param name="immediate">If true, sent immediately</param> |
| /// <param name="mandatory">If true, the broker will return an error |
| /// (as a connection exception) if the message cannot be delivered</param> |
| /// <param name="priority">Default message priority</param> |
| /// <returns>The new message publisher</returns> |
| public IMessagePublisher CreatePublisher(string exchangeName, string routingKey, DeliveryMode deliveryMode, |
| long timeToLive, bool immediate, bool mandatory, int priority) |
| { |
| _logger.Debug(string.Format("Using new CreatePublisher exchangeName={0}, exchangeClass={1} routingKey={2}", |
| exchangeName, "none", routingKey)); |
| |
| return CreateProducerImpl(exchangeName, routingKey, deliveryMode, |
| timeToLive, immediate, mandatory, priority); |
| } |
| |
| /// <summary> |
| /// Recover after transaction failure. |
| /// </summary> |
| /// <remarks>The 0-8 protocol does not support this, not implemented exception will always be thrown.</remarks> |
| public void Recover() |
| { |
| CheckNotClosed(); |
| CheckNotTransacted(); |
| |
| throw new NotImplementedException(); |
| } |
| |
| /// <summary> |
| /// Commit the transaction. |
| /// </summary> |
| public void Commit() |
| { |
| // FIXME: Fail over safety. Needs FailoverSupport? |
| CheckNotClosed(); |
| CheckTransacted(); // throws IllegalOperationException if not a transacted session |
| |
| try |
| { |
| // Acknowledge up to message last delivered (if any) for each consumer. |
| // Need to send ack for messages delivered to consumers so far. |
| foreach (BasicMessageConsumer consumer in _consumers.Values) |
| { |
| // Sends acknowledgement to server. |
| consumer.AcknowledgeDelivered(); |
| } |
| |
| // Commits outstanding messages sent and outstanding acknowledgements. |
| _connection.ConvenientProtocolWriter.SyncWrite(TxCommitBody.CreateAMQFrame(_channelId), typeof(TxCommitOkBody)); |
| } |
| catch (AMQException e) |
| { |
| throw new QpidException("Failed to commit", e); |
| } |
| } |
| |
| /// <summary> |
| /// Rollback the transaction. |
| /// </summary> |
| public void Rollback() |
| { |
| lock (_suspensionLock) |
| { |
| CheckTransacted(); // throws IllegalOperationException if not a transacted session |
| |
| try |
| { |
| bool suspended = IsSuspended; |
| if (!suspended) |
| { |
| Suspend(true); |
| } |
| |
| // Reject up to message last delivered (if any) for each consumer. |
| // Need to send reject for messages delivered to consumers so far. |
| foreach (BasicMessageConsumer consumer in _consumers.Values) |
| { |
| // Sends acknowledgement to server. |
| consumer.RejectUnacked(); |
| } |
| |
| _connection.ConvenientProtocolWriter.SyncWrite(TxRollbackBody.CreateAMQFrame(_channelId), typeof(TxRollbackOkBody)); |
| |
| if ( !suspended ) |
| { |
| Suspend(false); |
| } |
| } |
| catch (AMQException e) |
| { |
| throw new QpidException("Failed to rollback", e); |
| } |
| } |
| } |
| |
| /// <summary> |
| /// Create a disconnected channel that will fault |
| /// for most things, but is useful for testing |
| /// </summary> |
| /// <returns>A new disconnected channel</returns> |
| public static IChannel CreateDisconnectedChannel() |
| { |
| return new AmqChannel(); |
| } |
| |
| public override void Close() |
| { |
| lock (_connection.FailoverMutex) |
| { |
| // We must close down all producers and consumers in an orderly fashion. This is the only method |
| // that can be called from a different thread of control from the one controlling the session |
| |
| lock (_closingLock) |
| { |
| SetClosed(); |
| |
| // we pass null since this is not an error case |
| CloseProducersAndConsumers(null); |
| |
| try |
| { |
| _connection.CloseSession(this); |
| } |
| catch (AMQException e) |
| { |
| throw new QpidException("Error closing session: " + e); |
| } |
| finally |
| { |
| _connection.DeregisterSession(_channelId); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Called when the server initiates the closure of the session |
| * unilaterally. |
| * @param e the exception that caused this session to be closed. Null causes the |
| */ |
| public void ClosedWithException(Exception e) |
| { |
| lock (_connection.FailoverMutex) |
| { |
| // An AMQException has an error code and message already and will be passed in when closure occurs as a |
| // result of a channel close request |
| SetClosed(); |
| AMQException amqe; |
| |
| if (e is AMQException) |
| { |
| amqe = (AMQException) e; |
| } |
| else |
| { |
| amqe = new AMQException("Closing session forcibly", e); |
| } |
| |
| _connection.DeregisterSession(_channelId); |
| CloseProducersAndConsumers(amqe); |
| } |
| } |
| |
| public void MessageReceived(UnprocessedMessage message) |
| { |
| if (_logger.IsDebugEnabled) |
| { |
| _logger.Debug("Message received in session with channel id " + _channelId); |
| } |
| |
| if ( message.DeliverBody == null ) |
| { |
| ReturnBouncedMessage(message); |
| } |
| else |
| { |
| _queue.Enqueue(message); |
| } |
| } |
| |
| public void Dispose() |
| { |
| Close(); |
| } |
| |
| private void SetClosed() |
| { |
| Interlocked.Exchange(ref _closed, CLOSED); |
| } |
| |
| /// <summary> |
| /// Close all producers or consumers. This is called either in the error case or when closing the session normally. |
| /// <param name="amqe">the exception, may be null to indicate no error has occurred</param> |
| /// |
| private void CloseProducersAndConsumers(AMQException amqe) |
| { |
| try |
| { |
| CloseProducers(); |
| } |
| catch (QpidException e) |
| { |
| _logger.Error("Error closing session: " + e, e); |
| } |
| try |
| { |
| CloseConsumers(amqe); |
| } |
| catch (QpidException e) |
| { |
| _logger.Error("Error closing session: " + e, e); |
| } |
| } |
| |
| /// <summary> |
| /// Called to close message producers cleanly. This may or may <b>not</b> be as a result of an error. There is |
| /// currently no way of propagating errors to message producers (this is a JMS limitation). |
| /// </summary> |
| private void CloseProducers() |
| { |
| _logger.Debug("Closing producers on session " + this); |
| |
| // we need to clone the list of producers since the close() method updates the _producers collection |
| // which would result in a concurrent modification exception |
| ArrayList clonedProducers = new ArrayList(_producers.Values); |
| |
| foreach (BasicMessageProducer prod in clonedProducers) |
| { |
| _logger.Debug("Closing producer " + prod); |
| prod.Close(); |
| } |
| |
| // at this point the _producers map is empty |
| } |
| |
| /// <summary> |
| /// Called to close message consumers cleanly. This may or may <b>not</b> be as a result of an error. |
| /// <param name="error">not null if this is a result of an error occurring at the connection level</param> |
| private void CloseConsumers(Exception error) |
| { |
| if (_dispatcher != null) |
| { |
| _dispatcher.StopDispatcher(); |
| } |
| |
| // we need to clone the list of consumers since the close() method updates the _consumers collection |
| // which would result in a concurrent modification exception |
| ArrayList clonedConsumers = new ArrayList(_consumers.Values); |
| |
| foreach (BasicMessageConsumer con in clonedConsumers) |
| { |
| if (error != null) |
| { |
| con.NotifyError(error); |
| } |
| else |
| { |
| con.Close(); |
| } |
| } |
| |
| // at this point the _consumers map will be empty |
| } |
| |
| private IMessagePublisher CreateProducerImpl(string exchangeName, string routingKey, |
| DeliveryMode deliveryMode, |
| long timeToLive, bool immediate, bool mandatory, int priority) |
| { |
| lock (_closingLock) |
| { |
| CheckNotClosed(); |
| |
| try |
| { |
| return new BasicMessageProducer(exchangeName, routingKey, _transacted, _channelId, |
| this, GetNextProducerId(), |
| deliveryMode, timeToLive, immediate, mandatory, priority); |
| } |
| catch (AMQException e) |
| { |
| _logger.Error("Error creating message producer: " + e, e); |
| throw new QpidException("Error creating message producer", e); |
| } |
| } |
| } |
| |
| /// <summary> Creates a message consumer on this channel.</summary> |
| /// |
| /// <param name="queueName">The name of the queue to attach the consumer to.</param> |
| /// <param name="prefetchLow">The pre-fetch buffer low-water mark.</param> |
| /// <param name="prefetchHigh">The pre-fetch buffer high-water mark.</param> |
| /// <param name="noLocal">The no-local flag, <tt>true</tt> means that the consumer does not receive messages sent on this channel.</param> |
| /// <param name="exclusive">The exclusive flag, <tt>true</tt> gives this consumer exclusive receive access to the queue.</param> |
| /// |
| /// <return>The message consumer.</return> |
| private IMessageConsumer CreateConsumerImpl(string queueName, |
| int prefetchLow, |
| int prefetchHigh, |
| bool noLocal, |
| bool exclusive) |
| { |
| lock (_closingLock) |
| { |
| CheckNotClosed(); |
| |
| BasicMessageConsumer consumer = new BasicMessageConsumer(_channelId, queueName, noLocal, |
| _messageFactoryRegistry, this, |
| prefetchHigh, prefetchLow, exclusive); |
| try |
| { |
| RegisterConsumer(consumer); |
| } |
| catch (AMQException e) |
| { |
| throw new QpidException("Error registering consumer: " + e, e); |
| } |
| |
| return consumer; |
| } |
| } |
| |
| private void CheckTransacted() |
| { |
| if (!Transacted) |
| { |
| throw new InvalidOperationException("Channel is not transacted"); |
| } |
| } |
| |
| private void CheckNotTransacted() |
| { |
| if (Transacted) |
| { |
| throw new InvalidOperationException("Channel is transacted"); |
| } |
| } |
| |
| internal void Start() |
| { |
| _dispatcher = new Dispatcher(this); |
| Thread dispatcherThread = new Thread(new ThreadStart(_dispatcher.RunDispatcher)); |
| dispatcherThread.IsBackground = true; |
| dispatcherThread.Start(); |
| } |
| |
| internal void Stop() |
| { |
| Suspend(true); |
| if (_dispatcher != null) |
| { |
| _dispatcher.StopDispatcher(); |
| } |
| } |
| |
| internal void RegisterConsumer(string consumerTag, IMessageConsumer consumer) |
| { |
| _consumers[consumerTag] = consumer; |
| } |
| |
| /// <summary> |
| /// Called by the MessageConsumer when closing, to deregister the consumer from the |
| /// map from consumerTag to consumer instance. |
| /// </summary> |
| /// <param name="consumerTag">the consumer tag, that was broker-generated</param> |
| internal void DeregisterConsumer(string consumerTag) |
| { |
| _consumers.Remove(consumerTag); |
| } |
| |
| internal void RegisterProducer(long producerId, IMessagePublisher publisher) |
| { |
| _producers[producerId] = publisher; |
| } |
| |
| internal void DeregisterProducer(long producerId) |
| { |
| _producers.Remove(producerId); |
| } |
| |
| private long GetNextProducerId() |
| { |
| return ++_nextProducerId; |
| } |
| |
| /** |
| * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after |
| * failover when the client has veoted resubscription. |
| * |
| * The caller of this method must already hold the failover mutex. |
| */ |
| internal void MarkClosed() |
| { |
| SetClosed(); |
| _connection.DeregisterSession(_channelId); |
| MarkClosedProducersAndConsumers(); |
| } |
| |
| private void MarkClosedProducersAndConsumers() |
| { |
| try |
| { |
| // no need for a markClosed* method in this case since there is no protocol traffic closing a producer |
| CloseProducers(); |
| } |
| catch (QpidException e) |
| { |
| _logger.Error("Error closing session: " + e, e); |
| } |
| try |
| { |
| MarkClosedConsumers(); |
| } |
| catch (QpidException e) |
| { |
| _logger.Error("Error closing session: " + e, e); |
| } |
| } |
| |
| private void MarkClosedConsumers() |
| { |
| if (_dispatcher != null) |
| { |
| _dispatcher.StopDispatcher(); |
| } |
| // we need to clone the list of consumers since the close() method updates the _consumers collection |
| // which would result in a concurrent modification exception |
| ArrayList clonedConsumers = new ArrayList(_consumers.Values); |
| |
| foreach (BasicMessageConsumer consumer in clonedConsumers) |
| { |
| consumer.MarkClosed(); |
| } |
| // at this point the _consumers map will be empty |
| } |
| |
| private void DoPurgeQueue(string queueName, bool noWait) |
| { |
| try |
| { |
| _logger.DebugFormat("PurgeQueue {0}", queueName); |
| |
| AMQFrame purgeQueue = QueuePurgeBody.CreateAMQFrame(_channelId, 0, queueName, noWait); |
| |
| if (noWait) |
| _connection.ProtocolWriter.Write(purgeQueue); |
| else |
| _connection.ConvenientProtocolWriter.SyncWrite(purgeQueue, typeof(QueuePurgeOkBody)); |
| } |
| catch (AMQException) |
| { |
| throw; |
| } |
| } |
| |
| /** |
| * Replays frame on fail over. |
| * |
| * @throws AMQException |
| */ |
| internal void ReplayOnFailOver() |
| { |
| _logger.Debug(string.Format("Replaying frames for channel {0}", _channelId)); |
| foreach (AMQFrame frame in _replayFrames) |
| { |
| _logger.Debug(string.Format("Replaying frame=[{0}]", frame)); |
| _connection.ProtocolWriter.Write(frame); |
| } |
| } |
| |
| /// <summary> |
| /// Callers must hold the failover mutex before calling this method. |
| /// </summary> |
| /// <param name="consumer"></param> |
| private void RegisterConsumer(BasicMessageConsumer consumer) |
| { |
| // Need to generate a consumer tag on the client so we can exploit the nowait flag. |
| String tag = string.Format("{0}-{1}", _sessionNumber, _nextConsumerNumber++); |
| consumer.ConsumerTag = tag; |
| _consumers.Add(tag, consumer); |
| |
| String consumerTag = ConsumeFromQueue(consumer.QueueName, consumer.NoLocal, |
| consumer.Exclusive, consumer.AcknowledgeMode, tag); |
| |
| } |
| |
| internal void DoBind(string queueName, string exchangeName, string routingKey, FieldTable args) |
| { |
| |
| _logger.Debug(string.Format("QueueBind queueName={0} exchangeName={1} routingKey={2}, arg={3}", |
| queueName, exchangeName, routingKey, args)); |
| |
| AMQFrame queueBind = QueueBindBody.CreateAMQFrame(_channelId, 0, |
| queueName, exchangeName, |
| routingKey, false, args); |
| |
| |
| lock (_connection.FailoverMutex) |
| { |
| _connection.ConvenientProtocolWriter.SyncWrite(queueBind, typeof(QueueBindOkBody)); |
| } |
| // AS FIXME: wasnae me |
| _replayFrames.Add(QueueBindBody.CreateAMQFrame(_channelId, 0, |
| queueName, exchangeName, |
| routingKey, true, args)); |
| } |
| |
| private String ConsumeFromQueue(String queueName, bool noLocal, bool exclusive, AcknowledgeMode acknowledgeMode, String tag) |
| { |
| |
| AMQFrame basicConsume = BasicConsumeBody.CreateAMQFrame(_channelId, 0, |
| queueName, tag, noLocal, |
| acknowledgeMode == AcknowledgeMode.NoAcknowledge, |
| exclusive, true, new FieldTable()); |
| |
| _replayFrames.Add(basicConsume); |
| |
| _connection.ProtocolWriter.Write(basicConsume); |
| return tag; |
| } |
| |
| private void DoDeleteQueue(string queueName, bool ifUnused, bool ifEmpty, bool noWait) |
| { |
| try |
| { |
| _logger.Debug(string.Format("DeleteQueue name={0}", queueName)); |
| |
| AMQFrame queueDelete = QueueDeleteBody.CreateAMQFrame(_channelId, 0, queueName, ifUnused, ifEmpty, noWait); |
| |
| if (noWait) |
| { |
| _connection.ProtocolWriter.Write(queueDelete); |
| } |
| else |
| { |
| _connection.ConvenientProtocolWriter.SyncWrite(queueDelete, typeof(QueueDeleteOkBody)); |
| } |
| // AS FIXME: wasnae me |
| _replayFrames.Add(QueueDeleteBody.CreateAMQFrame(_channelId, 0, queueName, ifUnused, ifEmpty, true)); |
| } |
| catch (AMQException) |
| { |
| throw; |
| } |
| } |
| |
| private void DoQueueDeclare(string queueName, bool isDurable, bool isExclusive, bool isAutoDelete) |
| { |
| _logger.Debug(string.Format("DeclareQueue name={0} durable={1} exclusive={2}, auto-delete={3}", |
| queueName, isDurable, isExclusive, isAutoDelete)); |
| |
| AMQFrame queueDeclare = QueueDeclareBody.CreateAMQFrame(_channelId, 0, queueName, false, isDurable, isExclusive, |
| isAutoDelete, false, null); |
| |
| |
| lock (_connection.FailoverMutex) |
| { |
| _connection.ConvenientProtocolWriter.SyncWrite(queueDeclare, typeof(QueueDeclareOkBody)); |
| } |
| // AS FIXME: wasnae me |
| _replayFrames.Add(QueueDeclareBody.CreateAMQFrame(_channelId, 0, queueName, false, isDurable, isExclusive, |
| isAutoDelete, true, null)); |
| } |
| |
| // AMQP-level method. |
| private void DeclareExchange(ushort channelId, ushort ticket, string exchangeName, |
| string exchangeClass, bool passive, bool durable, |
| bool autoDelete, bool xinternal, bool noWait, FieldTable args) |
| { |
| _logger.Debug(String.Format("DeclareExchange channelId={0} exchangeName={1} exchangeClass={2}", |
| _channelId, exchangeName, exchangeClass)); |
| |
| AMQFrame declareExchange = ExchangeDeclareBody.CreateAMQFrame(channelId, ticket, exchangeName, exchangeClass, passive, |
| durable, autoDelete, xinternal, noWait, args); |
| |
| if (noWait) |
| { |
| lock (_connection.FailoverMutex) |
| { |
| _connection.ProtocolWriter.Write(declareExchange); |
| } |
| // AS FIXME: wasnae me |
| _replayFrames.Add(declareExchange); |
| } |
| else |
| { |
| throw new NotImplementedException("Don't use nowait=false with DeclareExchange"); |
| //_connection.ConvenientProtocolWriter.SyncWrite(declareExchange, typeof (ExchangeDeclareOkBody)); |
| } |
| } |
| |
| /** |
| * Acknowledge a message or several messages. This method can be called via AbstractJMSMessage or from |
| * a BasicConsumer. The former where the mode is CLIENT_ACK and the latter where the mode is |
| * AUTO_ACK or similar. |
| * |
| * @param deliveryTag the tag of the last message to be acknowledged |
| * @param multiple if true will acknowledge all messages up to and including the one specified by the |
| * delivery tag |
| */ |
| internal void AcknowledgeMessage(ulong deliveryTag, bool multiple) |
| { |
| AMQFrame ackFrame = BasicAckBody.CreateAMQFrame(_channelId, deliveryTag, multiple); |
| if (_logger.IsDebugEnabled) |
| { |
| _logger.Debug("Sending ack for delivery tag " + deliveryTag + " on channel " + _channelId); |
| } |
| // FIXME: lock FailoverMutex here? |
| _connection.ProtocolWriter.Write(ackFrame); |
| } |
| |
| public void RejectMessage(ulong deliveryTag, bool requeue) |
| { |
| if ((_acknowledgeMode == AcknowledgeMode.ClientAcknowledge) || (_acknowledgeMode == AcknowledgeMode.SessionTransacted)) |
| { |
| AMQFrame rejectFrame = BasicRejectBody.CreateAMQFrame(_channelId, deliveryTag, requeue); |
| _connection.ProtocolWriter.Write(rejectFrame); |
| } |
| } |
| |
| /// <summary> |
| /// Handle a message that bounced from the server, creating |
| /// the corresponding exception and notifying the connection about it |
| /// </summary> |
| /// <param name="message">Unprocessed message</param> |
| private void ReturnBouncedMessage(UnprocessedMessage message) |
| { |
| try |
| { |
| AbstractQmsMessage bouncedMessage = |
| _messageFactoryRegistry.CreateMessage(0, false, message.ContentHeader, message.Bodies); |
| |
| int errorCode = message.BounceBody.ReplyCode; |
| string reason = message.BounceBody.ReplyText; |
| _logger.Debug("Message returned with error code " + errorCode + " (" + reason + ")"); |
| AMQException exception; |
| |
| if (errorCode == AMQConstant.NO_CONSUMERS.Code) |
| { |
| exception = new AMQNoConsumersException(reason, bouncedMessage); |
| } |
| else if (errorCode == AMQConstant.NO_ROUTE.Code) |
| { |
| exception = new AMQNoRouteException(reason, bouncedMessage); |
| } |
| else |
| { |
| exception = new AMQUndeliveredException(errorCode, reason, bouncedMessage); |
| } |
| |
| _connection.ExceptionReceived(exception); |
| } |
| catch (Exception ex) |
| { |
| _logger.Error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", ex); |
| } |
| } |
| |
| private void OnPrefetchLowMark(int count) |
| { |
| if (_acknowledgeMode == AcknowledgeMode.NoAcknowledge) |
| { |
| _logger.Warn("Below threshold(" + _defaultPrefetchLowMark + ") so unsuspending channel. Current value is " + count); |
| Suspend(false); |
| } |
| } |
| |
| private void OnPrefetchHighMark(int count) |
| { |
| if (_acknowledgeMode == AcknowledgeMode.NoAcknowledge) |
| { |
| _logger.Warn("Above threshold(" + _defaultPrefetchHighMark + ") so suspending channel. Current value is " + count); |
| Suspend(true); |
| } |
| } |
| |
| private void Suspend(bool suspend) |
| { |
| lock (_suspensionLock) |
| { |
| if (_logger.IsDebugEnabled) |
| { |
| _logger.Debug("Setting channel flow : " + (suspend ? "suspended" : "unsuspended")); |
| } |
| |
| _suspended = suspend; |
| AMQFrame frame = ChannelFlowBody.CreateAMQFrame(_channelId, !suspend); |
| |
| Connection.ConvenientProtocolWriter.SyncWrite(frame, typeof(ChannelFlowOkBody)); |
| } |
| } |
| |
| /// <summary>A Dispatcher turns the consumption of incoming messages from an arrival queue, into event notifications on consumers. |
| /// The arrival queue is typically a blocking queue, on which a dispatcher waits for messages to consume. Upon receipt of a message |
| /// the dispatcher finds the consumer that is listening to the queue to which the message has been send and notifies it of the new |
| /// message. |
| /// |
| /// <p/>The Dispatcher also contains logic to recognize bounced messages. Bounced messages returned from the broker can be |
| /// told apart from regular deliveries because they do not have a delivery queue set on them. When the dispatcher receives a |
| /// bounced message it creates an exception and notifies the connection, to which its containing channel belongs, of the condition. |
| /// |
| /// <p/><table id="crc"><caption>CRC Card</caption> |
| /// <tr><th> Responsibilities <th> Collaborations |
| /// <tr><td> Notify consumers of message arrivals on their queues. <td> <see cref="BasicMessageConsumer"/> |
| /// <tr><td> Notify the containing connection of bounced message arrivals. <td> <see cref="AMQConnection"/> |
| /// </table> |
| /// </summary> |
| /// |
| /// <remarks>Stop mechanism seems wrong, as queue consume is evaluated after stop flag, so could consume and notify one more message. |
| /// Placing stop check after consume may also be wrong as it may cause a message to be thrown away. Seems more correct to use interupt on |
| /// the block thread to cause it to prematurely return from its wait, whereupon it can be made to re-check the stop flag.</remarks> |
| /// |
| /// <remarks>Exception swallowed, if there is an exception whilst notifying the connection on bounced messages. Unhandled excetpion should |
| /// fall through and terminate the loop, as it is a bug if it occurrs.</remarks> |
| private class Dispatcher |
| { |
| /// <summary> Flag used to indicate when this dispatcher is to be stopped (0=go, 1=stop). </summary> |
| private int _stopped = 0; |
| |
| /// <summary> The channel for which this is a dispatcher. </summary> |
| private AmqChannel _containingChannel; |
| |
| /// <summary> Creates a dispatcher on the specified channel. </summary> |
| /// |
| /// <param name="containingChannel"> The channel on which this is a dispatcher. </param> |
| public Dispatcher(AmqChannel containingChannel) |
| { |
| _containingChannel = containingChannel; |
| } |
| |
| /// <summary>The message dispatch loop. Consumes messages from the channels queue, notifying consumers of regular deliveries, and |
| /// the connection of bounced messages.</summary> |
| public void RunDispatcher() |
| { |
| UnprocessedMessage message; |
| |
| while (_stopped == 0 && (message = (UnprocessedMessage)_containingChannel._queue.Dequeue()) != null) |
| { |
| if (message.DeliverBody != null) |
| { |
| BasicMessageConsumer consumer = (BasicMessageConsumer) _containingChannel._consumers[message.DeliverBody.ConsumerTag]; |
| |
| if (consumer == null) |
| { |
| _logger.Warn("Received a message from queue " + message.DeliverBody.ConsumerTag + " without a f - ignoring..."); |
| } |
| else |
| { |
| consumer.NotifyMessage(message, _containingChannel.ChannelId); |
| } |
| } |
| else |
| { |
| try |
| { |
| // Bounced message is processed here, away from the transport thread |
| AbstractQmsMessage bouncedMessage = _containingChannel._messageFactoryRegistry. |
| CreateMessage(0, false, message.ContentHeader, message.Bodies); |
| |
| int errorCode = message.BounceBody.ReplyCode; |
| string reason = message.BounceBody.ReplyText; |
| |
| _logger.Debug("Message returned with error code " + errorCode + " (" + reason + ")"); |
| |
| _containingChannel._connection.ExceptionReceived(new AMQUndeliveredException(errorCode, "Error: " + reason, bouncedMessage)); |
| } |
| catch (Exception e) |
| { |
| _logger.Error("Caught exception trying to raise undelivered message exception (dump follows) - ignoring...", e); |
| } |
| } |
| } |
| |
| _logger.Debug("Dispatcher thread terminating for channel: " + _containingChannel._channelId + "."); |
| } |
| |
| /// <summary> Sets a stop flag on this dispatcher, which causes its dispatch loop to exit at the next available opportunity. </summary> |
| public void StopDispatcher() |
| { |
| Interlocked.Exchange(ref _stopped, 1); |
| } |
| } |
| } |
| } |