/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
| |
| using System; |
| using System.Threading.Tasks; |
| using Apache.NMS.AMQP.Meta; |
| using Apache.NMS.AMQP.Provider; |
| using Apache.NMS.AMQP.Util; |
| |
| namespace Apache.NMS.AMQP |
| { |
/// <summary>
/// <see cref="IMessageProducer"/> implementation that delegates sending and message
/// creation to its owning <see cref="NmsSession"/> and registers itself with that
/// session for the duration of its life.
/// </summary>
public class NmsMessageProducer : IMessageProducer
{
    private readonly NmsSession session;

    // Flipped exactly once (via compare-and-set) when the producer is shut down.
    private readonly AtomicBool closed = new AtomicBool();

    // Error handed to Shutdown, if any; surfaced as the inner exception by CheckClosed.
    private Exception failureCause;
    private MsgDeliveryMode deliveryMode = MsgDeliveryMode.Persistent;
    private TimeSpan timeToLive = NMSConstants.defaultTimeToLive;

    // Only stored and returned by RequestTimeout; nothing else in this class reads it.
    private TimeSpan requestTimeout;
    private MsgPriority priority = NMSConstants.defaultPriority;
    private bool disableMessageId;
    private bool disableMessageTimestamp;

    /// <summary>
    /// Builds the producer's <see cref="ProducerInfo"/>, creates the corresponding
    /// resource through the session's connection and registers the producer with the session.
    /// </summary>
    /// <param name="producerId">Identifier assigned to this producer.</param>
    /// <param name="session">Session that owns this producer.</param>
    /// <param name="destination">Destination stored on <see cref="Info"/> and used by the
    /// <c>Send</c> overloads that do not take an explicit destination.</param>
    /// <exception cref="ArgumentNullException"><paramref name="session"/> is null.</exception>
    public NmsMessageProducer(Id producerId, NmsSession session, IDestination destination)
    {
        // Fail with a descriptive exception instead of a NullReferenceException further down.
        if (session == null)
            throw new ArgumentNullException(nameof(session));

        this.session = session;
        Info = new ProducerInfo(producerId, session.SessionInfo.Id)
        {
            Destination = destination
        };

        // NOTE(review): blocks the calling thread until the asynchronous create completes.
        // The producer is registered with the session only if that call does not throw.
        session.Connection.CreateResource(Info).ConfigureAwait(false).GetAwaiter().GetResult();

        session.Add(this);
    }

    /// <summary>Metadata describing this producer (its id, session id and destination).</summary>
    public ProducerInfo Info { get; }

    /// <summary>Id generator instance owned by this producer.</summary>
    public IdGenerator MessageIdGenerator { get; } = new CustomIdGenerator(true, "ID", new AtomicSequence());

    /// <summary>
    /// Closes the producer. Any exception thrown by <see cref="Close"/> is caught and
    /// written to the debug trace rather than propagated.
    /// </summary>
    public void Dispose()
    {
        try
        {
            Close();
        }
        catch (Exception ex)
        {
            Tracer.DebugFormat("Caught exception while disposing {0} {1}. Exception {2}", GetType().Name, Info, ex);
        }
    }

    /// <summary>
    /// Sends <paramref name="message"/> to the producer's own destination using the current
    /// <see cref="DeliveryMode"/>, <see cref="Priority"/> and <see cref="TimeToLive"/>.
    /// </summary>
    /// <exception cref="IllegalStateException">The producer is closed.</exception>
    public void Send(IMessage message)
    {
        Send(message, DeliveryMode, Priority, TimeToLive);
    }

    /// <summary>
    /// Sends <paramref name="message"/> to the producer's own destination with the supplied
    /// delivery options.
    /// </summary>
    /// <exception cref="IllegalStateException">The producer is closed.</exception>
    public void Send(IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
    {
        Send(Info.Destination, message, deliveryMode, priority, timeToLive);
    }

    /// <summary>
    /// Sends <paramref name="message"/> to <paramref name="destination"/> using the current
    /// <see cref="DeliveryMode"/>, <see cref="Priority"/> and <see cref="TimeToLive"/>.
    /// </summary>
    /// <exception cref="IllegalStateException">The producer is closed.</exception>
    public void Send(IDestination destination, IMessage message)
    {
        Send(destination, message, DeliveryMode, Priority, TimeToLive);
    }

    /// <summary>
    /// Verifies the producer is open and hands the message to the owning session together
    /// with the delivery options and the current <see cref="DisableMessageID"/> and
    /// <see cref="DisableMessageTimestamp"/> settings.
    /// </summary>
    /// <exception cref="IllegalStateException">The producer is closed.</exception>
    public void Send(IDestination destination, IMessage message, MsgDeliveryMode deliveryMode, MsgPriority priority, TimeSpan timeToLive)
    {
        CheckClosed();
        session.Send(this, destination, message, deliveryMode, priority, timeToLive, DisableMessageID, DisableMessageTimestamp);
    }

    /// <summary>
    /// Closes the producer: marks it closed, removes it from the session and destroys the
    /// associated resource through the session's connection. Calling it on an already
    /// closed producer does nothing.
    /// </summary>
    public void Close()
    {
        // Only the caller that actually performs the open -> closed transition may destroy
        // the remote resource. A plain "if (closed) return;" check would let two racing
        // callers (or Close racing Shutdown) both reach DestroyResource.
        if (!TryShutdown(null))
            return;

        // NOTE(review): blocks the calling thread until the asynchronous destroy completes.
        session.Connection.DestroyResource(Info).ConfigureAwait(false).GetAwaiter().GetResult();
    }

    /// <summary>Creates a message through the owning session.</summary>
    /// <exception cref="IllegalStateException">The producer is closed.</exception>
    public IMessage CreateMessage()
    {
        CheckClosed();
        return session.CreateMessage();
    }

    /// <summary>Creates a text message through the owning session.</summary>
    /// <exception cref="IllegalStateException">The producer is closed.</exception>
    public ITextMessage CreateTextMessage()
    {
        CheckClosed();
        return session.CreateTextMessage();
    }

    /// <summary>Creates a text message from <paramref name="text"/> through the owning session.</summary>
    /// <exception cref="IllegalStateException">The producer is closed.</exception>
    public ITextMessage CreateTextMessage(string text)
    {
        CheckClosed();
        return session.CreateTextMessage(text);
    }

    /// <summary>Creates a map message through the owning session.</summary>
    /// <exception cref="IllegalStateException">The producer is closed.</exception>
    public IMapMessage CreateMapMessage()
    {
        CheckClosed();
        return session.CreateMapMessage();
    }

    /// <summary>Creates an object message from <paramref name="body"/> through the owning session.</summary>
    /// <exception cref="IllegalStateException">The producer is closed.</exception>
    public IObjectMessage CreateObjectMessage(object body)
    {
        CheckClosed();
        return session.CreateObjectMessage(body);
    }

    /// <summary>Creates a bytes message through the owning session.</summary>
    /// <exception cref="IllegalStateException">The producer is closed.</exception>
    public IBytesMessage CreateBytesMessage()
    {
        CheckClosed();
        return session.CreateBytesMessage();
    }

    /// <summary>Creates a bytes message from <paramref name="body"/> through the owning session.</summary>
    /// <exception cref="IllegalStateException">The producer is closed.</exception>
    public IBytesMessage CreateBytesMessage(byte[] body)
    {
        CheckClosed();
        return session.CreateBytesMessage(body);
    }

    /// <summary>Creates a stream message through the owning session.</summary>
    /// <exception cref="IllegalStateException">The producer is closed.</exception>
    public IStreamMessage CreateStreamMessage()
    {
        CheckClosed();
        return session.CreateStreamMessage();
    }

    /// <summary>
    /// Transformer delegate slot. It is a plain auto-property: this class neither invokes
    /// it nor guards access to it with a closed check.
    /// </summary>
    public ProducerTransformerDelegate ProducerTransformer { get; set; }

    /// <summary>
    /// Delivery mode used by the <c>Send</c> overloads that do not take one explicitly.
    /// Defaults to <see cref="MsgDeliveryMode.Persistent"/>.
    /// </summary>
    /// <exception cref="IllegalStateException">The producer is closed.</exception>
    public MsgDeliveryMode DeliveryMode
    {
        get
        {
            CheckClosed();
            return deliveryMode;
        }
        set
        {
            CheckClosed();
            deliveryMode = value;
        }
    }

    /// <summary>
    /// Time-to-live used by the <c>Send</c> overloads that do not take one explicitly.
    /// Defaults to <c>NMSConstants.defaultTimeToLive</c>.
    /// </summary>
    /// <exception cref="IllegalStateException">The producer is closed.</exception>
    public TimeSpan TimeToLive
    {
        get
        {
            CheckClosed();
            return timeToLive;
        }
        set
        {
            CheckClosed();
            timeToLive = value;
        }
    }

    /// <summary>Request timeout value; stored and returned as-is by this class.</summary>
    /// <exception cref="IllegalStateException">The producer is closed.</exception>
    public TimeSpan RequestTimeout
    {
        get
        {
            CheckClosed();
            return requestTimeout;
        }
        set
        {
            CheckClosed();
            requestTimeout = value;
        }
    }

    /// <summary>
    /// Priority used by the <c>Send</c> overloads that do not take one explicitly.
    /// Defaults to <c>NMSConstants.defaultPriority</c>.
    /// </summary>
    /// <exception cref="IllegalStateException">The producer is closed.</exception>
    public MsgPriority Priority
    {
        get
        {
            CheckClosed();
            return priority;
        }
        set
        {
            CheckClosed();
            priority = value;
        }
    }

    /// <summary>Flag passed to the session on every send; defaults to <c>false</c>.</summary>
    /// <exception cref="IllegalStateException">The producer is closed.</exception>
    public bool DisableMessageID
    {
        get
        {
            CheckClosed();
            return disableMessageId;
        }
        set
        {
            CheckClosed();
            disableMessageId = value;
        }
    }

    /// <summary>Flag passed to the session on every send; defaults to <c>false</c>.</summary>
    /// <exception cref="IllegalStateException">The producer is closed.</exception>
    public bool DisableMessageTimestamp
    {
        get
        {
            CheckClosed();
            return disableMessageTimestamp;
        }
        set
        {
            CheckClosed();
            disableMessageTimestamp = value;
        }
    }

    /// <summary>
    /// Requests creation of this producer's resource through the session's connection.
    /// </summary>
    /// <returns>The task returned by the connection's <c>CreateResource</c> call.</returns>
    /// <remarks>
    /// NOTE(review): the constructor already performs the same <c>CreateResource</c> call,
    /// so invoking this afterwards issues a second create request - confirm intended usage.
    /// </remarks>
    public Task Init()
    {
        return session.Connection.CreateResource(Info);
    }

    /// <summary>
    /// Re-creates this producer's resource on the supplied provider.
    /// </summary>
    /// <param name="provider">Provider on which the resource is created.</param>
    /// <returns>The task returned by the provider's <c>CreateResource</c> call.</returns>
    public Task OnConnectionRecovery(IProvider provider)
    {
        return provider.CreateResource(Info);
    }

    /// <summary>
    /// Throws when the producer has been closed, attaching the failure cause recorded by
    /// <see cref="Shutdown"/> when there is one.
    /// </summary>
    /// <exception cref="IllegalStateException">The producer is closed.</exception>
    private void CheckClosed()
    {
        if (!closed) return;

        if (failureCause == null)
            throw new IllegalStateException("The MessageProducer is closed");

        throw new IllegalStateException("The MessageProducer was closed due to an unrecoverable error.", failureCause);
    }

    /// <summary>
    /// Marks the producer closed, records <paramref name="error"/> as the failure cause and
    /// removes the producer from its session. Only the first call has any effect. Unlike
    /// <see cref="Close"/>, this does not destroy the resource through the connection.
    /// </summary>
    /// <param name="error">Optional error that caused the shutdown.</param>
    public void Shutdown(Exception error = null)
    {
        TryShutdown(error);
    }

    /// <summary>
    /// Performs the one-time open -> closed transition.
    /// </summary>
    /// <returns><c>true</c> if this call closed the producer; <c>false</c> if it was already closed.</returns>
    private bool TryShutdown(Exception error)
    {
        if (!closed.CompareAndSet(false, true))
            return false;

        failureCause = error;
        session.Remove(this);
        return true;
    }
}
| } |