blob: bc303a219d21b9bc02fdcaca973adac577db1d99 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.exchange;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.AMQShortStringTokenizer;
import org.apache.qpid.server.management.MBeanConstructor;
import org.apache.qpid.server.management.MBeanDescription;
import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.exchange.topic.TopicParser;
import org.apache.qpid.server.exchange.topic.TopicMatcherResult;
import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.filter.JMSSelectorFilter;
import javax.management.JMException;
import javax.management.MBeanException;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.CompositeDataSupport;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData;
import javax.management.openmbean.TabularDataSupport;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.lang.ref.WeakReference;
public class TopicExchange extends AbstractExchange
{
public static final ExchangeType<TopicExchange> TYPE = new ExchangeType<TopicExchange>()
{
public AMQShortString getName()
{
return ExchangeDefaults.TOPIC_EXCHANGE_CLASS;
}
public Class<TopicExchange> getExchangeClass()
{
return TopicExchange.class;
}
public TopicExchange newInstance(VirtualHost host,
AMQShortString name,
boolean durable,
int ticket,
boolean autoDelete) throws AMQException
{
TopicExchange exch = new TopicExchange();
exch.initialise(host, name, durable, ticket, autoDelete);
return exch;
}
public AMQShortString getDefaultExchangeName()
{
return ExchangeDefaults.TOPIC_EXCHANGE_NAME;
}
};
private static final Logger _logger = Logger.getLogger(TopicExchange.class);
/*
private final ConcurrentHashMap<AMQShortString, List<AMQQueue>> _bindingKey2queues =
new ConcurrentHashMap<AMQShortString, List<AMQQueue>>();
private final ConcurrentHashMap<AMQShortString, List<AMQQueue>> _simpleBindingKey2queues =
new ConcurrentHashMap<AMQShortString, List<AMQQueue>>();
private final ConcurrentHashMap<AMQShortString, List<AMQQueue>> _wildCardBindingKey2queues =
new ConcurrentHashMap<AMQShortString, List<AMQQueue>>();
*/
// private ConcurrentHashMap<AMQShortString, AMQQueue> _routingKey2queue = new ConcurrentHashMap<AMQShortString, AMQQueue>();
private static final byte TOPIC_SEPARATOR = (byte)'.';
private static final AMQShortString TOPIC_SEPARATOR_AS_SHORTSTRING = new AMQShortString(".");
private static final AMQShortString AMQP_STAR_TOKEN = new AMQShortString("*");
private static final AMQShortString AMQP_HASH_TOKEN = new AMQShortString("#");
private static final byte HASH_BYTE = (byte)'#';
private static final byte STAR_BYTE = (byte)'*';
private final TopicParser _parser = new TopicParser();
private final Map<AMQShortString, TopicExchangeResult> _topicExchangeResults =
new ConcurrentHashMap<AMQShortString, TopicExchangeResult>();
private final Map<Binding, FieldTable> _bindings = new HashMap<Binding, FieldTable>();
private final Map<String, WeakReference<JMSSelectorFilter<RuntimeException>>> _selectorCache = new WeakHashMap<String, WeakReference<JMSSelectorFilter<RuntimeException>>>();
public static class Binding
{
private final AMQShortString _bindingKey;
private final AMQQueue _queue;
private final FieldTable _args;
public Binding(AMQShortString bindingKey, AMQQueue queue, FieldTable args)
{
_bindingKey = bindingKey;
_queue = queue;
_args = args;
}
public AMQShortString getBindingKey()
{
return _bindingKey;
}
public AMQQueue getQueue()
{
return _queue;
}
public int hashCode()
{
return (_bindingKey == null ? 1 : _bindingKey.hashCode())*31 +_queue.hashCode();
}
public boolean equals(Object o)
{
if(this == o)
{
return true;
}
if(o instanceof Binding)
{
Binding other = (Binding) o;
return (_queue == other._queue)
&& ((_bindingKey == null) ? other._bindingKey == null : _bindingKey.equals(other._bindingKey));
}
return false;
}
}
private final class TopicExchangeResult implements TopicMatcherResult
{
private final Map<AMQQueue, Integer> _unfilteredQueues = new ConcurrentHashMap<AMQQueue, Integer>();
private final ConcurrentHashMap<AMQQueue, Map<MessageFilter<RuntimeException>,Integer>> _filteredQueues = new ConcurrentHashMap<AMQQueue, Map<MessageFilter<RuntimeException>, Integer>>();
public void addUnfilteredQueue(AMQQueue queue)
{
Integer instances = _unfilteredQueues.get(queue);
if(instances == null)
{
_unfilteredQueues.put(queue, 1);
}
else
{
_unfilteredQueues.put(queue, instances + 1);
}
}
public void removeUnfilteredQueue(AMQQueue queue)
{
Integer instances = _unfilteredQueues.get(queue);
if(instances == 1)
{
_unfilteredQueues.remove(queue);
}
else
{
_unfilteredQueues.put(queue,instances - 1);
}
}
public void addFilteredQueue(AMQQueue queue, MessageFilter<RuntimeException> filter)
{
Map<MessageFilter<RuntimeException>,Integer> filters = _filteredQueues.get(queue);
if(filters == null)
{
filters = new ConcurrentHashMap<MessageFilter<RuntimeException>,Integer>();
_filteredQueues.put(queue, filters);
}
Integer instances = filters.get(filter);
if(instances == null)
{
filters.put(filter,1);
}
else
{
filters.put(filter, instances + 1);
}
}
public void removeFilteredQueue(AMQQueue queue, MessageFilter<RuntimeException> filter)
{
Map<MessageFilter<RuntimeException>,Integer> filters = _filteredQueues.get(queue);
if(filters != null)
{
Integer instances = filters.get(filter);
if(instances != null)
{
if(instances == 1)
{
filters.remove(filter);
if(filters.isEmpty())
{
_filteredQueues.remove(queue);
}
}
else
{
filters.put(filter, instances - 1);
}
}
}
}
public void replaceQueueFilter(AMQQueue queue,
MessageFilter<RuntimeException> oldFilter,
MessageFilter<RuntimeException> newFilter)
{
Map<MessageFilter<RuntimeException>,Integer> filters = _filteredQueues.get(queue);
Map<MessageFilter<RuntimeException>,Integer> newFilters = new ConcurrentHashMap<MessageFilter<RuntimeException>,Integer>(filters);
Integer oldFilterInstances = filters.get(oldFilter);
if(oldFilterInstances == 1)
{
newFilters.remove(oldFilter);
}
else
{
newFilters.put(oldFilter, oldFilterInstances-1);
}
Integer newFilterInstances = filters.get(newFilter);
if(newFilterInstances == null)
{
newFilters.put(newFilter, 1);
}
else
{
newFilters.put(newFilter, newFilterInstances+1);
}
_filteredQueues.put(queue,newFilters);
}
public Collection<AMQQueue> processMessage(IncomingMessage msg, Collection<AMQQueue> queues)
{
if(queues == null)
{
if(_filteredQueues.isEmpty())
{
return new ArrayList<AMQQueue>(_unfilteredQueues.keySet());
}
else
{
queues = new HashSet<AMQQueue>();
}
}
else if(!(queues instanceof Set))
{
queues = new HashSet<AMQQueue>(queues);
}
queues.addAll(_unfilteredQueues.keySet());
if(!_filteredQueues.isEmpty())
{
for(Map.Entry<AMQQueue, Map<MessageFilter<RuntimeException>, Integer>> entry : _filteredQueues.entrySet())
{
if(!queues.contains(entry.getKey()))
{
for(MessageFilter<RuntimeException> filter : entry.getValue().keySet())
{
if(filter.matches(msg))
{
queues.add(entry.getKey());
}
}
}
}
}
return queues;
}
}
/** TopicExchangeMBean class implements the management interface for the Topic exchanges. */
@MBeanDescription("Management Bean for Topic Exchange")
private final class TopicExchangeMBean extends ExchangeMBean
{
@MBeanConstructor("Creates an MBean for AMQ topic exchange")
public TopicExchangeMBean() throws JMException
{
super();
_exchangeType = "topic";
init();
}
/** returns exchange bindings in tabular form */
public TabularData bindings() throws OpenDataException
{
_bindingList = new TabularDataSupport(_bindinglistDataType);
Map<String, List<String>> bindingData = new HashMap<String, List<String>>();
for (Binding binding : _bindings.keySet())
{
String key = binding.getBindingKey().toString();
List<String> queueNames = bindingData.get(key);
if(queueNames == null)
{
queueNames = new ArrayList<String>();
bindingData.put(key, queueNames);
}
queueNames.add(binding.getQueue().getName().toString());
}
for(Map.Entry<String, List<String>> entry : bindingData.entrySet())
{
Object[] bindingItemValues = {entry.getKey(), entry.getValue().toArray(new String[entry.getValue().size()]) };
CompositeData bindingCompositeData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues);
_bindingList.put(bindingCompositeData);
}
return _bindingList;
}
public void createNewBinding(String queueName, String binding) throws JMException
{
AMQQueue queue = getQueueRegistry().getQueue(new AMQShortString(queueName));
if (queue == null)
{
throw new JMException("Queue \"" + queueName + "\" is not registered with the exchange.");
}
try
{
queue.bind(TopicExchange.this, new AMQShortString(binding), null);
}
catch (AMQException ex)
{
throw new MBeanException(ex);
}
}
} // End of MBean class
public AMQShortString getType()
{
return ExchangeDefaults.TOPIC_EXCHANGE_CLASS;
}
public synchronized void registerQueue(AMQShortString rKey, AMQQueue queue, FieldTable args) throws AMQException
{
assert queue != null;
assert rKey != null;
_logger.debug("Registering queue " + queue.getName() + " with routing key " + rKey);
AMQShortString routingKey;
if(rKey.contains(HASH_BYTE) || rKey.contains(STAR_BYTE))
{
routingKey = normalize(rKey);
}
else
{
routingKey = rKey;
}
Binding binding = new Binding(rKey, queue, args);
if(_bindings.containsKey(binding))
{
FieldTable oldArgs = _bindings.get(binding);
TopicExchangeResult result = _topicExchangeResults.get(routingKey);
if(argumentsContainSelector(args))
{
if(argumentsContainSelector(oldArgs))
{
result.replaceQueueFilter(queue,createSelectorFilter(oldArgs), createSelectorFilter(args));
}
else
{
result.addFilteredQueue(queue,createSelectorFilter(args));
result.removeUnfilteredQueue(queue);
}
}
else
{
if(argumentsContainSelector(oldArgs))
{
result.addUnfilteredQueue(queue);
result.removeFilteredQueue(queue, createSelectorFilter(oldArgs));
}
else
{
// TODO - fix control flow
return;
}
}
}
else
{
TopicExchangeResult result = _topicExchangeResults.get(routingKey);
if(result == null)
{
result = new TopicExchangeResult();
if(argumentsContainSelector(args))
{
result.addFilteredQueue(queue, createSelectorFilter(args));
}
else
{
result.addUnfilteredQueue(queue);
}
_parser.addBinding(routingKey, result);
_topicExchangeResults.put(routingKey,result);
}
else
{
if(argumentsContainSelector(args))
{
result.addFilteredQueue(queue, createSelectorFilter(args));
}
else
{
result.addUnfilteredQueue(queue);
}
}
_bindings.put(binding, args);
}
}
private JMSSelectorFilter<RuntimeException> createSelectorFilter(final FieldTable args)
throws AMQException
{
final String selectorString = args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue());
WeakReference<JMSSelectorFilter<RuntimeException>> selectorRef = _selectorCache.get(selectorString);
JMSSelectorFilter selector = null;
if(selectorRef == null || (selector = selectorRef.get())==null)
{
selector = new JMSSelectorFilter<RuntimeException>(selectorString);
_selectorCache.put(selectorString, new WeakReference<JMSSelectorFilter<RuntimeException>>(selector));
}
return selector;
}
private static boolean argumentsContainSelector(final FieldTable args)
{
return args != null && args.containsKey(AMQPFilterTypes.JMS_SELECTOR.getValue()) && args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue()).trim().length() != 0;
}
private AMQShortString normalize(AMQShortString routingKey)
{
if(routingKey == null)
{
routingKey = AMQShortString.EMPTY_STRING;
}
AMQShortStringTokenizer routingTokens = routingKey.tokenize(TOPIC_SEPARATOR);
List<AMQShortString> subscriptionList = new ArrayList<AMQShortString>();
while (routingTokens.hasMoreTokens())
{
subscriptionList.add(routingTokens.nextToken());
}
int size = subscriptionList.size();
for (int index = 0; index < size; index++)
{
// if there are more levels
if ((index + 1) < size)
{
if (subscriptionList.get(index).equals(AMQP_HASH_TOKEN))
{
if (subscriptionList.get(index + 1).equals(AMQP_HASH_TOKEN))
{
// we don't need #.# delete this one
subscriptionList.remove(index);
size--;
// redo this normalisation
index--;
}
if (subscriptionList.get(index + 1).equals(AMQP_STAR_TOKEN))
{
// we don't want #.* swap to *.#
// remove it and put it in at index + 1
subscriptionList.add(index + 1, subscriptionList.remove(index));
}
}
} // if we have more levels
}
AMQShortString normalizedString = AMQShortString.join(subscriptionList, TOPIC_SEPARATOR_AS_SHORTSTRING);
return normalizedString;
}
public void route(IncomingMessage payload) throws AMQException
{
final AMQShortString routingKey = payload.getRoutingKey();
// The copy here is unfortunate, but not too bad relevant to the amount of
// things created and copied in getMatchedQueues
ArrayList<AMQQueue> queues = new ArrayList<AMQQueue>();
queues.addAll(getMatchedQueues(payload, routingKey));
if(queues == null || queues.isEmpty())
{
_logger.info("Message routing key: " + payload.getRoutingKey() + " No routes.");
}
payload.enqueue(queues);
}
public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
{
Binding binding = new Binding(routingKey, queue, arguments);
if (arguments == null)
{
return _bindings.containsKey(binding);
}
else
{
FieldTable o = _bindings.get(binding);
if (o != null)
{
return o.equals(arguments);
}
else
{
return false;
}
}
}
public boolean isBound(AMQShortString routingKey, AMQQueue queue)
{
return isBound(routingKey, null, queue);
}
public boolean isBound(AMQShortString routingKey)
{
for(Binding b : _bindings.keySet())
{
if(b.getBindingKey().equals(routingKey))
{
return true;
}
}
return false;
}
public boolean isBound(AMQQueue queue)
{
for(Binding b : _bindings.keySet())
{
if(b.getQueue().equals(queue))
{
return true;
}
}
return false;
}
public boolean hasBindings()
{
return !_bindings.isEmpty();
}
public synchronized void deregisterQueue(AMQShortString rKey, AMQQueue queue, FieldTable args) throws AMQException
{
assert queue != null;
assert rKey != null;
Binding binding = new Binding(rKey, queue, args);
if (!_bindings.containsKey(binding))
{
throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue.getName() + " was not registered with exchange " + this.getName()
+ " with routing key " + rKey + ".");
}
FieldTable bindingArgs = _bindings.remove(binding);
AMQShortString bindingKey = normalize(rKey);
TopicExchangeResult result = _topicExchangeResults.get(bindingKey);
if(argumentsContainSelector(bindingArgs))
{
result.removeFilteredQueue(queue, createSelectorFilter(bindingArgs));
}
else
{
result.removeUnfilteredQueue(queue);
}
}
protected ExchangeMBean createMBean() throws AMQException
{
try
{
return new TopicExchangeMBean();
}
catch (JMException ex)
{
_logger.error("Exception occured in creating the topic exchenge mbean", ex);
throw new AMQException("Exception occured in creating the topic exchenge mbean", ex);
}
}
private Collection<AMQQueue> getMatchedQueues(IncomingMessage message, AMQShortString routingKey)
{
Collection<TopicMatcherResult> results = _parser.parse(routingKey);
if(results.isEmpty())
{
return Collections.EMPTY_SET;
}
else
{
Collection<AMQQueue> queues = results.size() == 1 ? null : new HashSet<AMQQueue>();
for(TopicMatcherResult result : results)
{
queues = ((TopicExchangeResult)result).processMessage(message, queues);
}
return queues;
}
}
}