blob: 0d21a7ffe5e60c844856bd28255405780339794b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using Apache.Qpid.Proton.Client.Concurrent;
using Apache.Qpid.Proton.Engine.Implementation;
using Apache.Qpid.Proton.Types.Messaging;
using Apache.Qpid.Proton.Types.Transactions;
using Apache.Qpid.Proton.Types.Transport;
namespace Apache.Qpid.Proton.Client.Implementation
{
/// <summary>
/// The sender builder is used by client session instances to create
/// AMQP senders and wrap those within a client sender instance.
/// </summary>
internal class ClientSenderBuilder
{
private readonly ClientSession session;
private readonly SessionOptions sessionOptions;
private readonly AtomicInteger senderCounter = new AtomicInteger();
private volatile SenderOptions defaultSenderOptions;
private volatile StreamSenderOptions defaultStreamSenderOptions;
public ClientSenderBuilder(ClientSession session)
{
this.session = session;
this.sessionOptions = session.Options;
}
public ClientSender Sender(string address, SenderOptions senderOptions)
{
SenderOptions options = senderOptions != null ? senderOptions : GetDefaultSenderOptions();
string senderId = NextSenderId();
Engine.ISender protonSender = CreateSender(session.ProtonSession, address, options, senderId);
return new ClientSender(session, options, senderId, protonSender);
}
public ClientSender AnonymousSender(SenderOptions senderOptions)
{
SenderOptions options = senderOptions != null ? senderOptions : GetDefaultSenderOptions();
string senderId = NextSenderId();
Engine.ISender protonSender = CreateSender(session.ProtonSession, null, options, senderId);
return new ClientSender(session, options, senderId, protonSender);
}
public ClientStreamSender StreamSender(String address, StreamSenderOptions senderOptions)
{
StreamSenderOptions options = senderOptions != null ? senderOptions : GetDefaultStreamSenderOptions();
string senderId = NextSenderId();
Engine.ISender protonSender = CreateSender(session.ProtonSession, address, options, senderId);
return new ClientStreamSender(session, options, senderId, protonSender);
}
public static Engine.ISender RecreateSender(ClientSession session, Engine.ISender previousSender, SenderOptions options)
{
Engine.ISender protonSender = session.ProtonSession.Sender(previousSender.Name);
protonSender.Source = previousSender.Source;
if (previousSender.Terminus is Coordinator coordinator)
{
protonSender.Coordinator = coordinator;
}
else
{
protonSender.Target = previousSender.Target;
}
protonSender.DeliveryTagGenerator = previousSender.DeliveryTagGenerator;
protonSender.SenderSettleMode = previousSender.SenderSettleMode;
protonSender.ReceiverSettleMode = previousSender.ReceiverSettleMode;
protonSender.OfferedCapabilities = ClientConversionSupport.ToSymbolArray(options.OfferedCapabilities);
protonSender.DesiredCapabilities = ClientConversionSupport.ToSymbolArray(options.DesiredCapabilities);
protonSender.Properties = ClientConversionSupport.ToSymbolKeyedMap(options.Properties);
return protonSender;
}
#region Private sender builder APIs
private string NextSenderId()
{
return session.SessionId + ":" + senderCounter.IncrementAndGet();
}
private static Engine.ISender CreateSender(Engine.ISession protonSession, string address, SenderOptions options, string senderId)
{
string linkName = options?.LinkName ?? "sender-" + senderId;
Engine.ISender protonSender = protonSession.Sender(linkName);
switch (options.DeliveryMode)
{
case DeliveryMode.AtMostOnce:
protonSender.SenderSettleMode = SenderSettleMode.Settled;
protonSender.ReceiverSettleMode = ReceiverSettleMode.First;
break;
case DeliveryMode.AtLeastOnce:
protonSender.SenderSettleMode = SenderSettleMode.Unsettled;
protonSender.ReceiverSettleMode = ReceiverSettleMode.First;
break;
}
protonSender.OfferedCapabilities = ClientConversionSupport.ToSymbolArray(options.OfferedCapabilities);
protonSender.DesiredCapabilities = ClientConversionSupport.ToSymbolArray(options.DesiredCapabilities);
protonSender.Properties = ClientConversionSupport.ToSymbolKeyedMap(options.Properties);
protonSender.Target = CreateTarget(address, options);
protonSender.Source = CreateSource(senderId, options);
// Use a tag generator that will reuse old tags. Later we might make this configurable.
if (protonSender.SenderSettleMode == SenderSettleMode.Settled)
{
protonSender.DeliveryTagGenerator = ProtonDeliveryTagTypes.Empty.NewTagGenerator();
}
else
{
protonSender.DeliveryTagGenerator = ProtonDeliveryTagTypes.Pooled.NewTagGenerator();
}
return protonSender;
}
private static Source CreateSource(string address, SenderOptions options)
{
SourceOptions sourceOptions = options.SourceOptions;
Source source = new Source();
source.Address = address;
if (sourceOptions.DurabilityMode.HasValue)
{
source.Durable = sourceOptions.DurabilityMode.Value.AsProtonType();
}
else
{
source.Durable = TerminusDurability.None;
}
if (sourceOptions.ExpiryPolicy.HasValue)
{
source.ExpiryPolicy = sourceOptions.ExpiryPolicy.Value.AsProtonType();
}
else
{
source.ExpiryPolicy = TerminusExpiryPolicy.LinkDetach;
}
if (sourceOptions.DistributionMode.HasValue)
{
source.DistributionMode = sourceOptions.DistributionMode.Value.AsProtonType();
}
if (sourceOptions.Timeout.HasValue)
{
source.Timeout = sourceOptions.Timeout.Value;
}
if (sourceOptions.Filters != null)
{
source.Filter = ClientConversionSupport.ToSymbolKeyedMap(sourceOptions.Filters);
}
if (sourceOptions.DefaultOutcome != null)
{
source.DefaultOutcome = (IOutcome)sourceOptions.DefaultOutcome.AsProtonType();
}
source.Outcomes = ClientConversionSupport.ToSymbolArray(sourceOptions.Outcomes);
source.Capabilities = ClientConversionSupport.ToSymbolArray(sourceOptions.Capabilities);
return source;
}
private static Target CreateTarget(string address, SenderOptions options)
{
TargetOptions targetOptions = options.TargetOptions;
Target target = new Target();
target.Address = address;
target.Capabilities = ClientConversionSupport.ToSymbolArray(targetOptions.Capabilities);
if (targetOptions.DurabilityMode.HasValue)
{
target.Durable = targetOptions.DurabilityMode.Value.AsProtonType();
}
if (targetOptions.ExpiryPolicy.HasValue)
{
target.ExpiryPolicy = targetOptions.ExpiryPolicy.Value.AsProtonType();
}
if (targetOptions.Timeout.HasValue)
{
target.Timeout = targetOptions.Timeout.Value;
}
return target;
}
private SenderOptions GetDefaultSenderOptions()
{
SenderOptions senderOptions = defaultSenderOptions;
if (senderOptions == null)
{
lock (sessionOptions)
{
senderOptions = defaultSenderOptions;
if (senderOptions == null)
{
senderOptions = new SenderOptions();
senderOptions.OpenTimeout = sessionOptions.OpenTimeout;
senderOptions.CloseTimeout = sessionOptions.CloseTimeout;
senderOptions.RequestTimeout = sessionOptions.RequestTimeout;
senderOptions.SendTimeout = sessionOptions.SendTimeout;
}
defaultSenderOptions = senderOptions;
}
}
return senderOptions;
}
private StreamSenderOptions GetDefaultStreamSenderOptions()
{
StreamSenderOptions senderOptions = defaultStreamSenderOptions;
if (senderOptions == null)
{
lock (sessionOptions)
{
senderOptions = defaultStreamSenderOptions;
if (senderOptions == null)
{
senderOptions = new StreamSenderOptions();
senderOptions.OpenTimeout = sessionOptions.OpenTimeout;
senderOptions.CloseTimeout = sessionOptions.CloseTimeout;
senderOptions.RequestTimeout = sessionOptions.RequestTimeout;
senderOptions.SendTimeout = sessionOptions.SendTimeout;
}
defaultStreamSenderOptions = senderOptions;
}
}
return senderOptions;
}
#endregion
}
}