/*
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */
| using System; |
| using System.Threading; |
| using log4net; |
| using Qpid.Client.Message; |
| using Qpid.Messaging; |
| |
namespace Qpid.Client
{
    /// <summary>
    /// Publishes messages to a fixed exchange / routing key pair through the
    /// <see cref="AmqChannel"/> supplied at construction time. The producer registers
    /// itself with that channel in the constructor and deregisters itself when closed.
    /// </summary>
    public class BasicMessageProducer : Closeable, IMessagePublisher
    {
        protected readonly ILog _logger = LogManager.GetLogger(typeof(BasicMessageProducer));

        /// <summary>
        /// If true, messages will not get a timestamp.
        /// </summary>
        private bool _disableTimestamps;

        /// <summary>
        /// Priority of messages created by this producer.
        /// </summary>
        private int _messagePriority;

        /// <summary>
        /// Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution.
        /// </summary>
        private long _timeToLive;

        /// <summary>
        /// Delivery mode used for this producer.
        /// </summary>
        private DeliveryMode _deliveryMode;

        /// <summary>
        /// Value of the "immediate" publish flag configured for this producer.
        /// </summary>
        private bool _immediate;

        /// <summary>
        /// Value of the "mandatory" publish flag configured for this producer.
        /// </summary>
        private bool _mandatory;

        /// <summary>
        /// Name of the exchange every message from this producer is published to.
        /// </summary>
        private readonly string _exchangeName;

        /// <summary>
        /// Routing key attached to every message from this producer.
        /// </summary>
        private readonly string _routingKey;

        /// <summary>
        /// Default encoding used for messages produced by this producer.
        /// </summary>
        private string _encoding;

        /// <summary>
        /// Default MIME type used for messages produced by this producer.
        /// </summary>
        private string _mimeType;

        /// <summary>
        /// True if this producer was created from a transacted session
        /// </summary>
        private readonly bool _transacted;

        /// <summary>
        /// Channel id handed to the constructor. It is stored but not read by this class.
        /// </summary>
        private readonly ushort _channelId;

        /// <summary>
        /// This is an id generated by the session and is used to tie individual producers to the session. This means we
        /// can deregister a producer with the session when the producer is closed. We need to be able to tie producers
        /// to the session so that when an error is propagated to the session it can close the producer (meaning that
        /// a client that happens to hold onto a producer reference will get an error if he tries to use it subsequently).
        /// </summary>
        private readonly long _producerId;

        /// <summary>
        /// The session used to create this producer
        /// </summary>
        private readonly AmqChannel _channel;

        /// <summary>
        /// Default value for immediate flag is false, i.e. a consumer does not need to be attached to a queue
        /// </summary>
        protected const bool DEFAULT_IMMEDIATE = false;

        /// <summary>
        /// Default value for mandatory flag is true, i.e. server will not silently drop messages where no queue is
        /// connected to the exchange for the message
        /// </summary>
        protected const bool DEFAULT_MANDATORY = true;

        /// <summary>
        /// Creates a producer bound to the given exchange and routing key and registers it
        /// with <paramref name="channel"/> under <paramref name="producerId"/>.
        /// </summary>
        /// <param name="exchangeName">Exchange that messages are published to.</param>
        /// <param name="routingKey">Routing key used for every published message.</param>
        /// <param name="transacted">Whether the creating session is transacted.</param>
        /// <param name="channelId">Id of the channel; stored only.</param>
        /// <param name="channel">Channel used to publish; must not be null.</param>
        /// <param name="producerId">Id under which the producer is registered with the channel.</param>
        /// <param name="deliveryMode">Initial delivery mode.</param>
        /// <param name="timeToLive">Initial time to live, in milliseconds.</param>
        /// <param name="immediate">"immediate" flag passed on publish.</param>
        /// <param name="mandatory">"mandatory" flag passed on publish.</param>
        /// <param name="priority">Initial message priority.</param>
        /// <exception cref="ArgumentNullException"><paramref name="channel"/> is null.</exception>
        public BasicMessageProducer(string exchangeName, string routingKey,
                                    bool transacted,
                                    ushort channelId,
                                    AmqChannel channel,
                                    long producerId,
                                    DeliveryMode deliveryMode,
                                    long timeToLive,
                                    bool immediate,
                                    bool mandatory,
                                    int priority)
        {
            // Fail with a descriptive exception instead of a NullReferenceException
            // at the RegisterProducer call below.
            if (channel == null)
            {
                throw new ArgumentNullException("channel");
            }

            _exchangeName = exchangeName;
            _routingKey = routingKey;
            _transacted = transacted;
            _channelId = channelId;
            _channel = channel;
            _producerId = producerId;
            _deliveryMode = deliveryMode;
            _timeToLive = timeToLive;
            _immediate = immediate;
            _mandatory = mandatory;
            _messagePriority = priority;

            _channel.RegisterProducer(producerId, this);
        }

