|  | /* | 
|  | * Licensed to the Apache Software Foundation (ASF) under one or more | 
|  | * contributor license agreements.  See the NOTICE file distributed with | 
|  | * this work for additional information regarding copyright ownership. | 
|  | * The ASF licenses this file to You under the Apache License, Version 2.0 | 
|  | * (the "License"); you may not use this file except in compliance with | 
|  | * the License.  You may obtain a copy of the License at | 
|  | * | 
|  | *     http://www.apache.org/licenses/LICENSE-2.0 | 
|  | * | 
|  | * Unless required by applicable law or agreed to in writing, software | 
|  | * distributed under the License is distributed on an "AS IS" BASIS, | 
|  | * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
|  | * See the License for the specific language governing permissions and | 
|  | * limitations under the License. | 
|  | */ | 
|  |  | 
|  | using System; | 
|  | using System.Threading; | 
|  | using System.Threading.Tasks; | 
|  | using Apache.NMS.Util; | 
|  | using Apache.NMS.ActiveMQ.Commands; | 
|  | using Apache.NMS.ActiveMQ.Util; | 
|  | using Apache.NMS.ActiveMQ.Util.Synchronization; | 
|  |  | 
|  | namespace Apache.NMS.ActiveMQ | 
|  | { | 
/// <summary>
/// An object capable of sending messages to some destination.
/// </summary>
/// <remarks>
/// A producer is created by a <see cref="Session"/> either with a fixed destination or
/// with no destination at all. A producer with a fixed destination may only send to
/// that destination; a producer without one must be given a destination on every send.
/// When the connection's protocol version is 3 or higher and a producer window size is
/// configured, a <see cref="MemoryUsage"/> tracker is created and handed to the session
/// on each send; it is released again in <see cref="OnProducerAck"/>.
/// </remarks>
public class MessageProducer : IMessageProducer
{
    // Owning session; all sends and message creation are delegated to it.
    private readonly Session session;

    // Producer window tracker. Stays null unless the constructor finds both a
    // protocol version >= 3 and a positive window size.
    private readonly MemoryUsage usage = null;

    // Guards the 'closed' flag and serializes sends against Close/Shutdown.
    private readonly NmsSynchronizationMonitor closedLock = new NmsSynchronizationMonitor();
    private bool closed = false;

    private readonly ProducerInfo info;

    // Incremented atomically for every message sent to build its MessageId.
    private int producerSequenceId = 0;

    private MsgDeliveryMode msgDeliveryMode = NMSConstants.defaultDeliveryMode;
    private TimeSpan requestTimeout;
    private TimeSpan msgTimeToLive = NMSConstants.defaultTimeToLive;

    // Start from the NMS default priority, exactly like the delivery mode and
    // time-to-live defaults above. (Previously this was defaultPriority - 1, which
    // silently sent every message one priority level below the NMS default.)
    private MsgPriority msgPriority = NMSConstants.defaultPriority;

    private bool disableMessageID = false;
    private bool disableMessageTimestamp = false;
    protected bool disposed = false;

    private readonly MessageTransformation messageTransformation;
    private ProducerTransformerDelegate producerTransformer;

    /// <summary>
    /// Creates a producer bound to the given session.
    /// </summary>
    /// <param name="session">The session that owns this producer.</param>
    /// <param name="id">The id assigned to this producer.</param>
    /// <param name="destination">
    /// The fixed destination for this producer, or null to create a producer that
    /// requires a destination on every send. Any options carried by the destination
    /// that start with "producer." are applied to the <see cref="ProducerInfo"/>.
    /// </param>
    /// <param name="requestTimeout">Timeout passed to the session on each send.</param>
    public MessageProducer(Session session, ProducerId id, ActiveMQDestination destination, TimeSpan requestTimeout)
    {
        this.session = session;
        this.RequestTimeout = requestTimeout;

        this.info = new ProducerInfo();
        this.info.ProducerId = id;
        this.info.Destination = destination;
        this.info.WindowSize = session.Connection.ProducerWindowSize;

        this.messageTransformation = session.Connection.MessageTransformation;

        // If the destination contained a URI query, then use it to set public
        // properties on the ProducerInfo
        if (destination != null && destination.Options != null)
        {
            URISupport.SetProperties(this.info, destination.Options, "producer.");
        }

        // Version Three and higher will send us a ProducerAck, but only if we
        // have a set producer window size.
        if (session.Connection.ProtocolVersion >= 3 && this.info.WindowSize > 0)
        {
            if (Tracer.IsDebugEnabled)
            {
                Tracer.Debug("MessageProducer created with a Window Size of: " + this.info.WindowSize);
            }
            this.usage = new MemoryUsage(this.info.WindowSize);
        }
    }

    ~MessageProducer()
    {
        Dispose(false);
    }

    /// <summary>
    /// Closes the producer and suppresses finalization.
    /// </summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Closes the producer once; later calls are no-ops.
    /// </summary>
    /// <param name="disposing">
    /// True when called from <see cref="Dispose()"/>, false when called from the
    /// finalizer. The value is currently not consulted.
    /// </param>
    protected void Dispose(bool disposing)
    {
        if (disposed)
        {
            return;
        }

        try
        {
            Close();
        }
        catch
        {
            // Deliberate best-effort: Dispose (and the finalizer that calls it)
            // must never throw, so any failure while closing is ignored here.
        }

        disposed = true;
    }

    /// <summary>
    /// Synchronous wrapper around <see cref="CloseAsync"/>.
    /// </summary>
    public void Close()
    {
        CloseAsync().GetAsyncResult();
    }

    /// <summary>
    /// Deactivates this producer and sends a <see cref="RemoveInfo"/> for it over the
    /// connection. Does nothing if the producer is already closed.
    /// </summary>
    public async Task CloseAsync()
    {
        using (await closedLock.LockAsync().Await())
        {
            if (closed)
            {
                return;
            }

            // NOTE(review): Shutdown() acquires closedLock again while it is held
            // here, so this relies on NmsSynchronizationMonitor being re-entrant
            // for the current flow - confirm.
            Shutdown();

            RemoveInfo removeInfo = new RemoveInfo();
            removeInfo.ObjectId = this.info.ProducerId;
            this.session.Connection.Oneway(removeInfo);

            if (Tracer.IsDebugEnabled)
            {
                Tracer.DebugFormat("Remove of Producer[{0}] for destination[{1}] sent.",
                                   this.ProducerId, this.info.Destination);
            }
        }
    }

    /// <summary>
    /// Called from the Parent session to deactivate this Producer, when a parent
    /// is closed all children are automatically removed from the broker so this
    /// method circumvents the need to send a Remove command to the broker.
    /// </summary>
    internal void Shutdown()
    {
        using (closedLock.Lock())
        {
            if (closed)
            {
                return;
            }

            try
            {
                session.RemoveProducer(info.ProducerId);
            }
            catch (Exception ex)
            {
                // Failure to deregister is logged but must not prevent the
                // producer from being marked closed below.
                Tracer.ErrorFormat("Error during producer close: {0}", ex);
            }

            if (this.usage != null)
            {
                this.usage.Stop();
            }

            closed = true;
        }
    }

    /// <summary>
    /// Sends the message to the producer's own destination using the producer's
    /// delivery mode and priority.
    /// </summary>
    public void Send(IMessage message)
    {
        Send(info.Destination, message, this.msgDeliveryMode, this.msgPriority, this.msgTimeToLive, false);
    }

    /// <summary>
    /// Sends the message to the given destination using the producer's delivery
    /// mode and priority.
    /// </summary>
    public void Send(IDestination destination, IMessage message)
    {
        Send(destination, message, this.msgDeliveryMode, this.msgPriority, this.msgTimeToLive, false);
    }

    /// <summary>
    /// Sends the message to the producer's own destination with explicit delivery
    /// mode, priority and time to live.
    /// </summary>
    public void Send(IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
    {
        Send(info.Destination, message, deliveryMode, priority, timeToLive, true);
    }

    /// <summary>
    /// Sends the message to the given destination with explicit delivery mode,
    /// priority and time to live.
    /// </summary>
    public void Send(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
    {
        Send(destination, message, deliveryMode, priority, timeToLive, true);
    }

    /// <summary>
    /// Asynchronous counterpart of <see cref="Send(IMessage)"/>.
    /// </summary>
    public Task SendAsync(IMessage message)
    {
        return SendAsync(info.Destination, message, this.msgDeliveryMode, this.msgPriority, this.msgTimeToLive, false);
    }

    /// <summary>
    /// Asynchronous counterpart of
    /// <see cref="Send(IDestination, IMessage, MsgDeliveryMode, MsgPriority, TimeSpan)"/>.
    /// </summary>
    public Task SendAsync(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority,
                          TimeSpan timeToLive)
    {
        return SendAsync(destination, message, deliveryMode, priority, timeToLive, true);
    }

    /// <summary>
    /// Asynchronous counterpart of
    /// <see cref="Send(IMessage, MsgDeliveryMode, MsgPriority, TimeSpan)"/>.
    /// </summary>
    public Task SendAsync(IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
    {
        return SendAsync(info.Destination, message, deliveryMode, priority, timeToLive, true);
    }

    /// <summary>
    /// Asynchronous counterpart of <see cref="Send(IDestination, IMessage)"/>.
    /// </summary>
    public Task SendAsync(IDestination destination, IMessage message)
    {
        return SendAsync(destination, message, this.msgDeliveryMode, this.msgPriority, this.msgTimeToLive, false);
    }

    /// <summary>
    /// Synchronous wrapper around the core
    /// <see cref="SendAsync(IDestination, IMessage, MsgDeliveryMode, MsgPriority, TimeSpan, bool)"/>.
    /// </summary>
    protected void Send(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode,
                        MsgPriority priority, TimeSpan timeToLive, bool specifiedTimeToLive)
    {
        SendAsync(destination, message, deliveryMode, priority, timeToLive, specifiedTimeToLive).GetAsyncResult();
    }

    /// <summary>
    /// Core send routine used by every public Send/SendAsync overload.
    /// </summary>
    /// <param name="destination">Where to send the message; must not be null.</param>
    /// <param name="message">The message to send.</param>
    /// <param name="deliveryMode">Delivery mode stamped on the outgoing message.</param>
    /// <param name="priority">Priority stamped on the outgoing message.</param>
    /// <param name="timeToLive">
    /// Time to live for the outgoing message; only applied when
    /// <paramref name="specifiedTimeToLive"/> is true.
    /// </param>
    /// <param name="specifiedTimeToLive">
    /// True when the caller passed an explicit time to live. When false the
    /// message's existing NMSTimeToLive is left untouched.
    /// </param>
    /// <exception cref="NotSupportedException">
    /// <paramref name="destination"/> is null and the producer has no destination of
    /// its own, or the producer has a fixed destination and a different one was given.
    /// </exception>
    /// <exception cref="Apache.NMS.InvalidDestinationException">
    /// <paramref name="destination"/> is null although the producer was created
    /// with a destination.
    /// </exception>
    /// <exception cref="ConnectionClosedException">The producer has been closed.</exception>
    protected async Task SendAsync(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive, bool specifiedTimeToLive)
    {
        if (null == destination)
        {
            // See if this producer was created without a destination.
            if (null == info.Destination)
            {
                throw new NotSupportedException();
            }

            // The producer was created with a destination, but an invalid destination
            // was specified.
            throw new Apache.NMS.InvalidDestinationException();
        }

        ActiveMQDestination dest = null;

        if (destination == this.info.Destination)
        {
            // Sending to the producer's own destination: no transformation needed.
            dest = destination as ActiveMQDestination;
        }
        else if (info.Destination == null)
        {
            // Producer without a fixed destination: convert whatever we were given.
            dest = ActiveMQDestination.Transform(destination);
        }
        else
        {
            throw new NotSupportedException("This producer can only send messages to: " + this.info.Destination.PhysicalName);
        }

        // Give the user-supplied transformer a chance to replace the message;
        // a null result means "keep the original".
        if (this.ProducerTransformer != null)
        {
            IMessage transformed = this.ProducerTransformer(this.session, this, message);
            if (transformed != null)
            {
                message = transformed;
            }
        }

        ActiveMQMessage activeMessage = this.messageTransformation.TransformMessage<ActiveMQMessage>(message);

        activeMessage.ProducerId = info.ProducerId;
        activeMessage.Destination = dest;
        activeMessage.NMSDeliveryMode = deliveryMode;
        activeMessage.NMSPriority = priority;

        // Always set the message Id regardless of the disable flag.
        MessageId id = new MessageId();
        id.ProducerId = info.ProducerId;
        id.ProducerSequenceId = Interlocked.Increment(ref this.producerSequenceId);
        activeMessage.MessageId = id;

        // Ensure that the source message contains the NMSMessageId of the transformed
        // message for correlation purposes.
        if (!ReferenceEquals(message, activeMessage))
        {
            message.NMSMessageId = activeMessage.NMSMessageId;
        }

        if (!disableMessageTimestamp)
        {
            activeMessage.NMSTimestamp = DateTime.UtcNow;
        }

        if (specifiedTimeToLive)
        {
            activeMessage.NMSTimeToLive = timeToLive;
        }

        // Ensure there's room left to send this message
        if (this.usage != null)
        {
            usage.WaitForSpace();
        }

        // Hold the lock across the send so the producer cannot be closed
        // between the 'closed' check and the hand-off to the session.
        using (await closedLock.LockAsync().Await())
        {
            if (closed)
            {
                throw new ConnectionClosedException();
            }

            await session.DoSendAsync(dest, activeMessage, this, this.usage, this.RequestTimeout).Await();
        }
    }

    /// <summary>The id this producer was created with.</summary>
    public ProducerId ProducerId
    {
        get { return info.ProducerId; }
    }

    /// <summary>The <see cref="ProducerInfo"/> describing this producer.</summary>
    public ProducerInfo ProducerInfo
    {
        get { return info; }
    }

    /// <summary>Delivery mode used by the Send overloads that do not take one.</summary>
    public MsgDeliveryMode DeliveryMode
    {
        get { return msgDeliveryMode; }
        set { this.msgDeliveryMode = value; }
    }

    /// <summary>The producer's default time to live.</summary>
    public TimeSpan TimeToLive
    {
        get { return msgTimeToLive; }
        set { this.msgTimeToLive = value; }
    }

    /// <summary>Timeout handed to the session on every send.</summary>
    public TimeSpan RequestTimeout
    {
        get { return requestTimeout; }
        set { this.requestTimeout = value; }
    }

    /// <summary>Priority used by the Send overloads that do not take one.</summary>
    public MsgPriority Priority
    {
        get { return msgPriority; }
        set { this.msgPriority = value; }
    }

    /// <summary>
    /// Stored for API compatibility; the send path always assigns a message id
    /// regardless of this flag.
    /// </summary>
    public bool DisableMessageID
    {
        get { return disableMessageID; }
        set { this.disableMessageID = value; }
    }

    /// <summary>When true, outgoing messages are not stamped with the current UTC time.</summary>
    public bool DisableMessageTimestamp
    {
        get { return disableMessageTimestamp; }
        set { this.disableMessageTimestamp = value; }
    }

    /// <summary>Not implemented: both accessors throw <see cref="NotImplementedException"/>.</summary>
    public TimeSpan DeliveryDelay
    {
        get => throw new NotImplementedException();
        set => throw new NotImplementedException();
    }

    /// <summary>
    /// Optional hook invoked on every send before the message is transformed;
    /// a non-null return value replaces the message being sent.
    /// </summary>
    public ProducerTransformerDelegate ProducerTransformer
    {
        get { return this.producerTransformer; }
        set { this.producerTransformer = value; }
    }

    // The message factory methods below simply delegate to the owning session;
    // the *Async variants wrap the synchronous result in a completed task.

    public IMessage CreateMessage()
    {
        return session.CreateMessage();
    }

    public Task<IMessage> CreateMessageAsync()
    {
        return Task.FromResult(CreateMessage());
    }

    public ITextMessage CreateTextMessage()
    {
        return session.CreateTextMessage();
    }

    public Task<ITextMessage> CreateTextMessageAsync()
    {
        return Task.FromResult(CreateTextMessage());
    }

    public ITextMessage CreateTextMessage(string text)
    {
        return session.CreateTextMessage(text);
    }

    public Task<ITextMessage> CreateTextMessageAsync(string text)
    {
        return Task.FromResult(CreateTextMessage(text));
    }

    public IMapMessage CreateMapMessage()
    {
        return session.CreateMapMessage();
    }

    public Task<IMapMessage> CreateMapMessageAsync()
    {
        return Task.FromResult(CreateMapMessage());
    }

    public IObjectMessage CreateObjectMessage(object body)
    {
        return session.CreateObjectMessage(body);
    }

    public Task<IObjectMessage> CreateObjectMessageAsync(object body)
    {
        return Task.FromResult(CreateObjectMessage(body));
    }

    public IBytesMessage CreateBytesMessage()
    {
        return session.CreateBytesMessage();
    }

    public Task<IBytesMessage> CreateBytesMessageAsync()
    {
        return Task.FromResult(CreateBytesMessage());
    }

    public IBytesMessage CreateBytesMessage(byte[] body)
    {
        return session.CreateBytesMessage(body);
    }

    public Task<IBytesMessage> CreateBytesMessageAsync(byte[] body)
    {
        return Task.FromResult(CreateBytesMessage(body));
    }

    public IStreamMessage CreateStreamMessage()
    {
        return session.CreateStreamMessage();
    }

    public Task<IStreamMessage> CreateStreamMessageAsync()
    {
        return Task.FromResult(CreateStreamMessage());
    }

    /// <summary>
    /// Handles a <see cref="ProducerAck"/> by releasing the acknowledged size from
    /// the producer window, when a window is in use.
    /// </summary>
    /// <param name="ack">The acknowledgement whose Size is released.</param>
    internal void OnProducerAck(ProducerAck ack)
    {
        if (Tracer.IsDebugEnabled)
        {
            Tracer.Debug("Received ProducerAck for Message of Size = {" + ack.Size + "}");
        }

        if (this.usage != null)
        {
            this.usage.DecreaseUsage(ack.Size);
        }
    }
}
|  | } |