blob: 25a34cb6fc7602cae62f9fb5fb6b44e59e138a22 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Apache.Qpid.Proton.Client.Exceptions;
using Apache.Qpid.Proton.Client.Concurrent;
using Apache.Qpid.Proton.Client.Utilities;
using Apache.Qpid.Proton.Engine;
using Apache.Qpid.Proton.Logging;
namespace Apache.Qpid.Proton.Client.Implementation
{
/// <summary>
/// Client receiver implementation which provides a wrapper around the proton
/// receiver link and processes incoming deliveries with options for queueing
/// with a credit window.
/// </summary>
public class ClientReceiver : IReceiver
{
private static IProtonLogger LOG = ProtonLoggerFactory.GetLogger<ClientReceiver>();
private readonly ReceiverOptions options;
private readonly ClientSession session;
private readonly string receiverId;
private readonly FifoDeliveryQueue<ClientDelivery> messageQueue;
private readonly AtomicBoolean closed = new AtomicBoolean();
private readonly TaskCompletionSource<IReceiver> openFuture = new TaskCompletionSource<IReceiver>();
private readonly TaskCompletionSource<IReceiver> closeFuture = new TaskCompletionSource<IReceiver>();
private TaskCompletionSource<IReceiver> drainingFuture;
private ClientException failureCause;
private Engine.IReceiver protonReceiver;
private volatile ISource remoteSource;
private volatile ITarget remoteTarget;
internal ClientReceiver(ClientSession session, ReceiverOptions options, String receiverId, Engine.IReceiver receiver)
{
this.options = options;
this.session = session;
this.receiverId = receiverId;
this.protonReceiver = receiver;
this.protonReceiver.LinkedResource = this;
if (options.CreditWindow > 0)
{
protonReceiver.AddCredit(options.CreditWindow);
}
messageQueue = new FifoDeliveryQueue<ClientDelivery>();
messageQueue.Start();
}
public IClient Client => session.Client;
public IConnection Connection => session.Connection;
public ISession Session => session;
public Task<IReceiver> OpenTask => openFuture.Task;
public string Address
{
get
{
if (IsDynamic)
{
WaitForOpenToComplete();
return protonReceiver.RemoteSource?.Address;
}
else
{
return protonReceiver.Source?.Address;
}
}
}
public ISource Source
{
get
{
WaitForOpenToComplete();
return remoteSource;
}
}
public ITarget Target
{
get
{
WaitForOpenToComplete();
return remoteTarget;
}
}
public IReadOnlyDictionary<string, object> Properties
{
get
{
WaitForOpenToComplete();
return ClientConversionSupport.ToStringKeyedMap(protonReceiver.RemoteProperties);
}
}
public IReadOnlyCollection<string> OfferedCapabilities
{
get
{
WaitForOpenToComplete();
return ClientConversionSupport.ToStringArray(protonReceiver.RemoteOfferedCapabilities);
}
}
public IReadOnlyCollection<string> DesiredCapabilities
{
get
{
WaitForOpenToComplete();
return ClientConversionSupport.ToStringArray(protonReceiver.RemoteDesiredCapabilities);
}
}
public int QueuedDeliveries => messageQueue.Count;
public IReceiver AddCredit(uint credit)
{
CheckClosedOrFailed();
TaskCompletionSource<IReceiver> creditAdded = new TaskCompletionSource<IReceiver>();
session.Execute(() =>
{
if (NotClosedOrFailed(creditAdded))
{
if (options.CreditWindow != 0)
{
creditAdded.TrySetException(new ClientIllegalStateException("Cannot add credit when a credit window has been configured"));
}
else if (protonReceiver.IsDraining)
{
creditAdded.TrySetException(new ClientIllegalStateException("Cannot add credit while a drain is pending"));
}
else
{
try
{
protonReceiver.AddCredit(credit);
creditAdded.TrySetResult(this);
}
catch (Exception ex)
{
creditAdded.TrySetException(ClientExceptionSupport.CreateNonFatalOrPassthrough(ex));
}
}
}
});
return session.Request(this, creditAdded).Task.GetAwaiter().GetResult();
}
public void Close(IErrorCondition error = null)
{
try
{
CloseAsync(error).Wait();
}
catch (Exception)
{
}
}
public Task<IReceiver> CloseAsync(IErrorCondition error = null)
{
return DoCloseOrDetach(true, error);
}
public void Detach(IErrorCondition error = null)
{
try
{
DetachAsync(error).Wait();
}
catch (Exception)
{
}
}
public Task<IReceiver> DetachAsync(IErrorCondition error = null)
{
return DoCloseOrDetach(false, error);
}
public void Dispose()
{
try
{
Close();
}
catch (Exception)
{
}
}
public Task<IReceiver> Drain()
{
CheckClosedOrFailed();
TaskCompletionSource<IReceiver> drainComplete = new TaskCompletionSource<IReceiver>();
session.Execute(() =>
{
if (NotClosedOrFailed(drainComplete))
{
if (protonReceiver.IsDraining)
{
drainComplete.TrySetException(new ClientIllegalStateException("Receiver is already draining"));
return;
}
try
{
if (protonReceiver.Drain())
{
drainingFuture = drainComplete;
// TODO: Need a cancellation point: drainingTimeout
session.ScheduleRequestTimeout(drainingFuture, options.DrainTimeout,
() => new ClientOperationTimedOutException("Timed out waiting for remote to respond to drain request"));
}
else
{
drainComplete.TrySetResult(this);
}
}
catch (Exception ex)
{
drainComplete.TrySetException(ClientExceptionSupport.CreateNonFatalOrPassthrough(ex));
}
}
});
return drainComplete.Task;
}
public IDelivery Receive()
{
return Receive(TimeSpan.MaxValue);
}
public IDelivery Receive(TimeSpan timeout)
{
CheckClosedOrFailed();
try
{
ClientDelivery delivery = messageQueue.Dequeue(timeout);
if (delivery != null)
{
if (options.AutoAccept)
{
delivery.Disposition(ClientAccepted.Instance, options.AutoSettle);
}
else
{
AsyncReplenishCreditIfNeeded();
}
return delivery;
}
CheckClosedOrFailed();
return null;
}
catch (ThreadInterruptedException e)
{
throw new ClientException("Receive wait interrupted", e);
}
}
public IDelivery TryReceive()
{
CheckClosedOrFailed();
IDelivery delivery = messageQueue.DequeueNoWait();
if (delivery != null)
{
if (options.AutoAccept)
{
delivery.Disposition(ClientAccepted.Instance, options.AutoSettle);
}
else
{
AsyncReplenishCreditIfNeeded();
}
}
else
{
CheckClosedOrFailed();
}
return delivery;
}
#region Internal Receiver API
internal ClientReceiver Open()
{
protonReceiver.LocalOpenHandler(HandleLocalOpen)
.LocalCloseHandler(HandleLocalCloseOrDetach)
.LocalDetachHandler(HandleLocalCloseOrDetach)
.OpenHandler(HandleRemoteOpen)
.CloseHandler(HandleRemoteCloseOrDetach)
.DetachHandler(HandleRemoteCloseOrDetach)
.ParentEndpointClosedHandler(HandleParentEndpointClosed)
.DeliveryStateUpdatedHandler(HandleDeliveryStateRemotelyUpdated)
.DeliveryReadHandler(HandleDeliveryReceived)
.DeliveryAbortedHandler(HandleDeliveryAborted)
.CreditStateUpdateHandler(HandleReceiverCreditUpdated)
.EngineShutdownHandler(HandleEngineShutdown)
.Open();
return this;
}
internal void Disposition(IIncomingDelivery delivery, Types.Transport.IDeliveryState state, bool settle)
{
CheckClosedOrFailed();
AsyncApplyDisposition(delivery, state, settle);
}
internal String ReceiverId => receiverId;
internal bool IsClosed => closed;
internal bool IsDynamic => protonReceiver.Source?.Dynamic ?? false;
#endregion
#region Private Receiver Implementation
private Task<IReceiver> DoCloseOrDetach(bool close, IErrorCondition error)
{
if (closed.CompareAndSet(false, true))
{
// Already closed by failure or shutdown so no need to
if (!closeFuture.Task.IsCompleted)
{
session.Execute(() =>
{
if (protonReceiver.IsLocallyOpen)
{
try
{
protonReceiver.ErrorCondition = ClientErrorCondition.AsProtonErrorCondition(error);
if (close)
{
protonReceiver.Close();
}
else
{
protonReceiver.Detach();
}
}
catch (Exception)
{
// The engine event handlers will deal with errors
}
}
});
}
}
return closeFuture.Task;
}
private void AsyncApplyDisposition(IIncomingDelivery delivery, Types.Transport.IDeliveryState state, bool settle)
{
session.Execute(() =>
{
session.TransactionContext.Disposition(delivery, state, settle);
ReplenishCreditIfNeeded();
});
}
private void ReplenishCreditIfNeeded()
{
uint creditWindow = options.CreditWindow;
if (creditWindow > 0)
{
uint currentCredit = protonReceiver.Credit;
if (currentCredit <= creditWindow * 0.5)
{
uint potentialPrefetch = currentCredit + (uint)messageQueue.Count;
if (potentialPrefetch <= creditWindow * 0.7)
{
uint additionalCredit = creditWindow - potentialPrefetch;
LOG.Trace("Consumer granting additional credit: {0}", additionalCredit);
try
{
protonReceiver.AddCredit(additionalCredit);
}
catch (Exception ex)
{
LOG.Debug("Error caught during credit top-up", ex);
}
}
}
}
}
private void AsyncReplenishCreditIfNeeded()
{
uint creditWindow = options.CreditWindow;
if (creditWindow > 0)
{
session.Execute(ReplenishCreditIfNeeded);
}
}
private void CheckClosedOrFailed()
{
if (IsClosed)
{
throw new ClientIllegalStateException("The Receiver was explicitly closed", failureCause);
}
else if (failureCause != null)
{
throw failureCause;
}
}
private bool NotClosedOrFailed(TaskCompletionSource<IReceiver> request)
{
if (IsClosed)
{
request.TrySetException(new ClientIllegalStateException("The Receiver was explicitly closed", failureCause));
return false;
}
else if (failureCause != null)
{
request.TrySetException(failureCause);
return false;
}
else
{
return true;
}
}
private void WaitForOpenToComplete()
{
if (!openFuture.Task.IsCompleted || openFuture.Task.IsFaulted)
{
try
{
openFuture.Task.Wait();
}
catch (Exception e)
{
throw failureCause ?? ClientExceptionSupport.CreateNonFatalOrPassthrough(e);
}
}
}
private void ImmediateLinkShutdown(ClientException failureCause)
{
if (this.failureCause == null)
{
this.failureCause = failureCause;
}
try
{
if (protonReceiver.IsRemotelyDetached)
{
protonReceiver.Detach();
}
else
{
protonReceiver.Close();
}
}
catch (Exception)
{
}
if (failureCause != null)
{
openFuture.TrySetException(failureCause);
if (drainingFuture != null)
{
drainingFuture.TrySetException(failureCause);
}
}
else
{
openFuture.TrySetResult(this);
if (drainingFuture != null)
{
drainingFuture.TrySetException(new ClientResourceRemotelyClosedException("The Receiver has been closed"));
}
}
// TODO
// if (drainingTimeout != null)
// {
// drainingTimeout.cancel(false);
// drainingTimeout = null;
// }
closeFuture.TrySetResult(this);
}
#endregion
#region Proton Receiver lifecycle event handlers
private void HandleLocalOpen(Engine.IReceiver receiver)
{
if (options.OpenTimeout > 0)
{
session.Schedule(() =>
{
if (!openFuture.Task.IsCompleted)
{
ImmediateLinkShutdown(
new ClientOperationTimedOutException("Receiver open timed out waiting for remote to respond"));
}
}, TimeSpan.FromMilliseconds(options.OpenTimeout));
}
}
private void HandleLocalCloseOrDetach(Engine.IReceiver receiver)
{
messageQueue.Stop(); // Ensure blocked receivers are all unblocked.
// If not yet remotely closed we only wait for a remote close if the engine isn't
// already failed and we have successfully opened the sender without a timeout.
if (!receiver.Engine.IsShutdown && failureCause == null && receiver.IsRemotelyOpen)
{
long timeout = options.CloseTimeout;
if (timeout > 0)
{
session.ScheduleRequestTimeout(closeFuture, timeout, () =>
new ClientOperationTimedOutException("receiver close timed out waiting for remote to respond"));
}
}
else
{
ImmediateLinkShutdown(failureCause);
}
}
private void HandleRemoteOpen(Engine.IReceiver receiver)
{
// Check for deferred close pending and hold completion if so
if (receiver.RemoteSource != null)
{
remoteSource = new ClientRemoteSource(receiver.RemoteSource);
if (receiver.RemoteTerminus != null)
{
remoteTarget = new ClientRemoteTarget((Types.Messaging.Target)receiver.RemoteTerminus);
}
ReplenishCreditIfNeeded();
_ = openFuture.TrySetResult(this);
LOG.Trace("Receiver opened successfully: {0}", receiverId);
}
else
{
LOG.Debug("Receiver opened but remote signalled close is pending: {0}", receiverId);
}
}
private void HandleRemoteCloseOrDetach(Engine.IReceiver receiver)
{
if (receiver.IsLocallyOpen)
{
ImmediateLinkShutdown(ClientExceptionSupport.ConvertToLinkClosedException(
receiver.RemoteErrorCondition, "Receiver remotely closed without explanation from the remote"));
}
else
{
ImmediateLinkShutdown(failureCause);
}
}
private void HandleParentEndpointClosed(Engine.IReceiver receiver)
{
// Don't react if engine was shutdown and parent closed as a result instead wait to get the
// shutdown notification and respond to that change.
if (receiver.Engine.IsRunning)
{
ClientException failureCause;
if (receiver.Connection.RemoteErrorCondition != null)
{
failureCause = ClientExceptionSupport.ConvertToConnectionClosedException(receiver.Connection.RemoteErrorCondition);
}
else if (receiver.Session.RemoteErrorCondition != null)
{
failureCause = ClientExceptionSupport.ConvertToSessionClosedException(receiver.Session.RemoteErrorCondition);
}
else if (receiver.Engine.FailureCause != null)
{
failureCause = ClientExceptionSupport.ConvertToConnectionClosedException(receiver.Engine.FailureCause);
}
else if (!IsClosed)
{
failureCause = new ClientResourceRemotelyClosedException("Remote closed without a specific error condition");
}
else
{
failureCause = null;
}
ImmediateLinkShutdown(failureCause);
}
}
private void HandleEngineShutdown(Engine.IEngine engine)
{
if (!IsDynamic && !session.ProtonSession.Engine.IsShutdown)
{
uint previousCredit = protonReceiver.Credit + (uint)messageQueue.Count;
messageQueue.Clear(); // Prefetched messages should be discarded.
if (drainingFuture != null)
{
drainingFuture.TrySetResult(this);
// if (drainingTimeout != null)
// {
// drainingTimeout.cancel(false);
// drainingTimeout = null;
// }
}
protonReceiver.LocalCloseHandler(null);
protonReceiver.LocalDetachHandler(null);
protonReceiver.Close();
protonReceiver = ClientReceiverBuilder.RecreateReceiver(session, protonReceiver, options);
protonReceiver.LinkedResource = this;
protonReceiver.AddCredit(previousCredit);
Open();
}
else
{
Engine.IConnection connection = engine.Connection;
ClientException failureCause;
if (connection.RemoteErrorCondition != null)
{
failureCause = ClientExceptionSupport.ConvertToConnectionClosedException(connection.RemoteErrorCondition);
}
else if (engine.FailureCause != null)
{
failureCause = ClientExceptionSupport.ConvertToConnectionClosedException(engine.FailureCause);
}
else if (!IsClosed)
{
failureCause = new ClientConnectionRemotelyClosedException("Remote closed without a specific error condition");
}
else
{
failureCause = null;
}
ImmediateLinkShutdown(failureCause);
}
}
private void HandleDeliveryReceived(IIncomingDelivery delivery)
{
LOG.Trace("Delivery data was received: {0}", delivery);
if (delivery.DefaultDeliveryState == null)
{
delivery.DefaultDeliveryState = Types.Messaging.Released.Instance;
}
if (!delivery.IsPartial)
{
LOG.Trace("{0} has incoming Message(s).", this);
messageQueue.Enqueue(new ClientDelivery(this, delivery));
}
else
{
// The receiver doesn't return a delivery until it has been
// completely received, and to ensure that happens we need to
// claim the partially received bytes so that the session window
// can be reopened if need be and more delivery portions can
// then arrive.
delivery.ClaimAvailableBytes();
}
}
private void HandleDeliveryAborted(IIncomingDelivery delivery)
{
LOG.Trace("Delivery data was aborted: {0}", delivery);
delivery.Settle();
ReplenishCreditIfNeeded();
}
private void HandleDeliveryStateRemotelyUpdated(IIncomingDelivery delivery)
{
LOG.Trace("Delivery remote state was updated: {0}", delivery);
}
private void HandleReceiverCreditUpdated(Engine.IReceiver receiver)
{
LOG.Trace("Receiver credit update by remote: {0}", receiver);
if (drainingFuture != null)
{
if (receiver.Credit == 0)
{
drainingFuture.TrySetResult(this);
// TODO
// if (drainingTimeout != null)
// {
// drainingTimeout.cancel(false);
// drainingTimeout = null;
// }
}
}
}
#endregion
}
}