        #region IMessagePublisher Members

        /// <summary>
        /// Delivery mode applied by <see cref="Send(IMessage)"/>. Access is checked
        /// against the closed state via CheckNotClosed().
        /// </summary>
        public DeliveryMode DeliveryMode
        {
            get
            {
                CheckNotClosed();
                return _deliveryMode;
            }
            set
            {
                CheckNotClosed();
                _deliveryMode = value;
            }
        }

        /// <summary>
        /// Exchange this producer publishes to.
        /// </summary>
        public string ExchangeName
        {
            get { return _exchangeName; }
        }

        /// <summary>
        /// Routing key this producer publishes with.
        /// </summary>
        public string RoutingKey
        {
            get { return _routingKey; }
        }

        /// <summary>
        /// Not implemented: both accessors throw.
        /// </summary>
        /// <exception cref="NotImplementedException">Always.</exception>
        public bool DisableMessageID
        {
            get
            {
                throw new NotImplementedException("The method or operation is not implemented.");
            }
            set
            {
                throw new NotImplementedException("The method or operation is not implemented.");
            }
        }

        /// <summary>
        /// If true, messages will not get a timestamp. The flag is forwarded to the
        /// channel on every publish.
        /// </summary>
        public bool DisableMessageTimestamp
        {
            get
            {
                CheckNotClosed();
                return _disableTimestamps;
            }
            set
            {
                CheckNotClosed();
                _disableTimestamps = value;
            }
        }

        /// <summary>
        /// Priority applied by the Send overloads that do not take an explicit priority.
        /// </summary>
        /// <exception cref="ArgumentOutOfRangeException">The value is outside 0 to 9.</exception>
        public int Priority
        {
            get
            {
                CheckNotClosed();
                return _messagePriority;
            }
            set
            {
                CheckNotClosed();
                ValidatePriority(value, "value");
                _messagePriority = value;
            }
        }

        /// <summary>
        /// Marks the producer as closed and deregisters it from its channel.
        /// Calling it again (for example through <see cref="Dispose"/>) has no further effect.
        /// </summary>
        public override void Close()
        {
            // Interlocked.Exchange returns the value that was stored before the swap, so
            // only the call that actually performs the transition continues.
            // NOTE(review): assumes _closed/CLOSED in Closeable are comparable with == (e.g. int) - confirm.
            if (Interlocked.Exchange(ref _closed, CLOSED) == CLOSED)
            {
                return;
            }

            _logger.Info("Closing producer " + this);
            _channel.DeregisterProducer(_producerId);
        }

        /// <summary>
        /// Publishes <paramref name="msg"/> with an explicit delivery mode, priority and time to live,
        /// using the mandatory/immediate flags this producer was constructed with.
        /// </summary>
        /// <param name="msg">Message to publish; it is cast to AbstractQmsMessage.</param>
        /// <param name="deliveryMode">Delivery mode for this message.</param>
        /// <param name="priority">Priority for this message, 0 to 9.</param>
        /// <param name="timeToLive">Time to live in milliseconds; must be non-negative.</param>
        /// <exception cref="ArgumentOutOfRangeException">
        /// <paramref name="priority"/> or <paramref name="timeToLive"/> is out of range.
        /// </exception>
        public void Send(IMessage msg, DeliveryMode deliveryMode, int priority, long timeToLive)
        {
            CheckNotClosed();
            // Same limits as the Priority / TimeToLive setters; a negative TTL would
            // otherwise wrap to a very large value in the uint cast below.
            ValidatePriority(priority, "priority");
            ValidateTimeToLive(timeToLive, "timeToLive");
            SendImpl(_exchangeName, _routingKey, (AbstractQmsMessage)msg, deliveryMode, priority, (uint)timeToLive,
                     _mandatory, _immediate);
        }

