/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
| |
| using System; |
| using System.Threading; |
| using Apache.NMS.Util; |
| using Apache.NMS.Stomp.Commands; |
| using Apache.NMS.Stomp.Util; |
| |
namespace Apache.NMS.Stomp
{
    /// <summary>
    /// An object capable of sending messages to some destination.
    /// </summary>
    /// <remarks>
    /// A producer is bound to the <see cref="Session"/> that created it. Sends are
    /// serialized against <see cref="Close"/> through a private lock, so a message is
    /// either handed to the session before the producer closes or rejected with a
    /// <see cref="ConnectionClosedException"/>.
    /// </remarks>
    public class MessageProducer : IMessageProducer
    {
        // Owning session; set to null by Close() once the producer has been closed.
        private Session session;
        private bool closed = false;
        // Guards 'closed' and keeps Send() and Close()/DoClose() mutually exclusive.
        private readonly object closedLock = new object();
        private readonly ProducerInfo info;
        // Incremented atomically for every message to build its MessageId.
        private int producerSequenceId = 0;

        private MsgDeliveryMode msgDeliveryMode = NMSConstants.defaultDeliveryMode;
        private TimeSpan requestTimeout = NMSConstants.defaultRequestTimeout;
        private TimeSpan msgTimeToLive = NMSConstants.defaultTimeToLive;
        private MsgPriority msgPriority = NMSConstants.defaultPriority;
        private bool disableMessageID = false;
        private bool disableMessageTimestamp = false;
        protected bool disposed = false;

        /// <summary>
        /// Creates a producer owned by <paramref name="session"/> and described by
        /// <paramref name="info"/>. The request timeout is inherited from the session.
        /// </summary>
        /// <param name="session">The session this producer sends through.</param>
        /// <param name="info">Producer id and (optional) fixed destination.</param>
        /// <exception cref="ArgumentNullException">
        /// Thrown when <paramref name="session"/> is null.
        /// </exception>
        public MessageProducer(Session session, ProducerInfo info)
        {
            if(session == null)
            {
                // Previously surfaced as a NullReferenceException on the line that
                // reads session.RequestTimeout.
                throw new ArgumentNullException("session");
            }

            this.session = session;
            this.info = info;
            this.RequestTimeout = session.RequestTimeout;
        }

        ~MessageProducer()
        {
            Dispose(false);
        }

