blob: b9195220d49216f45c044867b8cc2589a319f1d3 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.messaging.address.amqp_0_10;
import java.util.Map;
import java.util.UUID;
import org.apache.qpid.messaging.Address;
import org.apache.qpid.messaging.Session;
import org.apache.qpid.messaging.SubscriptionSettings;
import org.apache.qpid.messaging.Address.PolicyType;
import org.apache.qpid.messaging.QpidDestination.CheckMode;
import org.apache.qpid.messaging.QpidTopic;
import org.apache.qpid.messaging.address.AddressException;
import org.apache.qpid.messaging.address.AddressHelper;
import org.apache.qpid.messaging.address.Link;
import org.apache.qpid.messaging.address.Link.Reliability;
import org.apache.qpid.messaging.amqp_0_10.Session_0_10;
import org.apache.qpid.transport.ExchangeQueryResult;
import org.apache.qpid.transport.ExecutionErrorCode;
import org.apache.qpid.transport.MessageAcceptMode;
import org.apache.qpid.transport.Option;
import org.apache.qpid.transport.SessionException;
public class QpidTopic_0_10 extends QpidTopic
{
private String exchangeName;
private Session ssn;
private Address address;
private ExchangeNode exchange;
private Link_0_10 link;
private String subscriptionQueue;
public QpidTopic_0_10(Address address,ExchangeNode exchange,Link_0_10 link) throws Exception
{
if (Reliability.AT_LEAST_ONCE == link.getReliability())
{
throw new Exception("AT-LEAST-ONCE is not yet supported for Topics");
}
this.address = address;
this.exchange = exchange;
this.link = link;
this.exchangeName = address.getName();
topicName = retrieveTopicName();
}
@Override
public void checkCreate(Session session,CheckMode mode) throws Exception
{
org.apache.qpid.transport.Session ssn = ((Session_0_10)session).getProtocolSession();
if (AddressHelper.isAllowed(exchange.getCreatePolicy(), mode))
{
ssn.exchangeDeclare(exchangeName,
exchange.getExchangeType(),
exchange.getAltExchange(),
exchange.getDeclareArgs(),
exchangeName.toString().startsWith("amq.") ?
Option.PASSIVE : Option.NONE);
}
else
{
try
{
ssn.exchangeDeclare(exchangeName, null, null, null, Option.PASSIVE);
ssn.sync();
}
catch(SessionException e)
{
if (e.getException() != null
&& e.getException().getErrorCode() == ExecutionErrorCode.NOT_FOUND)
{
throw new AddressException("The exchange '" + exchangeName +"' does not exist",e);
}
else
{
throw e;
}
}
}
}
@Override
public void checkAssert(Session session,CheckMode mode) throws Exception
{
org.apache.qpid.transport.Session ssn = ((Session_0_10)session).getProtocolSession();
if (AddressHelper.isAllowed(exchange.getAssertPolicy(), mode))
{
ExchangeQueryResult result = ssn.exchangeQuery(exchangeName, Option.NONE).get();
boolean match = !result.getNotFound() &&
(result.getDurable() == exchange.isDurable()) &&
(exchange.getExchangeType() != null && exchange.getExchangeType().equals(result.getType())) &&
(exchange.matchProps(result.getArguments()));
if (!match)
{
throw new AddressException("The exchange described by the address does not exist on the broker");
}
}
}
@Override
public void checkDelete(Session session,CheckMode mode) throws Exception
{
org.apache.qpid.transport.Session ssn = ((Session_0_10)session).getProtocolSession();
if (AddressHelper.isAllowed(exchange.getDeletePolicy(), mode) &&
!exchangeName.toString().startsWith("amq."))
{
ssn.exchangeDelete(exchangeName, Option.NONE);
ssn.sync();
}
}
@Override
public void createSubscription(Session session,SubscriptionSettings settings) throws Exception
{
org.apache.qpid.transport.Session ssn = ((Session_0_10)session).getProtocolSession();
subscriptionQueue = getSubscriptionQueueName();
// for topics subscriptions queues are exclusive by default.
boolean exclusive =
(link.isQueueExclusive() == null) ? true : link.getSubscription().isExclusive();
// for topics subscriptions are autoDelete by default.
boolean autoDelete =
(link.getSubscription().isExclusive() == null) ? true : link.getSubscription().isExclusive();
ssn.queueDeclare(subscriptionQueue,
link.getQueueAltExchange(),
link.getQueueDeclareArgs(),
autoDelete ? Option.AUTO_DELETE : Option.NONE,
link.isDurable() ? Option.DURABLE : Option.NONE,
exclusive ? Option.EXCLUSIVE : Option.NONE);
if (link.getQueueBindings().size() > 0)
{
for (Binding binding: link.getQueueBindings())
{
String queue = binding.getQueue() == null?
subscriptionQueue: binding.getQueue();
String exchange = binding.getExchange() == null ?
address.getName() :
binding.getExchange();
ssn.exchangeBind(queue,
exchange,
binding.getBindingKey(),
binding.getArgs());
}
}
else
{
String subject = address.getSubject();
ssn.exchangeBind(subscriptionQueue,
address.getName(),
subject == null || subject.trim().equals("") ? "#" : subject,
null);
}
SubscriptionSettings_0_10 settings_0_10 = (SubscriptionSettings_0_10)settings;
Map<String,Object> arguments = settings_0_10.getArgs();
arguments.putAll((Map<? extends String, ? extends Object>)link.getSubscription().getArgs());
if (link.getReliability() == Reliability.UNRELIABLE ||
link.getReliability() == Reliability.AT_MOST_ONCE)
{
settings_0_10.setAcceptMode(MessageAcceptMode.NONE);
}
// for topics subscriptions are exclusive by default.
boolean exclusiveConsume =
(link.getSubscription().isExclusive() == null) ? true : link.getSubscription().isExclusive();
ssn.messageSubscribe(subscriptionQueue,
settings_0_10.getSubscriptionTag(),
settings_0_10.getAcceptMode(),
settings_0_10.getAccquireMode(),
null, // resume id
0, // resume ttl
arguments,
exclusiveConsume ? Option.EXCLUSIVE : Option.NONE);
}
@Override
public void deleteSubscription(Session session) throws Exception
{
org.apache.qpid.transport.Session ssn = ((Session_0_10)session).getProtocolSession();
if (link.getQueueBindings().size() > 0)
{
for (Binding binding: link.getQueueBindings())
{
String queue = binding.getQueue() == null?
subscriptionQueue: binding.getQueue();
String exchange = binding.getExchange() == null ?
"anq.topic" :
binding.getExchange();
ssn.exchangeUnbind(queue,
exchange,
binding.getBindingKey(),
Option.NONE);
}
}
ssn.queueDelete(subscriptionQueue, Option.NONE);
}
private String getSubscriptionQueueName()
{
if (link.getName() == null)
{
return "TempQueue" + UUID.randomUUID();
}
else
{
return link.getName();
}
}
private String retrieveTopicName()
{
if (address.getSubject() != null && !address.getSubject().trim().equals(""))
{
return address.getSubject();
}
else if ("topic".equals(exchange.getExchangeType()))
{
return "#";
}
else
{
return "";
}
}
public String getSubscriptionQueue()
{
return subscriptionQueue;
}
public long getConsumerCapacity()
{
return link.getConsumerCapacity();
}
public long getProducerCapacity()
{
return link.getProducerCapacity();
}
public String toString()
{
return address.toString();
}
}