// blob: 5fe46a5329df1202a3f90faca1cc19567d113429 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Generic;
using Apache.Qpid.Proton.Client.Concurrent;
using Apache.Qpid.Proton.Types.Messaging;
using Apache.Qpid.Proton.Types.Transactions;
using Apache.Qpid.Proton.Types.Transport;
namespace Apache.Qpid.Proton.Client.Implementation
{
/// <summary>
/// The receiver builder is used by client session instances to create
/// AMQP receivers and wrap those within a client receiver instance.
/// </summary>
internal class ClientReceiverBuilder
{
private readonly ClientSession session;
private readonly SessionOptions sessionOptions;
private readonly AtomicInteger receiverCounter = new AtomicInteger();
private volatile ReceiverOptions defaultReceiverOptions;
private volatile StreamReceiverOptions defaultStreamReceiverOptions;
/// <summary>
/// Creates a receiver builder bound to the given session and captures that
/// session's options, which are later used to seed default receiver options.
/// </summary>
/// <param name="session">The client session that the built receivers belong to.</param>
public ClientReceiverBuilder(ClientSession session)
{
    sessionOptions = session.Options;
    this.session = session;
}
/// <summary>
/// Builds a client receiver for the given address, creating the underlying
/// proton receiver and assigning it a source and target built from the options.
/// </summary>
/// <param name="address">The address used for the link's source and target.</param>
/// <param name="receiverOptions">Options to apply; when null the builder's default receiver options are used.</param>
/// <returns>A new client receiver wrapping the configured proton receiver.</returns>
public ClientReceiver Receiver(string address, ReceiverOptions receiverOptions)
{
    ReceiverOptions effectiveOptions = receiverOptions ?? getDefaultReceiverOptions();
    string id = NextReceiverId();

    Engine.IReceiver link = CreateReceiver(address, effectiveOptions, id);
    link.Source = CreateSource(address, effectiveOptions);
    link.Target = CreateTarget(address, effectiveOptions);

    return new ClientReceiver(session, effectiveOptions, id, link);
}
/// <summary>
/// Builds a client receiver for a durable subscription. The subscription name
/// is used as the link name and the source is configured via the durable
/// source builder rather than the regular one.
/// </summary>
/// <param name="address">The address used for the link's source and target.</param>
/// <param name="subscriptionName">The subscription name, written into the options as the link name.</param>
/// <param name="receiverOptions">
/// Options to apply. When supplied, its LinkName is overwritten with the
/// subscription name. When null, a fresh options instance seeded with the
/// session timeouts is used.
/// </param>
/// <returns>A new client receiver wrapping the configured proton receiver.</returns>
public ClientReceiver DurableReceiver(string address, string subscriptionName, ReceiverOptions receiverOptions)
{
    ReceiverOptions options = receiverOptions;
    if (options == null)
    {
        // The lazily built default options are cached and handed to every receiver
        // created without explicit options. Writing the subscription name into that
        // shared instance would leak it as the link name of later receivers, so a
        // private instance carrying the same session timeouts is used instead.
        options = new ReceiverOptions();
        options.OpenTimeout = sessionOptions.OpenTimeout;
        options.CloseTimeout = sessionOptions.CloseTimeout;
        options.RequestTimeout = sessionOptions.RequestTimeout;
        options.DrainTimeout = sessionOptions.DrainTimeout;
    }

    string receiverId = NextReceiverId();

    options.LinkName = subscriptionName;

    Engine.IReceiver protonReceiver = CreateReceiver(address, options, receiverId);
    protonReceiver.Source = CreateDurableSource(address, options);
    protonReceiver.Target = CreateTarget(address, options);

    return new ClientReceiver(session, options, receiverId, protonReceiver);
}
/// <summary>
/// Builds a client receiver whose source is flagged as dynamic. No address is
/// supplied to the link, its source, or its target.
/// </summary>
/// <param name="dynamicNodeProperties">Properties converted to a symbol keyed map and set as the source's dynamic node properties.</param>
/// <param name="receiverOptions">Options to apply; when null the builder's default receiver options are used.</param>
/// <returns>A new client receiver wrapping the configured proton receiver.</returns>
public ClientReceiver DynamicReceiver(IDictionary<string, object> dynamicNodeProperties, ReceiverOptions receiverOptions)
{
    ReceiverOptions effectiveOptions = receiverOptions ?? getDefaultReceiverOptions();
    string id = NextReceiverId();

    Engine.IReceiver link = CreateReceiver(null, effectiveOptions, id);
    link.Source = CreateSource(null, effectiveOptions);
    link.Target = CreateTarget(null, effectiveOptions);

    // With the source in place, mark it dynamic and attach the node properties.
    link.Source.Dynamic = true;
    link.Source.DynamicNodeProperties = ClientConversionSupport.ToSymbolKeyedMap(dynamicNodeProperties);

    return new ClientReceiver(session, effectiveOptions, id, link);
}
/// <summary>
/// Builds a client stream receiver for the given address, creating the
/// underlying proton receiver and assigning it a source and target built
/// from the options.
/// </summary>
/// <param name="address">The address used for the link's source and target.</param>
/// <param name="receiverOptions">Options to apply; when null the builder's default stream receiver options are used.</param>
/// <returns>A new client stream receiver wrapping the configured proton receiver.</returns>
public ClientStreamReceiver StreamReceiver(string address, StreamReceiverOptions receiverOptions)
{
    StreamReceiverOptions effectiveOptions = receiverOptions ?? GetDefaultStreamReceiverOptions();
    string id = NextReceiverId();

    Engine.IReceiver link = CreateReceiver(address, effectiveOptions, id);
    link.Source = CreateSource(address, effectiveOptions);
    link.Target = CreateTarget(address, effectiveOptions);

    return new ClientStreamReceiver(session, effectiveOptions, id, link);
}
/// <summary>
/// Creates a new proton receiver on the given session that mirrors a previous
/// receiver: same link name, source, terminus (coordinator or target) and
/// settle modes. Capabilities and properties are taken from the given options.
/// </summary>
/// <param name="session">The client session whose proton session creates the new receiver.</param>
/// <param name="previousReceiver">The receiver whose name, source, terminus and settle modes are copied.</param>
/// <param name="options">Receiver options supplying offered/desired capabilities and link properties.</param>
/// <returns>The newly created proton receiver.</returns>
public static Engine.IReceiver RecreateReceiver(ClientSession session, Engine.IReceiver previousReceiver, ReceiverOptions options)
{
    Engine.IReceiver protonReceiver = session.ProtonSession.Receiver(previousReceiver.Name);

    protonReceiver.Source = previousReceiver.Source;

    // Carry the terminus over from the previous receiver. Reading it back from
    // the freshly created receiver would only assign the new link's own
    // terminus to itself and drop the previous configuration.
    if (previousReceiver.Terminus is Coordinator)
    {
        protonReceiver.Coordinator = previousReceiver.Coordinator;
    }
    else
    {
        protonReceiver.Target = previousReceiver.Target;
    }

    protonReceiver.SenderSettleMode = previousReceiver.SenderSettleMode;
    protonReceiver.ReceiverSettleMode = previousReceiver.ReceiverSettleMode;

    protonReceiver.OfferedCapabilities = ClientConversionSupport.ToSymbolArray(options.OfferedCapabilities);
    protonReceiver.DesiredCapabilities = ClientConversionSupport.ToSymbolArray(options.DesiredCapabilities);
    protonReceiver.Properties = ClientConversionSupport.ToSymbolKeyedMap(options.Properties);
    protonReceiver.DefaultDeliveryState = Released.Instance;

    return protonReceiver;
}
#region Private build helper methods
/// <summary>
/// Produces the next receiver id by joining the session id and the
/// incremented receiver counter with a colon.
/// </summary>
/// <returns>The receiver id string.</returns>
private string NextReceiverId()
{
    return $"{session.SessionId}:{receiverCounter.IncrementAndGet()}";
}
/// <summary>
/// Creates a proton receiver on the parent session and applies the link level
/// settings carried by the options: link name, settle modes derived from the
/// delivery mode, capabilities, properties and a Released default delivery state.
/// </summary>
/// <param name="address">The receiver address; not used when configuring the link itself.</param>
/// <param name="options">Receiver options supplying the link configuration.</param>
/// <param name="receiverId">Receiver id used to derive a "receiver-" prefixed link name when the options name none.</param>
/// <returns>The newly created and configured proton receiver.</returns>
/// <exception cref="ArgumentNullException">If <paramref name="options"/> is null.</exception>
private Engine.IReceiver CreateReceiver(string address, ReceiverOptions options, string receiverId)
{
    if (options == null)
    {
        // Fail before touching the session. Previously a null slipped past the link
        // name lookup and only failed after a proton receiver had been created.
        throw new ArgumentNullException(nameof(options));
    }

    string linkName = options.LinkName ?? "receiver-" + receiverId;
    Engine.IReceiver protonReceiver = session.ProtonSession.Receiver(linkName);

    switch (options.DeliveryMode)
    {
        case DeliveryMode.AtMostOnce:
            protonReceiver.SenderSettleMode = SenderSettleMode.Settled;
            protonReceiver.ReceiverSettleMode = ReceiverSettleMode.First;
            break;
        case DeliveryMode.AtLeastOnce:
            protonReceiver.SenderSettleMode = SenderSettleMode.Unsettled;
            protonReceiver.ReceiverSettleMode = ReceiverSettleMode.First;
            break;
    }

    protonReceiver.OfferedCapabilities = ClientConversionSupport.ToSymbolArray(options.OfferedCapabilities);
    protonReceiver.DesiredCapabilities = ClientConversionSupport.ToSymbolArray(options.DesiredCapabilities);
    protonReceiver.Properties = ClientConversionSupport.ToSymbolKeyedMap(options.Properties);
    protonReceiver.DefaultDeliveryState = Released.Instance;

    return protonReceiver;
}
/// <summary>
/// Builds the AMQP source for a receiver from the source options carried by
/// the receiver options. Durability falls back to None, the expiry policy to
/// LinkDetach and the default outcome to SourceOptions.DefaultReceiverOutcome
/// when the options leave them unset. Distribution mode, timeout and filters
/// are only applied when set.
/// </summary>
/// <param name="address">The address assigned to the source.</param>
/// <param name="options">Receiver options whose SourceOptions configure the source.</param>
/// <returns>The configured source.</returns>
private Source CreateSource(string address, ReceiverOptions options)
{
    SourceOptions settings = options.SourceOptions;

    Source result = new Source
    {
        Address = address,
        Durable = settings.DurabilityMode.HasValue
            ? settings.DurabilityMode.Value.AsProtonType()
            : TerminusDurability.None,
        ExpiryPolicy = settings.ExpiryPolicy.HasValue
            ? settings.ExpiryPolicy.Value.AsProtonType()
            : TerminusExpiryPolicy.LinkDetach
    };

    // Optional values are left untouched on the source unless the options set them.
    if (settings.DistributionMode.HasValue)
    {
        result.DistributionMode = settings.DistributionMode.Value.AsProtonType();
    }
    if (settings.Timeout.HasValue)
    {
        result.Timeout = settings.Timeout.Value;
    }
    if (settings.Filters != null)
    {
        result.Filter = ClientConversionSupport.ToSymbolKeyedMap(settings.Filters);
    }

    result.DefaultOutcome = settings.DefaultOutcome != null
        ? (IOutcome)settings.DefaultOutcome.AsProtonType()
        : (IOutcome)SourceOptions.DefaultReceiverOutcome.AsProtonType();
    result.Outcomes = ClientConversionSupport.ToSymbolArray(settings.Outcomes);
    result.Capabilities = ClientConversionSupport.ToSymbolArray(settings.Capabilities);

    return result;
}
/// <summary>
/// Builds the AMQP source for a durable subscription. Durability is fixed to
/// UnsettledState, the expiry policy to Never and the distribution mode to
/// ClientConstants.COPY; the corresponding source option values are not read.
/// Outcomes, default outcome, capabilities, timeout and filters come from the
/// source options.
/// </summary>
/// <param name="address">The address assigned to the source.</param>
/// <param name="options">Receiver options whose SourceOptions configure the source.</param>
/// <returns>The configured source.</returns>
private Source CreateDurableSource(string address, ReceiverOptions options)
{
    SourceOptions settings = options.SourceOptions;

    Source durableSource = new Source
    {
        Address = address,
        Durable = TerminusDurability.UnsettledState,
        ExpiryPolicy = TerminusExpiryPolicy.Never,
        DistributionMode = ClientConstants.COPY,
        Outcomes = ClientConversionSupport.ToSymbolArray(settings.Outcomes),
        // NOTE(review): a null default outcome is passed through as null here, whereas
        // CreateSource falls back to SourceOptions.DefaultReceiverOutcome - confirm intended.
        DefaultOutcome = (IOutcome)(settings.DefaultOutcome?.AsProtonType()),
        Capabilities = ClientConversionSupport.ToSymbolArray(settings.Capabilities)
    };

    if (settings.Timeout.HasValue)
    {
        durableSource.Timeout = settings.Timeout.Value;
    }
    if (settings.Filters != null)
    {
        durableSource.Filter = ClientConversionSupport.ToSymbolKeyedMap(settings.Filters);
    }

    return durableSource;
}
/// <summary>
/// Builds the AMQP target for a receiver from the target options carried by
/// the receiver options. Address and capabilities are always assigned;
/// durability, expiry policy and timeout only when the options set them.
/// </summary>
/// <param name="address">The address assigned to the target.</param>
/// <param name="options">Receiver options whose TargetOptions configure the target.</param>
/// <returns>The configured target.</returns>
private Target CreateTarget(string address, ReceiverOptions options)
{
    TargetOptions settings = options.TargetOptions;

    Target result = new Target
    {
        Address = address,
        Capabilities = ClientConversionSupport.ToSymbolArray(settings.Capabilities)
    };

    if (settings.DurabilityMode.HasValue)
    {
        result.Durable = settings.DurabilityMode.Value.AsProtonType();
    }
    if (settings.ExpiryPolicy.HasValue)
    {
        result.ExpiryPolicy = settings.ExpiryPolicy.Value.AsProtonType();
    }
    if (settings.Timeout.HasValue)
    {
        result.Timeout = settings.Timeout.Value;
    }

    return result;
}
/// <summary>
/// Returns the cached default receiver options, creating them on first use
/// with the open, close, request and drain timeouts of the session options.
/// Creation is guarded by a lock on the session options with a second check
/// of the volatile field inside the lock.
/// </summary>
/// <returns>The default receiver options held by this builder.</returns>
private ReceiverOptions getDefaultReceiverOptions()
{
    ReceiverOptions cached = defaultReceiverOptions;
    if (cached != null)
    {
        return cached;
    }

    lock (sessionOptions)
    {
        // Re-read under the lock; another thread may have published the defaults already.
        cached = defaultReceiverOptions;
        if (cached == null)
        {
            cached = new ReceiverOptions
            {
                OpenTimeout = sessionOptions.OpenTimeout,
                CloseTimeout = sessionOptions.CloseTimeout,
                RequestTimeout = sessionOptions.RequestTimeout,
                DrainTimeout = sessionOptions.DrainTimeout
            };
        }

        defaultReceiverOptions = cached;
    }

    return cached;
}
/// <summary>
/// Returns the cached default stream receiver options, creating them on first
/// use with the open, close, request and drain timeouts of the session
/// options. Creation is guarded by a lock on the session options with a
/// second check of the volatile field inside the lock.
/// </summary>
/// <returns>The default stream receiver options held by this builder.</returns>
private StreamReceiverOptions GetDefaultStreamReceiverOptions()
{
    StreamReceiverOptions cached = defaultStreamReceiverOptions;
    if (cached != null)
    {
        return cached;
    }

    lock (sessionOptions)
    {
        // Re-read under the lock; another thread may have published the defaults already.
        cached = defaultStreamReceiverOptions;
        if (cached == null)
        {
            cached = new StreamReceiverOptions
            {
                OpenTimeout = sessionOptions.OpenTimeout,
                CloseTimeout = sessionOptions.CloseTimeout,
                RequestTimeout = sessionOptions.RequestTimeout,
                DrainTimeout = sessionOptions.DrainTimeout
            };
        }

        defaultStreamReceiverOptions = cached;
    }

    return cached;
}
#endregion
}
}