blob: 751f5477a6ecf79a5a619293651b728e613ba655 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Apache.Qpid.Proton.Client.Exceptions;
using Apache.Qpid.Proton.Engine;
using Apache.Qpid.Proton.Utilities;
namespace Apache.Qpid.Proton.Client.Implementation
{
public sealed class ClientNextReceiverSelector
{
private static readonly string LAST_RETURNED_STATE_KEY = "Last_Returned_State";
private readonly ArrayDeque<TaskCompletionSource<IReceiver>> pending = new();
private readonly Random random = new();
private readonly ClientSession session;
internal ClientNextReceiverSelector(ClientSession session)
{
this.session = session;
HandleReconnect(); // Same processing works for initialization
}
public void NextReceiver(TaskCompletionSource<IReceiver> request, NextReceiverPolicy policy, TimeSpan timeout)
{
Statics.RequireNonNull(policy, "The next receiver selection policy cannot be null");
ClientReceiver result = null;
switch (policy)
{
case NextReceiverPolicy.Random:
result = SelectRandomReceiver();
break;
case NextReceiverPolicy.RoundRobin:
result = SelectNextAvailable();
break;
case NextReceiverPolicy.FirstAvailable:
result = SelectFirstAvailable();
break;
case NextReceiverPolicy.LargestBacklog:
result = SelectLargestBacklog();
break;
case NextReceiverPolicy.SmallestBacklog:
result = SelectSmallestBacklog();
break;
default:
request.TrySetException(new ClientException("Next receiver called with invalid or unknown policy:" + policy));
break;
}
if (result == null)
{
pending.Enqueue(request);
if (timeout > TimeSpan.Zero && timeout < TimeSpan.MaxValue)
{
session.Schedule(() =>
{
pending.Remove(request);
request.TrySetResult(null);
}, timeout);
}
}
else
{
// Track last returned to update state for Round Robin next receiver dispatch
// this effectively ties all policies together in updating the next result from
// a call that requests the round robin fairness policy.
session.ProtonSession.Attachments[LAST_RETURNED_STATE_KEY] = result;
request.TrySetResult(result);
}
}
public void HandleReconnect()
{
session.ProtonSession.DeliveryReadHandler(DeliveryReadHandler);
}
public void HandleShutdown()
{
ClientException cause;
if (session.IsClosed)
{
cause = new ClientIllegalStateException("The Session was explicitly closed", session.FailureCause);
}
else if (session.FailureCause != null)
{
cause = session.FailureCause;
}
else
{
cause = new ClientIllegalStateException("The session was closed without a specific error being provided");
}
foreach (TaskCompletionSource<IReceiver> request in pending)
{
request.TrySetException(cause);
}
pending.Clear();
}
private void DeliveryReadHandler(IIncomingDelivery delivery)
{
// When a new delivery arrives that is completed
if (!pending.IsEmpty && !delivery.IsPartial && !delivery.IsAborted)
{
// We only handle next receiver events for normal client receivers and
// not for stream receiver types etc.
if (delivery.Receiver.LinkedResource is ClientReceiver receiver)
{
// Track last returned to update state for Round Robin next receiver dispatch
delivery.Receiver.Session.Attachments[LAST_RETURNED_STATE_KEY] = receiver;
pending.Dequeue().TrySetResult(receiver);
}
}
}
private ClientReceiver SelectRandomReceiver()
{
IEnumerable<Engine.IReceiver> receivers = session.ProtonSession.Receivers.
Where(r => r.LinkedResource is ClientReceiver receiver && receiver.GetQueuedDeliveries() > 0);
Engine.IReceiver receiver = receivers.ElementAtOrDefault(random.Next(0, receivers.Count()));
return receiver?.LinkedResource as ClientReceiver;
}
private ClientReceiver SelectNextAvailable()
{
ClientReceiver lastReceiver = session.ProtonSession.Attachments.Get<ClientReceiver>(LAST_RETURNED_STATE_KEY, null);
ClientReceiver result = null;
if (lastReceiver != null && !lastReceiver.ProtonReceiver.IsLocallyClosedOrDetached)
{
bool foundLast = false;
foreach (Engine.IReceiver protonReceiver in session.ProtonSession.Receivers)
{
if (protonReceiver.LinkedResource is ClientReceiver candidate)
{
if (foundLast)
{
if (candidate.GetQueuedDeliveries() > 0)
{
result = candidate;
}
}
else
{
foundLast = candidate == lastReceiver;
}
}
}
}
else
{
session.ProtonSession.Attachments[LAST_RETURNED_STATE_KEY] = null;
}
return result ?? SelectFirstAvailable();
}
private ClientReceiver SelectFirstAvailable()
{
Engine.IReceiver receiver =
session.ProtonSession.Receivers.Where(
r => r.LinkedResource is ClientReceiver receiver && receiver.GetQueuedDeliveries() > 0).FirstOrDefault();
return (ClientReceiver)receiver?.LinkedResource;
}
private ClientReceiver SelectLargestBacklog()
{
IEnumerable<Engine.IReceiver> receivers = session.ProtonSession.Receivers.
Where(r => r.LinkedResource is ClientReceiver receiver && receiver.GetQueuedDeliveries() > 0);
ClientReceiver result = null;
foreach (Engine.IReceiver receiver in receivers)
{
ClientReceiver candidate = (ClientReceiver)receiver.LinkedResource;
if (result == null || result.GetQueuedDeliveries() < candidate.GetQueuedDeliveries())
{
result = candidate;
}
}
return result;
}
private ClientReceiver SelectSmallestBacklog()
{
IEnumerable<Engine.IReceiver> receivers = session.ProtonSession.Receivers.
Where(r => r.LinkedResource is ClientReceiver receiver && receiver.GetQueuedDeliveries() > 0);
ClientReceiver result = null;
foreach (Engine.IReceiver receiver in receivers)
{
ClientReceiver candidate = (ClientReceiver)receiver.LinkedResource;
if (result == null || result.GetQueuedDeliveries() > candidate.GetQueuedDeliveries())
{
result = candidate;
}
}
return result;
}
}
}