blob: 62f4a0dc69f60260e6d9c9f6b7b9d7f6746b099f [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.qpid.protonj2.client.impl;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.qpid.protonj2.client.LinkOptions;
import org.apache.qpid.protonj2.client.SenderOptions;
import org.apache.qpid.protonj2.client.SessionOptions;
import org.apache.qpid.protonj2.client.SourceOptions;
import org.apache.qpid.protonj2.client.StreamSenderOptions;
import org.apache.qpid.protonj2.client.TargetOptions;
import org.apache.qpid.protonj2.client.exceptions.ClientException;
import org.apache.qpid.protonj2.engine.Sender;
import org.apache.qpid.protonj2.engine.Session;
import org.apache.qpid.protonj2.engine.impl.ProtonDeliveryTagGenerator;
import org.apache.qpid.protonj2.types.UnsignedInteger;
import org.apache.qpid.protonj2.types.messaging.Outcome;
import org.apache.qpid.protonj2.types.messaging.Source;
import org.apache.qpid.protonj2.types.messaging.Target;
import org.apache.qpid.protonj2.types.messaging.TerminusDurability;
import org.apache.qpid.protonj2.types.messaging.TerminusExpiryPolicy;
import org.apache.qpid.protonj2.types.transactions.Coordinator;
import org.apache.qpid.protonj2.types.transport.ReceiverSettleMode;
import org.apache.qpid.protonj2.types.transport.SenderSettleMode;
/**
* Session owned builder of {@link Sender} objects.
*/
final class ClientSenderBuilder {
private final ClientSession session;
private final SessionOptions sessionOptions;
private final AtomicInteger senderCounter = new AtomicInteger();
private volatile SenderOptions defaultSenderOptions;
private volatile StreamSenderOptions defaultStreamSenderOptions;
ClientSenderBuilder(ClientSession session) {
this.session = session;
this.sessionOptions = session.options();
}
public ClientSender sender(String address, SenderOptions senderOptions) throws ClientException {
final SenderOptions options = senderOptions != null ? senderOptions : getDefaultSenderOptions();
final String senderId = nextSenderId();
final Sender protonSender = createSender(session.getProtonSession(), address, options, senderId);
return new ClientSender(session, options, senderId, protonSender);
}
public ClientSender anonymousSender(SenderOptions senderOptions) throws ClientException {
final SenderOptions options = senderOptions != null ? senderOptions : getDefaultSenderOptions();
final String senderId = nextSenderId();
final Sender protonSender = createSender(session.getProtonSession(), null, options, senderId);
return new ClientSender(session, options, senderId, protonSender);
}
public ClientStreamSender streamSender(String address, StreamSenderOptions senderOptions) throws ClientException {
final StreamSenderOptions options = senderOptions != null ? senderOptions : getDefaultStreamSenderOptions();
final String senderId = nextSenderId();
final Sender protonSender = createSender(session.getProtonSession(), address, options, senderId);
return new ClientStreamSender(session, options, senderId, protonSender);
}
private static Sender createSender(Session protonSession, String address, LinkOptions<?> options, String senderId) {
final String linkName;
if (options.linkName() != null) {
linkName = options.linkName();
} else {
linkName = "sender-" + senderId;
}
final Sender protonSender = protonSession.sender(linkName);
switch (options.deliveryMode()) {
case AT_MOST_ONCE:
protonSender.setSenderSettleMode(SenderSettleMode.SETTLED);
protonSender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
break;
case AT_LEAST_ONCE:
protonSender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
protonSender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
break;
}
protonSender.setOfferedCapabilities(ClientConversionSupport.toSymbolArray(options.offeredCapabilities()));
protonSender.setDesiredCapabilities(ClientConversionSupport.toSymbolArray(options.desiredCapabilities()));
protonSender.setProperties(ClientConversionSupport.toSymbolKeyedMap(options.properties()));
protonSender.setTarget(createTarget(address, options));
protonSender.setSource(createSource(senderId, options));
// Use a tag generator that will reuse old tags. Later we might make this configurable.
if (protonSender.getSenderSettleMode() == SenderSettleMode.SETTLED) {
protonSender.setDeliveryTagGenerator(ProtonDeliveryTagGenerator.BUILTIN.EMPTY.createGenerator());
} else {
protonSender.setDeliveryTagGenerator(ProtonDeliveryTagGenerator.BUILTIN.POOLED.createGenerator());
}
return protonSender;
}
private static Source createSource(String address, LinkOptions<?> options) {
final SourceOptions sourceOptions = options.sourceOptions();
final Source source = new Source();
source.setAddress(address);
source.setOutcomes(ClientConversionSupport.outcomesToSymbols(sourceOptions.outcomes()));
source.setDefaultOutcome((Outcome) ClientDeliveryState.asProtonType(sourceOptions.defaultOutcome()));
source.setCapabilities(ClientConversionSupport.toSymbolArray(sourceOptions.capabilities()));
if (sourceOptions.timeout() >= 0) {
source.setTimeout(UnsignedInteger.valueOf(sourceOptions.timeout()));
}
if (sourceOptions.durabilityMode() != null) {
source.setDurable(ClientConversionSupport.asProtonType(sourceOptions.durabilityMode()));
} else {
source.setDurable(TerminusDurability.NONE);
}
if (sourceOptions.expiryPolicy() != null) {
source.setExpiryPolicy(ClientConversionSupport.asProtonType(sourceOptions.expiryPolicy()));
} else {
source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
}
if (sourceOptions.distributionMode() != null) {
source.setDistributionMode(ClientConversionSupport.asProtonType(sourceOptions.distributionMode()));
}
if (sourceOptions.timeout() >= 0) {
source.setTimeout(UnsignedInteger.valueOf(sourceOptions.timeout()));
}
if (sourceOptions.filters() != null) {
source.setFilter(ClientConversionSupport.toSymbolKeyedMap(sourceOptions.filters()));
}
return source;
}
private static Target createTarget(String address, LinkOptions<?> options) {
final TargetOptions targetOptions = options.targetOptions();
final Target target = new Target();
target.setAddress(address);
target.setCapabilities(ClientConversionSupport.toSymbolArray(targetOptions.capabilities()));
if (targetOptions.durabilityMode() != null) {
target.setDurable(ClientConversionSupport.asProtonType(targetOptions.durabilityMode()));
}
if (targetOptions.expiryPolicy() != null) {
target.setExpiryPolicy(ClientConversionSupport.asProtonType(targetOptions.expiryPolicy()));
}
if (targetOptions.timeout() >= 0) {
target.setTimeout(UnsignedInteger.valueOf(targetOptions.timeout()));
}
return target;
}
public static Sender recreateSender(ClientSession session, Sender previousSender, LinkOptions<?> options) {
final Sender protonSender = session.getProtonSession().sender(previousSender.getName());
protonSender.setSource(previousSender.getSource());
if (previousSender.getTarget() instanceof Coordinator) {
protonSender.setTarget((Coordinator) previousSender.getTarget());
} else {
protonSender.setTarget((Target) previousSender.getTarget());
}
protonSender.setDeliveryTagGenerator(previousSender.getDeliveryTagGenerator());
protonSender.setSenderSettleMode(previousSender.getSenderSettleMode());
protonSender.setReceiverSettleMode(previousSender.getReceiverSettleMode());
protonSender.setOfferedCapabilities(ClientConversionSupport.toSymbolArray(options.offeredCapabilities()));
protonSender.setDesiredCapabilities(ClientConversionSupport.toSymbolArray(options.desiredCapabilities()));
protonSender.setProperties(ClientConversionSupport.toSymbolKeyedMap(options.properties()));
return protonSender;
}
private String nextSenderId() {
return session.id() + ":" + senderCounter.incrementAndGet();
}
/*
* Sender options used when none specified by the caller creating a new sender.
*/
private SenderOptions getDefaultSenderOptions() {
SenderOptions senderOptions = defaultSenderOptions;
if (senderOptions == null) {
synchronized (this) {
senderOptions = defaultSenderOptions;
if (senderOptions == null) {
senderOptions = new SenderOptions();
senderOptions.openTimeout(sessionOptions.openTimeout());
senderOptions.closeTimeout(sessionOptions.closeTimeout());
senderOptions.requestTimeout(sessionOptions.requestTimeout());
senderOptions.sendTimeout(sessionOptions.sendTimeout());
}
defaultSenderOptions = senderOptions;
}
}
return senderOptions;
}
/*
* Stream Sender options used when none specified by the caller creating a new sender.
*/
private StreamSenderOptions getDefaultStreamSenderOptions() {
StreamSenderOptions senderOptions = defaultStreamSenderOptions;
if (senderOptions == null) {
synchronized (this) {
senderOptions = defaultStreamSenderOptions;
if (senderOptions == null) {
senderOptions = new StreamSenderOptions();
senderOptions.openTimeout(sessionOptions.openTimeout());
senderOptions.closeTimeout(sessionOptions.closeTimeout());
senderOptions.requestTimeout(sessionOptions.requestTimeout());
senderOptions.sendTimeout(sessionOptions.sendTimeout());
}
defaultStreamSenderOptions = senderOptions;
}
}
return senderOptions;
}
}