        /// <summary>
        /// Closes the producer and suppresses finalization.
        /// </summary>
        public void Dispose()
        {
            Dispose(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Shared dispose path for <see cref="Dispose()"/> and the finalizer.
        /// The producer is closed in both cases; any exception raised while closing
        /// is deliberately ignored so that disposal never throws.
        /// </summary>
        /// <param name="disposing">
        /// True when called from <see cref="Dispose()"/>, false when called from the
        /// finalizer. There is currently no work that depends on this flag.
        /// </param>
        protected void Dispose(bool disposing)
        {
            if(disposed)
            {
                return;
            }

            try
            {
                Close();
            }
            catch
            {
                // Ignore network errors.
            }

            disposed = true;
        }

        /// <summary>
        /// Closes the producer and releases its reference to the owning session.
        /// Calling this on an already closed producer is a no-op.
        /// </summary>
        public void Close()
        {
            lock(closedLock)
            {
                if(closed)
                {
                    return;
                }

                DoClose();
                this.session = null;
            }
        }

        /// <summary>
        /// Asks the session to dispose of this producer's id and marks the producer
        /// closed. Errors reported by the session are logged, not propagated, and the
        /// producer is marked closed regardless.
        /// </summary>
        internal void DoClose()
        {
            lock(closedLock)
            {
                if(closed)
                {
                    return;
                }

                try
                {
                    session.DisposeOf(info.ProducerId);
                }
                catch(Exception ex)
                {
                    Tracer.ErrorFormat("Error during producer close: {0}", ex);
                }

                closed = true;
            }
        }

        /// <summary>
        /// Sends <paramref name="message"/> to the producer's own destination using
        /// the producer's delivery mode and priority.
        /// </summary>
        public void Send(IMessage message)
        {
            Send(info.Destination, message, this.msgDeliveryMode, this.msgPriority, this.msgTimeToLive, false);
        }

        /// <summary>
        /// Sends <paramref name="message"/> to <paramref name="destination"/> using
        /// the producer's delivery mode and priority.
        /// </summary>
        public void Send(IDestination destination, IMessage message)
        {
            Send(destination, message, this.msgDeliveryMode, this.msgPriority, this.msgTimeToLive, false);
        }

        /// <summary>
        /// Sends <paramref name="message"/> to the producer's own destination with
        /// explicit delivery mode, priority and time to live.
        /// </summary>
        public void Send(IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
        {
            Send(info.Destination, message, deliveryMode, priority, timeToLive, true);
        }

        /// <summary>
        /// Sends <paramref name="message"/> to <paramref name="destination"/> with
        /// explicit delivery mode, priority and time to live.
        /// </summary>
        public void Send(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
        {
            Send(destination, message, deliveryMode, priority, timeToLive, true);
        }

        /// <summary>
        /// Common send path: validates the destination, stamps the message with the
        /// producer id, a fresh message id, delivery mode, priority and (optionally)
        /// timestamp and time to live, then hands it to the session.
        /// </summary>
        /// <param name="destination">Target destination; must not be null.</param>
        /// <param name="message">The message to send; must be a Stomp <c>Message</c>.</param>
        /// <param name="deliveryMode">Delivery mode written to the message.</param>
        /// <param name="priority">Priority written to the message.</param>
        /// <param name="timeToLive">
        /// Time to live written to the message, but only when
        /// <paramref name="specifiedTimeToLive"/> is true.
        /// </param>
        /// <param name="specifiedTimeToLive">
        /// True when the caller passed an explicit time to live. When false the
        /// message's existing time to live is left untouched.
        /// NOTE(review): the overloads without an explicit TTL pass false, so the
        /// producer-level <see cref="TimeToLive"/> is never applied here - confirm
        /// this is intended.
        /// </param>
        /// <exception cref="NotSupportedException">
        /// The destination is null and the producer has no destination of its own, or
        /// the producer is bound to a different destination.
        /// </exception>
        /// <exception cref="Apache.NMS.InvalidDestinationException">
        /// The destination is null although the producer has a destination.
        /// </exception>
        /// <exception cref="ArgumentNullException"><paramref name="message"/> is null.</exception>
        /// <exception cref="ConnectionClosedException">The producer has been closed.</exception>
        protected void Send(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive, bool specifiedTimeToLive)
        {
            if(null == destination)
            {
                // See if this producer was created without a destination.
                if(null == info.Destination)
                {
                    throw new NotSupportedException();
                }

                // The producer was created with a destination, but an invalid destination
                // was specified.
                throw new Apache.NMS.InvalidDestinationException();
            }

            Destination dest = null;

            if(destination == this.info.Destination)
            {
                dest = destination as Destination;
            }
            else if(info.Destination == null)
            {
                dest = Destination.Transform(destination);
            }
            else
            {
                throw new NotSupportedException("This producer can only send messages to: " + this.info.Destination.PhysicalName);
            }

            if(message == null)
            {
                // Without this guard the cast below yields null and the first property
                // assignment fails with an uninformative NullReferenceException.
                throw new ArgumentNullException("message");
            }

            Message stompMessage = (Message) message;

            stompMessage.ProducerId = info.ProducerId;
            stompMessage.FromDestination = dest;
            stompMessage.NMSDeliveryMode = deliveryMode;
            stompMessage.NMSPriority = priority;

            // Always set the message Id regardless of the disable flag.
            MessageId id = new MessageId();
            id.ProducerId = info.ProducerId;
            id.ProducerSequenceId = Interlocked.Increment(ref this.producerSequenceId);
            stompMessage.MessageId = id;

            if(!disableMessageTimestamp)
            {
                stompMessage.NMSTimestamp = DateTime.UtcNow;
            }

            if(specifiedTimeToLive)
            {
                stompMessage.NMSTimeToLive = timeToLive;
            }

            // Hold the lock across the hand-off so the producer cannot be closed
            // between the 'closed' check and the send.
            lock(closedLock)
            {
                if(closed)
                {
                    throw new ConnectionClosedException();
                }
                session.DoSend(stompMessage, this, this.RequestTimeout);
            }
        }

        /// <summary>The id of this producer, taken from its <c>ProducerInfo</c>.</summary>
        public ProducerId ProducerId
        {
            get { return info.ProducerId; }
        }

        /// <summary>Delivery mode used by the Send overloads that do not take one.</summary>
        public MsgDeliveryMode DeliveryMode
        {
            get { return msgDeliveryMode; }
            set { this.msgDeliveryMode = value; }
        }

        /// <summary>
        /// Producer-level time to live. It is passed along by the Send overloads that
        /// do not take an explicit value, but those overloads do not write it to the
        /// message.
        /// </summary>
        public TimeSpan TimeToLive
        {
            get { return msgTimeToLive; }
            set { this.msgTimeToLive = value; }
        }

        /// <summary>Timeout passed to the session on every send.</summary>
        public TimeSpan RequestTimeout
        {
            get { return requestTimeout; }
            set { this.requestTimeout = value; }
        }

        /// <summary>Priority used by the Send overloads that do not take one.</summary>
        public MsgPriority Priority
        {
            get { return msgPriority; }
            set { this.msgPriority = value; }
        }

        /// <summary>
        /// Stored for API compatibility; the send path in this class always assigns a
        /// message id and never reads this flag.
        /// </summary>
        public bool DisableMessageID
        {
            get { return disableMessageID; }
            set { this.disableMessageID = value; }
        }

        /// <summary>When true, Send does not stamp the message with the current UTC time.</summary>
        public bool DisableMessageTimestamp
        {
            get { return disableMessageTimestamp; }
            set { this.disableMessageTimestamp = value; }
        }

        /// <summary>Creates an empty message through the owning session.</summary>
        /// <exception cref="ConnectionClosedException">The producer has been closed.</exception>
        public IMessage CreateMessage()
        {
            return GetOpenSession().CreateMessage();
        }

        /// <summary>Creates an empty text message through the owning session.</summary>
        /// <exception cref="ConnectionClosedException">The producer has been closed.</exception>
        public ITextMessage CreateTextMessage()
        {
            return GetOpenSession().CreateTextMessage();
        }

        /// <summary>Creates a text message with the given body through the owning session.</summary>
        /// <exception cref="ConnectionClosedException">The producer has been closed.</exception>
        public ITextMessage CreateTextMessage(string text)
        {
            return GetOpenSession().CreateTextMessage(text);
        }

        /// <summary>Creates an empty map message through the owning session.</summary>
        /// <exception cref="ConnectionClosedException">The producer has been closed.</exception>
        public IMapMessage CreateMapMessage()
        {
            return GetOpenSession().CreateMapMessage();
        }

        /// <summary>Object messages are not available in this provider.</summary>
        /// <exception cref="NotSupportedException">Always thrown.</exception>
        public IObjectMessage CreateObjectMessage(object body)
        {
            throw new NotSupportedException("No Object Message in Stomp");
        }

        /// <summary>Creates an empty bytes message through the owning session.</summary>
        /// <exception cref="ConnectionClosedException">The producer has been closed.</exception>
        public IBytesMessage CreateBytesMessage()
        {
            return GetOpenSession().CreateBytesMessage();
        }

        /// <summary>Creates a bytes message with the given body through the owning session.</summary>
        /// <exception cref="ConnectionClosedException">The producer has been closed.</exception>
        public IBytesMessage CreateBytesMessage(byte[] body)
        {
            return GetOpenSession().CreateBytesMessage(body);
        }

        /// <summary>Creates an empty stream message through the owning session.</summary>
        /// <exception cref="ConnectionClosedException">The producer has been closed.</exception>
        public IStreamMessage CreateStreamMessage()
        {
            return GetOpenSession().CreateStreamMessage();
        }

        // Returns the owning session, or throws the same exception Send() uses once
        // Close() has dropped the reference (instead of a NullReferenceException).
        private Session GetOpenSession()
        {
            Session current = this.session;

            if(current == null)
            {
                throw new ConnectionClosedException();
            }

            return current;
        }
    }
}