        /// <summary>
        /// Publishes <paramref name="msg"/> using this producer's current delivery mode, priority,
        /// time to live and the mandatory/immediate flags it was constructed with.
        /// </summary>
        /// <param name="msg">Message to publish; it is cast to AbstractQmsMessage.</param>
        public void Send(IMessage msg)
        {
            CheckNotClosed();
            SendImpl(_exchangeName, _routingKey, (AbstractQmsMessage)msg, _deliveryMode, _messagePriority, (uint)_timeToLive,
                     _mandatory, _immediate);
        }

        // This is a short-term hack (knowing that this code will be re-vamped sometime soon)
        // to facilitate publishing messages to potentially non-existent recipients.
        /// <summary>
        /// Publishes <paramref name="msg"/> like <see cref="Send(IMessage)"/> but with the
        /// "mandatory" flag overridden for this call only.
        /// </summary>
        /// <param name="msg">Message to publish; it is cast to AbstractQmsMessage.</param>
        /// <param name="mandatory">"mandatory" flag to use for this message.</param>
        public void Send(IMessage msg, bool mandatory)
        {
            CheckNotClosed();
            SendImpl(_exchangeName, _routingKey, (AbstractQmsMessage)msg, _deliveryMode, _messagePriority, (uint)_timeToLive,
                     mandatory, _immediate);
        }

        /// <summary>
        /// Time to live, in milliseconds, applied by the Send overloads that do not take an explicit value.
        /// </summary>
        /// <exception cref="ArgumentOutOfRangeException">The value is negative.</exception>
        public long TimeToLive
        {
            get
            {
                CheckNotClosed();
                return _timeToLive;
            }
            set
            {
                CheckNotClosed();
                ValidateTimeToLive(value, "value");
                _timeToLive = value;
            }
        }

        #endregion

        /// <summary>
        /// Forwards a publish request to the channel, adding the timestamp-suppression flag.
        /// </summary>
        private void SendImpl(string exchangeName, string routingKey, AbstractQmsMessage message, DeliveryMode deliveryMode, int priority, uint timeToLive, bool mandatory, bool immediate)
        {
            _channel.BasicPublish(exchangeName, routingKey, mandatory, immediate, message, deliveryMode, priority, timeToLive, _disableTimestamps);
        }

        /// <summary>
        /// Throws if <paramref name="priority"/> is outside the range 0 to 9.
        /// </summary>
        private static void ValidatePriority(int priority, string paramName)
        {
            if (priority < 0 || priority > 9)
            {
                // Three-argument overload: the single-string constructor would treat the message as the parameter name.
                throw new ArgumentOutOfRangeException(paramName, priority,
                    "Priority of " + priority + " is illegal. Value must be in range 0 to 9");
            }
        }

        /// <summary>
        /// Throws if <paramref name="timeToLive"/> is negative.
        /// </summary>
        private static void ValidateTimeToLive(long timeToLive, string paramName)
        {
            if (timeToLive < 0)
            {
                throw new ArgumentOutOfRangeException(paramName, timeToLive,
                    "Time to live must be non-negative - supplied value was " + timeToLive);
            }
        }

        /// <summary>
        /// Sets the default MIME type. The value is stored; nothing in this class reads it back.
        /// </summary>
        public string MimeType
        {
            set
            {
                CheckNotClosed();
                _mimeType = value;
            }
        }

        /// <summary>
        /// Sets the default encoding. The value is stored; nothing in this class reads it back.
        /// </summary>
        public string Encoding
        {
            set
            {
                CheckNotClosed();
                _encoding = value;
            }
        }

        /// <summary>
        /// Closes the producer; equivalent to calling <see cref="Close"/>.
        /// </summary>
        public void Dispose()
        {
            Close();
        }
    }
}