// blob: d64632d8405023ce8d43428120a77477e70b6433 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Threading.Tasks;
using Apache.NMS.AMQP.Message;
using Apache.NMS.AMQP.Meta;
using Apache.NMS.AMQP.Provider;
using Apache.NMS.AMQP.Util;
using Apache.NMS.AMQP.Util.Synchronization;
using Apache.NMS.Util;
namespace Apache.NMS.AMQP
{
public class NmsMessageConsumer : IMessageConsumer
{
// Monitor guarding listener registration, Stop, CloseAsync and listener delivery;
// it is explicitly excluded (syncRoot.Exclude()) around Session.EnqueueForDispatch calls.
private readonly NmsSynchronizationMonitor syncRoot = new NmsSynchronizationMonitor();
// Copied from session.AcknowledgementMode in the constructor.
private readonly AcknowledgementMode acknowledgementMode;
// Flipped to true exactly once by Shutdown; read by CheckClosed and CloseAsync.
private readonly AtomicBool closed = new AtomicBool();
// Work item handed to Session.EnqueueForDispatch; it calls back into DeliverNextPendingAsync.
private readonly MessageDeliveryTask deliveryTask;
// Inbound envelopes that have been handed to this consumer but not yet received or delivered.
private readonly PriorityMessageQueue messageQueue = new PriorityMessageQueue();
// Set by Start, cleared by Stop and Shutdown.
private readonly AtomicBool started = new AtomicBool();
// Exception passed to Shutdown (null for a normal close); surfaced by CheckClosed and the receive path.
private Exception failureCause;
/// <summary>
/// Creates a consumer without a subscription name by forwarding to the
/// protected constructor with a <c>null</c> name.
/// </summary>
public NmsMessageConsumer(
    NmsConsumerId consumerId,
    NmsSession session,
    IDestination destination,
    string selector,
    bool noLocal)
    : this(consumerId, session, destination, null, selector, noLocal)
{
}
/// <summary>
/// Initializes the consumer state and builds its <see cref="NmsConsumerInfo"/>.
/// </summary>
/// <param name="consumerId">Identifier passed to the <see cref="NmsConsumerInfo"/> constructor.</param>
/// <param name="session">Owning session; also the source of the acknowledgement mode.</param>
/// <param name="destination">Destination to consume from.</param>
/// <param name="name">Subscription name stored in the consumer info (may be null).</param>
/// <param name="selector">Selector stored in the consumer info.</param>
/// <param name="noLocal">NoLocal flag stored in the consumer info.</param>
protected NmsMessageConsumer(NmsConsumerId consumerId, NmsSession session, IDestination destination, string name, string selector, bool noLocal)
{
    Session = session;
    acknowledgementMode = session.AcknowledgementMode;

    // Temporary destinations are validated by the connection before anything else is built.
    if (destination.IsTemporary)
    {
        session.Connection.CheckConsumeFromTemporaryDestination((NmsTemporaryDestination) destination);
    }

    var consumerInfo = new NmsConsumerInfo(consumerId);
    consumerInfo.Destination = destination;
    consumerInfo.Selector = selector;
    consumerInfo.NoLocal = noLocal;
    consumerInfo.IsExplicitClientId = Session.Connection.ConnectionInfo.IsExplicitClientId;
    consumerInfo.SubscriptionName = name;
    // The three subscription flags are virtual, so a subclass override is consulted here.
    consumerInfo.IsShared = IsSharedSubscription;
    consumerInfo.IsDurable = IsDurableSubscription;
    consumerInfo.IsBrowser = IsBrowser;
    consumerInfo.LocalMessageExpiry = Session.Connection.ConnectionInfo.LocalMessageExpiry;
    Info = consumerInfo;

    deliveryTask = new MessageDeliveryTask(this);
}
// Session this consumer belongs to; assigned once in the constructor.
public NmsSession Session { get; }
// Consumer metadata (destination, selector, subscription flags) built in the constructor.
public NmsConsumerInfo Info { get; }
// Destination recorded in Info.
public IDestination Destination => Info.Destination;
// Subscription-kind flags copied into Info by the constructor. This base class reports
// false for all three. They are read while the base constructor is still running, so an
// override must not depend on state initialized by the derived constructor.
protected virtual bool IsDurableSubscription => false;
protected virtual bool IsSharedSubscription => false;
protected virtual bool IsBrowser => false;
/// <summary>
/// Closes the consumer. Any exception raised by <see cref="Close"/> is logged
/// at debug level and suppressed.
/// </summary>
public void Dispose()
{
    try
    {
        Close();
    }
    catch (Exception closeFailure)
    {
        string consumerType = GetType().Name;
        Tracer.DebugFormat("Caught exception while disposing {0} {1}. Exception {2}", consumerType, Info, closeFailure);
    }
}
/// <summary>
/// Synchronous counterpart of <see cref="CloseAsync"/>; blocks until it completes.
/// </summary>
public void Close() => CloseAsync().GetAsyncResult();
/// <summary>
/// Shuts the consumer down locally and asks the connection to destroy the
/// consumer resource. Does nothing when the consumer is already closed.
/// </summary>
public async Task CloseAsync()
{
    bool alreadyClosed = closed;
    if (!alreadyClosed)
    {
        // Local shutdown and resource destruction both run while holding the monitor.
        using (await syncRoot.LockAsync().Await())
        {
            Shutdown(null);
            await Session.Connection.DestroyResource(Info).Await();
        }
    }
}
// Settable transformer delegate; nothing else in this file reads it.
public ConsumerTransformerDelegate ConsumerTransformer { get; set; }
// Selector recorded in Info at construction time.
public string MessageSelector => Info.Selector;
// Explicit implementation of IMessageConsumer.Listener on top of the private Listener event.
event MessageListener IMessageConsumer.Listener
{
add
{
// Registering on a closed consumer throws IllegalStateException.
CheckClosed();
using(syncRoot.Lock())
{
Listener += value;
// Schedule delivery for messages that were queued before the listener was attached.
DrainMessageQueueToListener();
}
}
remove
{
// Removal is not guarded by CheckClosed, so it also works on a closed consumer.
using(syncRoot.Lock())
{
Listener -= value;
}
}
}
/// <summary>
/// Blocks until the consumer is started, then performs a receive with a
/// timeout of -1 (presumably "wait indefinitely" - the value is passed through
/// to the message queue).
/// </summary>
/// <returns>The received message, or whatever the internal receive yields (it can return null).</returns>
/// <exception cref="IllegalStateException">
/// The consumer is closed, is closed while waiting to be started, or a message listener is set.
/// </exception>
public IMessage Receive()
{
    // Pause between checks of the started flag so a stopped consumer does not pin a CPU core.
    const int startPollIntervalMs = 5;

    CheckClosed();
    CheckMessageListener();
    while (true)
    {
        if (started)
        {
            return ReceiveInternalAsync(-1).GetAsyncResult();
        }

        // Shutdown clears the started flag, so a consumer closed while stopped may never
        // become started again; fail instead of waiting forever.
        CheckClosed();
        Task.Delay(startPollIntervalMs).GetAsyncResult();
    }
}
/// <summary>
/// Asynchronously waits until the consumer is started, then performs a receive
/// with a timeout of -1 (presumably "wait indefinitely" - the value is passed
/// through to the message queue).
/// </summary>
/// <returns>A task producing the received message (the internal receive can yield null).</returns>
/// <exception cref="IllegalStateException">
/// The consumer is closed, is closed while waiting to be started, or a message listener is set.
/// </exception>
public async Task<IMessage> ReceiveAsync()
{
    // Pause between checks of the started flag; without an await here the loop would
    // block the calling thread and burn CPU until the consumer is started.
    const int startPollIntervalMs = 5;

    CheckClosed();
    CheckMessageListener();
    while (true)
    {
        if (started)
        {
            return await ReceiveInternalAsync(-1).Await();
        }

        // Shutdown clears the started flag, so a consumer closed while stopped may never
        // become started again; fail instead of waiting forever.
        CheckClosed();
        await Task.Delay(startPollIntervalMs).Await();
    }
}
/// <summary>
/// Synchronous counterpart of <c>ReceiveBodyAsync&lt;T&gt;()</c>; blocks until it completes.
/// </summary>
public T ReceiveBody<T>() => ReceiveBodyAsync<T>().GetAsyncResult();
/// <summary>
/// Waits until the consumer is started, then receives the body of the next
/// message as <typeparamref name="T"/> with a timeout of -1 (presumably
/// "wait indefinitely" - the value is passed through to the message queue).
/// </summary>
/// <returns>A task producing the message body.</returns>
/// <exception cref="IllegalStateException">
/// Thrown synchronously when the consumer is closed or a message listener is set;
/// reported through the returned task when the consumer is closed while waiting to be started.
/// </exception>
public Task<T> ReceiveBodyAsync<T>()
{
    // Argument-free validation stays synchronous so callers see the same immediate exceptions.
    CheckClosed();
    CheckMessageListener();
    if (started)
    {
        return ReceiveBodyInternalAsync<T>(-1);
    }

    return WaitForStartThenReceiveAsync();

    // Waits for the started flag asynchronously instead of spinning on the caller's thread.
    async Task<T> WaitForStartThenReceiveAsync()
    {
        const int startPollIntervalMs = 5;
        while (true)
        {
            if (started)
            {
                return await ReceiveBodyInternalAsync<T>(-1).Await();
            }

            // Shutdown clears the started flag, so a consumer closed while stopped may
            // never become started again; fail instead of waiting forever.
            CheckClosed();
            await Task.Delay(startPollIntervalMs).Await();
        }
    }
}
/// <summary>
/// Performs a receive with a timeout of 0 when the consumer is started.
/// </summary>
/// <returns>The result of the internal receive, or null when the consumer is not started.</returns>
/// <exception cref="IllegalStateException">The consumer is closed or a message listener is set.</exception>
public IMessage ReceiveNoWait()
{
    CheckClosed();
    CheckMessageListener();
    if (started)
    {
        return ReceiveInternalAsync(0).GetAsyncResult();
    }

    return null;
}
/// <summary>
/// Performs a body receive with a timeout of 0 when the consumer is started.
/// </summary>
/// <returns>The result of the internal receive, or default(T) when the consumer is not started.</returns>
/// <exception cref="IllegalStateException">The consumer is closed or a message listener is set.</exception>
public T ReceiveBodyNoWait<T>()
{
    CheckClosed();
    CheckMessageListener();
    if (started)
    {
        return ReceiveBodyInternalAsync<T>(0).GetAsyncResult();
    }

    return default;
}
/// <summary>
/// Synchronous counterpart of <c>ReceiveAsync(TimeSpan)</c>; blocks until it completes.
/// </summary>
public IMessage Receive(TimeSpan timeout) => ReceiveAsync(timeout).GetAsyncResult();
/// <summary>
/// Receives a message, waiting at most <paramref name="timeout"/>. If the
/// consumer is not started yet, the time spent waiting for it to start counts
/// against the same deadline.
/// </summary>
/// <param name="timeout">Maximum time to wait; converted to whole milliseconds.</param>
/// <returns>
/// The result of the internal receive, or null when the deadline passes before the
/// consumer is started.
/// </returns>
/// <exception cref="IllegalStateException">The consumer is closed or a message listener is set.</exception>
public async Task<IMessage> ReceiveAsync(TimeSpan timeout)
{
    // Pause between checks of the started flag; without an await the loop would spin
    // on the calling thread until the deadline.
    const int startPollIntervalMs = 5;

    CheckClosed();
    CheckMessageListener();

    // Clamp before casting: an out-of-range double-to-int cast yields an unspecified value.
    int timeoutInMilliseconds = (int) Math.Max(int.MinValue, Math.Min(int.MaxValue, timeout.TotalMilliseconds));
    if (started)
    {
        return await ReceiveInternalAsync(timeoutInMilliseconds).Await();
    }

    long deadline = GetDeadline(timeoutInMilliseconds);
    while (true)
    {
        timeoutInMilliseconds = (int) (deadline - DateTime.UtcNow.Ticks / 10_000L);
        if (timeoutInMilliseconds < 0)
        {
            return null;
        }

        if (started)
        {
            return await ReceiveInternalAsync(timeoutInMilliseconds).Await();
        }

        // Never sleep past the deadline, but always sleep at least 1 ms so the loop yields.
        int pause = Math.Max(1, Math.Min(startPollIntervalMs, timeoutInMilliseconds));
        await Task.Delay(pause).Await();
    }
}
/// <summary>
/// Synchronous counterpart of <c>ReceiveBodyAsync&lt;T&gt;(TimeSpan)</c>; blocks until it completes.
/// </summary>
public T ReceiveBody<T>(TimeSpan timeout) => ReceiveBodyAsync<T>(timeout).GetAsyncResult();
/// <summary>
/// Receives the body of the next message as <typeparamref name="T"/>, waiting at
/// most <paramref name="timeout"/>. If the consumer is not started yet, the time
/// spent waiting for it to start counts against the same deadline.
/// </summary>
/// <param name="timeout">Maximum time to wait; converted to whole milliseconds.</param>
/// <returns>
/// The result of the internal receive, or default(T) when the deadline passes before
/// the consumer is started.
/// </returns>
/// <exception cref="IllegalStateException">The consumer is closed or a message listener is set.</exception>
public async Task<T> ReceiveBodyAsync<T>(TimeSpan timeout)
{
    // Pause between checks of the started flag; without an await the loop would spin
    // on the calling thread until the deadline.
    const int startPollIntervalMs = 5;

    CheckClosed();
    CheckMessageListener();

    // Clamp before casting: an out-of-range double-to-int cast yields an unspecified value.
    int timeoutInMilliseconds = (int) Math.Max(int.MinValue, Math.Min(int.MaxValue, timeout.TotalMilliseconds));
    if (started)
    {
        return await ReceiveBodyInternalAsync<T>(timeoutInMilliseconds).Await();
    }

    long deadline = GetDeadline(timeoutInMilliseconds);
    while (true)
    {
        timeoutInMilliseconds = (int) (deadline - DateTime.UtcNow.Ticks / 10_000L);
        if (timeoutInMilliseconds < 0)
        {
            return default;
        }

        if (started)
        {
            return await ReceiveBodyInternalAsync<T>(timeoutInMilliseconds).Await();
        }

        // Never sleep past the deadline, but always sleep at least 1 ms so the loop yields.
        int pause = Math.Max(1, Math.Min(startPollIntervalMs, timeoutInMilliseconds));
        await Task.Delay(pause).Await();
    }
}
/// <summary>
/// Rejects synchronous receive calls while a message listener is registered.
/// </summary>
/// <exception cref="IllegalStateException">A message listener is set.</exception>
private void CheckMessageListener()
{
    if (!HasMessageListener())
    {
        return;
    }

    throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
}
/// <summary>
/// Throws when the consumer has been closed, attaching the recorded failure
/// cause when there is one.
/// </summary>
/// <exception cref="IllegalStateException">The consumer is closed.</exception>
private void CheckClosed()
{
    bool isClosed = closed;
    if (!isClosed)
    {
        return;
    }

    if (failureCause != null)
    {
        throw new IllegalStateException("The MessageConsumer was closed due to an unrecoverable error.", failureCause);
    }

    throw new IllegalStateException("The MessageConsumer is closed");
}
// Backing event for the explicit IMessageConsumer.Listener implementation; a non-null
// value means at least one listener is attached (see HasMessageListener).
private event MessageListener Listener;
/// <summary>
/// Creates the consumer resource on the connection, registers the consumer with
/// its session, starts it locally when the session is already started, and then
/// starts the resource on the connection.
/// </summary>
public async Task Init()
{
    var createResource = Session.Connection.CreateResource(Info);
    await createResource.Await();

    Session.Add(this);
    if (Session.IsStarted)
    {
        Start();
    }

    var startResource = Session.Connection.StartResource(Info);
    await startResource.Await();
}
/// <summary>
/// Accepts an inbound envelope: attaches the acknowledge callback, queues the
/// envelope (at the front when <c>EnqueueFirst</c> is set) and, when the session
/// is started and a listener is attached, schedules a delivery task.
/// </summary>
/// <param name="envelope">The inbound message dispatch to queue.</param>
public void OnInboundMessage(InboundMessageDispatch envelope)
{
    if (Tracer.IsDebugEnabled)
    {
        Tracer.Debug($"Message {envelope.Message.NMSMessageId} passed to consumer {Info.Id}");
    }

    SetAcknowledgeCallback(envelope);

    if (!envelope.EnqueueFirst)
    {
        messageQueue.Enqueue(envelope);
    }
    else
    {
        messageQueue.EnqueueFirst(envelope);
    }

    if (!Session.IsStarted || Listener == null)
    {
        return;
    }

    // Exclude lock for a time of dispatching, so it does not pass along to actionblock
    using (syncRoot.Exclude())
    {
        Session.EnqueueForDispatch(deliveryTask);
    }
}
/// <summary>
/// Takes at most one envelope from the queue and hands it to the registered
/// listener, acknowledging it according to the acknowledgement mode.
/// Expired messages and messages over the redelivery limit are acknowledged as
/// undeliverable instead of being delivered.
/// </summary>
/// <remarks>
/// Exceptions other than those thrown by the listener are reported through
/// <c>Session.Connection.OnAsyncException</c> and never propagate to the caller.
/// A listener exception is logged; in auto-acknowledge and dups-ok modes the
/// message is then released instead of accepted.
/// </remarks>
private async Task DeliverNextPendingAsync()
{
    if (Tracer.IsDebugEnabled)
    {
        Tracer.Debug($"{Info.Id} is about to deliver next pending message.");
    }

    // Pre-check outside the lock so an idle or stopped consumer does not take the monitor.
    if (!(Session.IsStarted && started && Listener != null))
    {
        return;
    }

    using (await syncRoot.LockAsync().Await())
    {
        try
        {
            // Re-check under the lock: Stop and listener removal both run under it.
            if (!(started && Listener != null))
            {
                return;
            }

            var envelope = messageQueue.DequeueNoWait();
            if (envelope == null)
            {
                if (Tracer.IsDebugEnabled)
                {
                    Tracer.Debug($"No message available for delivery.");
                }

                return;
            }

            if (IsMessageExpired(envelope))
            {
                if (Tracer.IsDebugEnabled)
                {
                    Tracer.Debug($"{Info.Id} filtered expired message: {envelope.Message.NMSMessageId}");
                }

                await DoAckExpiredAsync(envelope).Await();
            }
            else if (IsRedeliveryExceeded(envelope))
            {
                if (Tracer.IsDebugEnabled)
                {
                    Tracer.Debug($"{Info.Id} filtered message with excessive redelivery count: {envelope.RedeliveryCount.ToString()}");
                }

                // TODO: Apply redelivery policy
                await DoAckExpiredAsync(envelope).Await();
            }
            else
            {
                bool deliveryFailed = false;
                bool autoAckOrDupsOk = acknowledgementMode == AcknowledgementMode.AutoAcknowledge ||
                                       acknowledgementMode == AcknowledgementMode.DupsOkAcknowledge;

                // Mark the message as delivered before the listener runs.
                if (autoAckOrDupsOk)
                {
                    await DoAckDeliveredAsync(envelope).Await();
                }
                else
                {
                    await AckFromReceiveAsync(envelope).Await();
                }

                try
                {
                    Listener.Invoke(envelope.Message.Copy());
                }
                catch (Exception listenerFailure)
                {
                    // A failing listener must not break the dispatcher, but the failure
                    // should still be visible to whoever operates the client.
                    deliveryFailed = true;
                    Tracer.WarnFormat("{0} message listener failed while handling message {1}: {2}",
                        Info.Id, envelope.Message.NMSMessageId, listenerFailure);
                }

                if (autoAckOrDupsOk)
                {
                    if (!deliveryFailed)
                    {
                        await DoAckConsumedAsync(envelope).Await();
                    }
                    else
                    {
                        await DoAckReleasedAsync(envelope).Await();
                    }
                }
            }
        }
        catch (Exception e)
        {
            // TODO - There are two cases when we can get an error here:
            // 1) error returned from the attempted ACK that was sent
            // 2) error while attempting to copy the incoming message.
            //
            // We need to decide how to respond to these, but definitely we cannot
            // let this error propagate as it could take down the SessionDispatcher
            Session.Connection.OnAsyncException(e);
        }
    }
}
/// <summary>
/// Reports whether the envelope's message is expired. Always false when local
/// message expiry is disabled in the consumer info.
/// </summary>
private bool IsMessageExpired(InboundMessageDispatch envelope)
{
    NmsMessage candidate = envelope.Message;
    if (!Info.LocalMessageExpiry)
    {
        return false;
    }

    return candidate.IsExpired();
}
/// <summary>
/// Reports whether the envelope's redelivery count is above the connection's
/// redelivery policy maximum. A missing policy or a negative maximum disables the check.
/// </summary>
private bool IsRedeliveryExceeded(InboundMessageDispatch envelope)
{
    Tracer.DebugFormat("Checking envelope with {0} redeliveries", envelope.RedeliveryCount);

    IRedeliveryPolicy policy = Session.Connection.RedeliveryPolicy;
    if (policy == null || policy.MaximumRedeliveries < 0)
    {
        return false;
    }

    return envelope.RedeliveryCount > policy.MaximumRedeliveries;
}
/// <summary>
/// Acknowledges the single envelope with <c>AckType.RELEASED</c> through the session.
/// </summary>
private Task DoAckReleasedAsync(InboundMessageDispatch envelope) =>
    Session.AcknowledgeIndividualAsync(AckType.RELEASED, envelope);
/// <summary>
/// Receives the next deliverable envelope, copies its message, acknowledges the
/// envelope and returns the copy.
/// </summary>
/// <param name="timeout">Timeout in milliseconds, passed through to the shared receive loop.</param>
private Task<IMessage> ReceiveInternalAsync(int timeout)
{
    // The copy is taken before acknowledging, matching the order callers rely on.
    Func<InboundMessageDispatch, Task<IMessage>> copyThenAcknowledge = async dispatch =>
    {
        IMessage copy = dispatch.Message.Copy();
        await AckFromReceiveAsync(dispatch);
        return copy;
    };

    return ReceiveInternalBaseAsync(timeout, copyThenAcknowledge);
}
/// <summary>
/// Receives the next deliverable envelope, extracts its body as
/// <typeparamref name="T"/>, acknowledges the envelope and returns the body.
/// </summary>
/// <param name="timeout">Timeout in milliseconds, passed through to the shared receive loop.</param>
/// <exception cref="MessageFormatException">
/// The body cannot be read as <typeparamref name="T"/>. In auto-acknowledge and
/// dups-ok modes the envelope is put back at the front of the queue first.
/// </exception>
private Task<T> ReceiveBodyInternalAsync<T>(int timeout)
{
    return ReceiveInternalBaseAsync<T>(timeout, async envelope =>
    {
        try
        {
            T body = envelope.Message.Body<T>();
            await AckFromReceiveAsync(envelope).Await();
            return body;
        }
        catch (MessageFormatException)
        {
            // Should behave as if receiveBody never happened in these modes.
            if (acknowledgementMode == AcknowledgementMode.AutoAcknowledge ||
                acknowledgementMode == AcknowledgementMode.DupsOkAcknowledge)
            {
                envelope.EnqueueFirst = true;
                OnInboundMessage(envelope);
            }

            // Bare rethrow keeps the original stack trace.
            throw;
        }
    });
}
/// <summary>
/// Shared receive loop: dequeues envelopes until one is deliverable and returns
/// the result of <paramref name="func"/> for it. Expired messages and messages
/// over the redelivery limit are acknowledged as undeliverable and skipped.
/// </summary>
/// <param name="timeout">
/// Timeout in milliseconds handed to the queue. For positive values the remaining
/// time is recomputed after every skipped message so the overall wait stays
/// within the original deadline.
/// </param>
/// <param name="func">Produces the caller-visible result from the accepted envelope.</param>
/// <returns>The result of <paramref name="func"/>, or default(T) when the queue yields no envelope.</returns>
/// <exception cref="NMSException">
/// A recorded failure cause exists after dequeuing, or any other failure occurred
/// (non-NMS exceptions are wrapped with the message "Receive failed").
/// </exception>
private async Task<T> ReceiveInternalBaseAsync<T>(int timeout, Func<InboundMessageDispatch, Task<T>> func)
{
    try
    {
        long deadline = 0;
        if (timeout > 0)
        {
            deadline = GetDeadline(timeout);
        }

        while (true)
        {
            if (Tracer.IsDebugEnabled)
            {
                Tracer.Debug("Trying to dequeue next message.");
            }

            InboundMessageDispatch envelope = await messageQueue.DequeueAsync(timeout).Await();

            if (failureCause != null)
            {
                throw NMSExceptionSupport.Create(failureCause);
            }

            if (envelope == null)
            {
                return default;
            }

            if (IsMessageExpired(envelope))
            {
                if (Tracer.IsDebugEnabled)
                {
                    Tracer.Debug($"{Info.Id} filtered expired message: {envelope.Message.NMSMessageId}");
                }

                await DoAckExpiredAsync(envelope).Await();
            }
            else if (IsRedeliveryExceeded(envelope))
            {
                if (Tracer.IsDebugEnabled)
                {
                    Tracer.Debug($"{Info.Id} filtered message with excessive redelivery count: {envelope.RedeliveryCount.ToString()}");
                }

                // TODO: Apply redelivery policy
                await DoAckExpiredAsync(envelope).Await();
            }
            else
            {
                if (Tracer.IsDebugEnabled)
                {
                    Tracer.Debug($"{Info.Id} received message {envelope.Message.NMSMessageId}.");
                }

                return await func.Invoke(envelope).Await();
            }

            // Only reached when a message was filtered out. Shrink a finite timeout to the
            // time left so skipped messages cannot extend the wait past the deadline.
            if (timeout > 0)
            {
                timeout = (int) Math.Max(deadline - DateTime.UtcNow.Ticks / 10_000L, 0);
            }
        }
    }
    catch (NMSException)
    {
        throw;
    }
    catch (Exception ex)
    {
        throw ExceptionSupport.Wrap(ex, "Receive failed");
    }
}
/// <summary>
/// Converts a relative timeout into an absolute deadline on the UTC clock.
/// </summary>
/// <param name="timeout">Timeout in milliseconds.</param>
/// <returns>Current UTC time in milliseconds (ticks / 10,000) plus <paramref name="timeout"/>.</returns>
private static long GetDeadline(int timeout)
{
    long nowInMilliseconds = DateTime.UtcNow.Ticks / TimeSpan.TicksPerMillisecond;
    return nowInMilliseconds + timeout;
}
/// <summary>
/// Acknowledges an envelope handed to the application: as delivered when the
/// message carries an acknowledge callback, otherwise as consumed. Does nothing
/// for a null envelope or an envelope without a message.
/// </summary>
private async Task AckFromReceiveAsync(InboundMessageDispatch envelope)
{
    NmsMessage received = envelope?.Message;
    if (received == null)
    {
        return;
    }

    Task acknowledgement = received.NmsAcknowledgeCallback != null
        ? DoAckDeliveredAsync(envelope)
        : DoAckConsumedAsync(envelope);
    await acknowledgement.Await();
}
/// <summary>
/// Acknowledges the envelope with <c>AckType.DELIVERED</c> through the session.
/// </summary>
private Task DoAckDeliveredAsync(InboundMessageDispatch envelope) =>
    Session.AcknowledgeAsync(AckType.DELIVERED, envelope);
/// <summary>
/// Acknowledges the envelope with <c>AckType.ACCEPTED</c> through the session.
/// </summary>
private Task DoAckConsumedAsync(InboundMessageDispatch envelope) =>
    Session.AcknowledgeAsync(AckType.ACCEPTED, envelope);
/// <summary>
/// Acknowledges the envelope with <c>AckType.MODIFIED_FAILED_UNDELIVERABLE</c>
/// through the session.
/// </summary>
private Task DoAckExpiredAsync(InboundMessageDispatch envelope) =>
    Session.AcknowledgeAsync(AckType.MODIFIED_FAILED_UNDELIVERABLE, envelope);
/// <summary>
/// Attaches an acknowledge callback to the envelope's message: session-wide in
/// client-acknowledge mode, bound to this envelope when the session uses
/// individual acknowledgement. Other modes leave the message untouched.
/// </summary>
private void SetAcknowledgeCallback(InboundMessageDispatch envelope)
{
    bool isClientAcknowledge = acknowledgementMode == AcknowledgementMode.ClientAcknowledge;
    if (isClientAcknowledge)
    {
        envelope.Message.NmsAcknowledgeCallback = new NmsAcknowledgeCallback(Session);
        return;
    }

    if (Session.IsIndividualAcknowledge())
    {
        envelope.Message.NmsAcknowledgeCallback = new NmsAcknowledgeCallback(Session, envelope);
    }
}
/// <summary>
/// Reports whether at least one message listener is attached.
/// </summary>
public bool HasMessageListener() => Listener != null;
/// <summary>
/// Closes the consumer locally: records the failure cause, removes the consumer
/// from its session, clears the started flag and disposes the message queue.
/// Only the first call has any effect; later calls just log.
/// </summary>
/// <param name="exception">Cause of the shutdown, or null for a normal close.</param>
public void Shutdown(Exception exception)
{
    bool firstShutdown = closed.CompareAndSet(false, true);
    if (!firstShutdown)
    {
        if (Tracer.IsDebugEnabled)
        {
            Tracer.Debug("NmsMessageConsumer already closed.");
        }

        return;
    }

    if (Tracer.IsDebugEnabled)
    {
        Tracer.Debug("Shutting down NmsMessageConsumer.");
    }

    failureCause = exception;
    Session.Remove(this);
    started.Set(false);
    messageQueue.Dispose();
}
/// <summary>
/// Marks the consumer as started. On the transition from stopped to started,
/// schedules delivery for messages already queued.
/// </summary>
public void Start()
{
    bool justStarted = started.CompareAndSet(false, true);
    if (!justStarted)
    {
        return;
    }

    DrainMessageQueueToListener();
}
/// <summary>
/// Schedules one delivery task per message currently queued, provided a listener
/// is attached and the session is started. The queue size is sampled once up front.
/// </summary>
private void DrainMessageQueueToListener()
{
    if (Listener == null || !Session.IsStarted)
    {
        return;
    }

    for (int remaining = messageQueue.Count; remaining > 0; remaining--)
    {
        // Exclude lock for a time of dispatching, so it does not pass along to actionblock
        using (syncRoot.Exclude())
        {
            Session.EnqueueForDispatch(deliveryTask);
        }
    }
}
/// <summary>
/// Recreates this consumer's resource on the given provider.
/// </summary>
public Task OnConnectionRecovery(IProvider provider) => provider.CreateResource(Info);
/// <summary>
/// Starts this consumer's resource on the given provider, then schedules
/// delivery for any messages still queued.
/// </summary>
public async Task OnConnectionRecovered(IProvider provider)
{
    var startResource = provider.StartResource(Info);
    await startResource.Await();

    DrainMessageQueueToListener();
}
// Clears the started flag. The monitor is taken first, so this waits for a listener
// delivery that currently holds it (DeliverNextPendingAsync locks the same monitor).
public void Stop()
{
using(syncRoot.Lock())
{
started.Set(false);
}
}
/// <summary>
/// Reports whether this consumer's destination equals the given temporary destination.
/// </summary>
public bool IsDestinationInUse(NmsTemporaryDestination destination) =>
    Equals(Info.Destination, destination);
/// <summary>
/// Discards every message currently queued for this consumer.
/// </summary>
public void OnConnectionInterrupted() => messageQueue.Clear();
/// <summary>
/// Stops the consumer locally and stops its resource on the connection. Whether
/// or not stopping the resource succeeds, queued messages are discarded when the
/// consumer is active in the session's current transaction context.
/// </summary>
public async Task SuspendForRollbackAsync()
{
    Stop();

    try
    {
        var stopResource = Session.Connection.StopResource(Info);
        await stopResource.Await();
    }
    finally
    {
        bool activeInTransaction = Session.TransactionContext.IsActiveInThisContext(Info.Id);
        if (activeInTransaction)
        {
            messageQueue.Clear();
        }
    }
}
/// <summary>
/// Restarts the consumer locally, then restarts its resource on the connection.
/// </summary>
public async Task ResumeAfterRollbackAsync()
{
    Start();

    Task restartResource = StartConsumerResourceAsync();
    await restartResource.Await();
}
/// <summary>
/// Starts the consumer resource on the connection. If that fails with an
/// <see cref="NMSException"/>, the consumer is removed from its session and the
/// exception is rethrown.
/// </summary>
private async Task StartConsumerResourceAsync()
{
    var startResource = Session.Connection.StartResource(Info);
    try
    {
        await startResource.Await();
    }
    catch (NMSException)
    {
        Session.Remove(this);
        throw;
    }
}
/// <summary>
/// Work item bound to one consumer; running it delivers that consumer's next
/// pending message.
/// </summary>
public class MessageDeliveryTask
{
    private readonly NmsMessageConsumer consumer;

    /// <summary>Binds the task to the consumer whose messages it delivers.</summary>
    public MessageDeliveryTask(NmsMessageConsumer consumer)
    {
        this.consumer = consumer;
    }

    /// <summary>Delegates to the consumer's private delivery routine.</summary>
    public Task DeliverNextPending() => consumer.DeliverNextPendingAsync();
}
}
}