blob: 5cec9af46bf04c841b40d66586b9d52216eccda4 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.qpid.jms.provider.amqp.builders;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.COPY;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.JMS_NO_LOCAL_SYMBOL;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.JMS_SELECTOR_SYMBOL;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.MODIFIED_FAILED;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSRuntimeException;
import org.apache.qpid.jms.JmsDestination;
import org.apache.qpid.jms.meta.JmsConsumerInfo;
import org.apache.qpid.jms.provider.amqp.AmqpConnection;
import org.apache.qpid.jms.provider.amqp.AmqpConsumer;
import org.apache.qpid.jms.provider.amqp.AmqpSession;
import org.apache.qpid.jms.provider.amqp.AmqpSubscriptionTracker;
import org.apache.qpid.jms.provider.amqp.AmqpSupport;
import org.apache.qpid.jms.provider.amqp.filters.AmqpJmsNoLocalType;
import org.apache.qpid.jms.provider.amqp.filters.AmqpJmsSelectorType;
import org.apache.qpid.jms.provider.amqp.message.AmqpDestinationHelper;
import org.apache.qpid.proton.amqp.DescribedType;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Accepted;
import org.apache.qpid.proton.amqp.messaging.Modified;
import org.apache.qpid.proton.amqp.messaging.Rejected;
import org.apache.qpid.proton.amqp.messaging.Released;
import org.apache.qpid.proton.amqp.messaging.Source;
import org.apache.qpid.proton.amqp.messaging.Target;
import org.apache.qpid.proton.amqp.messaging.TerminusDurability;
import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy;
import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode;
import org.apache.qpid.proton.amqp.transport.SenderSettleMode;
import org.apache.qpid.proton.engine.Receiver;
/**
* Resource builder responsible for creating and opening an AmqpConsumer instance.
*/
public class AmqpConsumerBuilder extends AmqpResourceBuilder<AmqpConsumer, AmqpSession, JmsConsumerInfo, Receiver> {
public AmqpConsumerBuilder(AmqpSession parent, JmsConsumerInfo consumerInfo) {
super(parent, consumerInfo);
}
@Override
protected Receiver createEndpoint(JmsConsumerInfo resourceInfo) {
JmsDestination destination = resourceInfo.getDestination();
String address = AmqpDestinationHelper.INSTANCE.getDestinationAddress(destination, getParent().getConnection());
Source source = new Source();
source.setAddress(address);
Target target = new Target();
configureSource(source);
String receiverLinkName = null;
String subscriptionName = resourceInfo.getSubscriptionName();
if (subscriptionName != null && !subscriptionName.isEmpty()) {
AmqpConnection connection = getParent().getConnection();
if (resourceInfo.isShared() && !connection.getProperties().isSharedSubsSupported()) {
// Don't allow shared sub if peer hasn't said it can handle them (or we haven't overridden it).
throw new JMSRuntimeException("Remote peer does not support shared subscriptions");
}
AmqpSubscriptionTracker subTracker = connection.getSubTracker();
// Validate subscriber type allowed given existing active subscriber types.
if (resourceInfo.isShared() && resourceInfo.isDurable()) {
if(subTracker.isActiveExclusiveDurableSub(subscriptionName)) {
// Don't allow shared sub if there is already an active exclusive durable sub
throw new JMSRuntimeException("A non-shared durable subscription is already active with name '" + subscriptionName + "'");
}
} else if (!resourceInfo.isShared() && resourceInfo.isDurable()) {
if (subTracker.isActiveExclusiveDurableSub(subscriptionName)) {
// Exclusive durable sub is already active
throw new JMSRuntimeException("A non-shared durable subscription is already active with name '" + subscriptionName + "'");
} else if (subTracker.isActiveSharedDurableSub(subscriptionName)) {
// Don't allow exclusive durable sub if there is already an active shared durable sub
throw new JMSRuntimeException("A shared durable subscription is already active with name '" + subscriptionName + "'");
}
}
// Get the link name for the subscription. Throws if certain further validations fail.
receiverLinkName = subTracker.reserveNextSubscriptionLinkName(subscriptionName, resourceInfo);
}
if(receiverLinkName == null) {
receiverLinkName = "qpid-jms:receiver:" + resourceInfo.getId() + ":" + address;
}
Receiver receiver = getParent().getEndpoint().receiver(receiverLinkName);
receiver.setSource(source);
receiver.setTarget(target);
if (resourceInfo.isBrowser() || resourceInfo.isPresettle()) {
receiver.setSenderSettleMode(SenderSettleMode.SETTLED);
} else {
receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
}
receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
return receiver;
}
@Override
protected void afterClosed(AmqpConsumer resource, JmsConsumerInfo info) {
// If the resource being built is closed during the creation process
// then this is a failure, we need to ensure we don't track it.
AmqpConnection connection = getParent().getConnection();
AmqpSubscriptionTracker subTracker = connection.getSubTracker();
subTracker.consumerRemoved(info);
}
@Override
protected AmqpConsumer createResource(AmqpSession parent, JmsConsumerInfo resourceInfo, Receiver endpoint) {
return new AmqpConsumer(parent, resourceInfo, endpoint);
}
@Override
protected Exception getOpenAbortException() {
// Verify the attach response contained a non-null Source
org.apache.qpid.proton.amqp.transport.Source source = endpoint.getRemoteSource();
if (source != null) {
return super.getOpenAbortException();
} else {
// No link terminus was created, the peer has detach/closed us, create IDE.
return new InvalidDestinationException("Link creation was refused");
}
}
@Override
protected boolean isClosePending() {
// When no link terminus was created, the peer will now detach/close us otherwise
// we need to validate the returned remote source prior to open completion.
return endpoint.getRemoteSource() == null;
}
//----- Internal implementation ------------------------------------------//
private void configureSource(Source source) {
Map<Symbol, DescribedType> filters = new HashMap<Symbol, DescribedType>();
Symbol[] outcomes = new Symbol[]{ Accepted.DESCRIPTOR_SYMBOL, Rejected.DESCRIPTOR_SYMBOL,
Released.DESCRIPTOR_SYMBOL, Modified.DESCRIPTOR_SYMBOL };
if (resourceInfo.isDurable()) {
source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
source.setDurable(TerminusDurability.UNSETTLED_STATE);
source.setDistributionMode(COPY);
} else {
source.setDurable(TerminusDurability.NONE);
source.setExpiryPolicy(TerminusExpiryPolicy.LINK_DETACH);
}
if (resourceInfo.isBrowser()) {
source.setDistributionMode(COPY);
}
// Capabilities
LinkedList<Symbol> capabilities = new LinkedList<>();
Symbol typeCapability = AmqpDestinationHelper.INSTANCE.toTypeCapability(resourceInfo.getDestination());
if(typeCapability != null){
capabilities.add(typeCapability);
}
if(resourceInfo.isShared()) {
capabilities.add(AmqpSupport.SHARED);
}
if(!resourceInfo.isExplicitClientID()) {
capabilities.add(AmqpSupport.GLOBAL);
}
if(!capabilities.isEmpty()) {
Symbol[] capArray = capabilities.toArray(new Symbol[capabilities.size()]);
source.setCapabilities(capArray);
}
//Outcomes
source.setOutcomes(outcomes);
source.setDefaultOutcome(MODIFIED_FAILED);
// Filters
if (resourceInfo.isNoLocal()) {
filters.put(JMS_NO_LOCAL_SYMBOL, AmqpJmsNoLocalType.NO_LOCAL);
}
if (resourceInfo.getSelector() != null && !resourceInfo.getSelector().trim().equals("")) {
filters.put(JMS_SELECTOR_SYMBOL, new AmqpJmsSelectorType(resourceInfo.getSelector()));
}
if (!filters.isEmpty()) {
source.setFilter(filters);
}
}
}