blob: 6d548be508661ca860841a916d2be92ab052efef [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.exchange;
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQInvalidArgumentException;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.filter.SelectorParsingException;
import org.apache.qpid.filter.selector.ParseException;
import org.apache.qpid.filter.selector.TokenMgrError;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.exchange.topic.TopicExchangeResult;
import org.apache.qpid.server.exchange.topic.TopicMatcherResult;
import org.apache.qpid.server.exchange.topic.TopicNormalizer;
import org.apache.qpid.server.exchange.topic.TopicParser;
import org.apache.qpid.server.filter.JMSSelectorFilter;
import org.apache.qpid.server.filter.MessageFilter;
import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.Filterable;
public class TopicExchange extends AbstractExchange
{
public static final ExchangeType<TopicExchange> TYPE = new TopicExchangeType();
private static final Logger _logger = Logger.getLogger(TopicExchange.class);
private final TopicParser _parser = new TopicParser();
private final Map<AMQShortString, TopicExchangeResult> _topicExchangeResults =
new ConcurrentHashMap<AMQShortString, TopicExchangeResult>();
private final Map<Binding, FieldTable> _bindings = new HashMap<Binding, FieldTable>();
private final Map<String, WeakReference<JMSSelectorFilter>> _selectorCache = new WeakHashMap<String, WeakReference<JMSSelectorFilter>>();
public TopicExchange()
{
super(TYPE);
}
protected synchronized void registerQueue(final Binding binding) throws AMQInvalidArgumentException
{
AMQShortString rKey = new AMQShortString(binding.getBindingKey()) ;
AMQQueue queue = binding.getQueue();
FieldTable args = FieldTable.convertToFieldTable(binding.getArguments());
assert queue != null;
assert rKey != null;
_logger.debug("Registering queue " + queue.getNameShortString() + " with routing key " + rKey);
AMQShortString routingKey = TopicNormalizer.normalize(rKey);
if(_bindings.containsKey(binding))
{
FieldTable oldArgs = _bindings.get(binding);
TopicExchangeResult result = _topicExchangeResults.get(routingKey);
if(argumentsContainFilter(args))
{
if(argumentsContainFilter(oldArgs))
{
result.replaceQueueFilter(queue,
createMessageFilter(oldArgs, queue),
createMessageFilter(args, queue));
}
else
{
result.addFilteredQueue(queue, createMessageFilter(args, queue));
result.removeUnfilteredQueue(queue);
}
}
else
{
if(argumentsContainFilter(oldArgs))
{
result.addUnfilteredQueue(queue);
result.removeFilteredQueue(queue, createMessageFilter(oldArgs, queue));
}
else
{
// TODO - fix control flow
return;
}
}
result.addBinding(binding);
}
else
{
TopicExchangeResult result = _topicExchangeResults.get(routingKey);
if(result == null)
{
result = new TopicExchangeResult();
if(argumentsContainFilter(args))
{
result.addFilteredQueue(queue, createMessageFilter(args, queue));
}
else
{
result.addUnfilteredQueue(queue);
}
_parser.addBinding(routingKey, result);
_topicExchangeResults.put(routingKey,result);
}
else
{
if(argumentsContainFilter(args))
{
result.addFilteredQueue(queue, createMessageFilter(args, queue));
}
else
{
result.addUnfilteredQueue(queue);
}
}
result.addBinding(binding);
_bindings.put(binding, args);
}
}
private MessageFilter createMessageFilter(final FieldTable args, AMQQueue queue) throws AMQInvalidArgumentException
{
if(argumentsContainNoLocal(args))
{
MessageFilter filter = new NoLocalFilter(queue);
if(argumentsContainJMSSelector(args))
{
filter = new CompoundFilter(filter, createJMSSelectorFilter(args));
}
return filter;
}
else
{
return createJMSSelectorFilter(args);
}
}
private MessageFilter createJMSSelectorFilter(FieldTable args) throws AMQInvalidArgumentException
{
final String selectorString = args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue());
WeakReference<JMSSelectorFilter> selectorRef = _selectorCache.get(selectorString);
JMSSelectorFilter selector = null;
if(selectorRef == null || (selector = selectorRef.get())==null)
{
try
{
selector = new JMSSelectorFilter(selectorString);
}
catch (ParseException e)
{
throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e);
}
catch (SelectorParsingException e)
{
throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e);
}
catch (TokenMgrError e)
{
throw new AMQInvalidArgumentException("Cannot parse JMS selector \"" + selectorString + "\"", e);
}
_selectorCache.put(selectorString, new WeakReference<JMSSelectorFilter>(selector));
}
return selector;
}
private static boolean argumentsContainFilter(final FieldTable args)
{
return argumentsContainNoLocal(args) || argumentsContainJMSSelector(args);
}
private static boolean argumentsContainNoLocal(final FieldTable args)
{
return args != null
&& args.containsKey(AMQPFilterTypes.NO_LOCAL.getValue())
&& Boolean.TRUE.equals(args.get(AMQPFilterTypes.NO_LOCAL.getValue()));
}
private static boolean argumentsContainJMSSelector(final FieldTable args)
{
return args != null && (args.containsKey(AMQPFilterTypes.JMS_SELECTOR.getValue())
&& args.getString(AMQPFilterTypes.JMS_SELECTOR.getValue()).trim().length() != 0);
}
public ArrayList<BaseQueue> doRoute(InboundMessage payload)
{
final AMQShortString routingKey = payload.getRoutingKeyShortString() == null
? AMQShortString.EMPTY_STRING
: payload.getRoutingKeyShortString();
final Collection<AMQQueue> matchedQueues = getMatchedQueues(payload, routingKey);
ArrayList<BaseQueue> queues;
if(matchedQueues.getClass() == ArrayList.class)
{
queues = (ArrayList) matchedQueues;
}
else
{
queues = new ArrayList<BaseQueue>();
queues.addAll(matchedQueues);
}
if(queues == null || queues.isEmpty())
{
_logger.info("Message routing key: " + payload.getRoutingKey() + " No routes.");
}
return queues;
}
public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue)
{
Binding binding = new Binding(null, routingKey.toString(), queue, this, FieldTable.convertToMap(arguments));
if (arguments == null)
{
return _bindings.containsKey(binding);
}
else
{
FieldTable o = _bindings.get(binding);
if (o != null)
{
return o.equals(arguments);
}
else
{
return false;
}
}
}
public boolean isBound(String bindingKey, Map<String, Object> arguments, AMQQueue queue)
{
Binding binding = new Binding(null, bindingKey, queue, this, arguments);
if (arguments == null)
{
return _bindings.containsKey(binding);
}
else
{
FieldTable o = _bindings.get(binding);
if (o != null)
{
return arguments.equals(FieldTable.convertToMap(o));
}
else
{
return false;
}
}
}
public boolean isBound(AMQShortString routingKey, AMQQueue queue)
{
return isBound(routingKey, null, queue);
}
public boolean isBound(AMQShortString routingKey)
{
for(Binding b : _bindings.keySet())
{
if(b.getBindingKey().equals(routingKey.toString()))
{
return true;
}
}
return false;
}
public boolean isBound(AMQQueue queue)
{
for(Binding b : _bindings.keySet())
{
if(b.getQueue().equals(queue))
{
return true;
}
}
return false;
}
public boolean hasBindings()
{
return !_bindings.isEmpty();
}
private boolean deregisterQueue(final Binding binding)
{
if(_bindings.containsKey(binding))
{
FieldTable bindingArgs = _bindings.remove(binding);
AMQShortString bindingKey = TopicNormalizer.normalize(new AMQShortString(binding.getBindingKey()));
TopicExchangeResult result = _topicExchangeResults.get(bindingKey);
result.removeBinding(binding);
if(argumentsContainFilter(bindingArgs))
{
try
{
result.removeFilteredQueue(binding.getQueue(), createMessageFilter(bindingArgs, binding.getQueue()));
}
catch (AMQInvalidArgumentException e)
{
return false;
}
}
else
{
result.removeUnfilteredQueue(binding.getQueue());
}
return true;
}
else
{
return false;
}
}
private Collection<AMQQueue> getMatchedQueues(InboundMessage message, AMQShortString routingKey)
{
Collection<TopicMatcherResult> results = _parser.parse(routingKey);
switch(results.size())
{
case 0:
return Collections.EMPTY_SET;
case 1:
TopicMatcherResult[] resultQueues = new TopicMatcherResult[1];
results.toArray(resultQueues);
return ((TopicExchangeResult)resultQueues[0]).processMessage(message, null);
default:
Collection<AMQQueue> queues = new HashSet<AMQQueue>();
for(TopicMatcherResult result : results)
{
TopicExchangeResult res = (TopicExchangeResult)result;
for(Binding b : res.getBindings())
{
b.incrementMatches();
}
queues = res.processMessage(message, queues);
}
return queues;
}
}
protected void onBind(final Binding binding)
{
try
{
registerQueue(binding);
}
catch (AMQInvalidArgumentException e)
{
throw new RuntimeException(e);
}
}
protected void onUnbind(final Binding binding)
{
deregisterQueue(binding);
}
private static final class NoLocalFilter implements MessageFilter
{
private final AMQQueue _queue;
public NoLocalFilter(AMQQueue queue)
{
_queue = queue;
}
public boolean matches(Filterable message)
{
InboundMessage inbound = (InboundMessage) message;
final AMQSessionModel exclusiveOwningSession = _queue.getExclusiveOwningSession();
return exclusiveOwningSession == null || !exclusiveOwningSession.onSameConnection(inbound);
}
@Override
public boolean equals(Object o)
{
if (this == o)
{
return true;
}
if (o == null || getClass() != o.getClass())
{
return false;
}
NoLocalFilter that = (NoLocalFilter) o;
return _queue == null ? that._queue == null : _queue.equals(that._queue);
}
@Override
public int hashCode()
{
return _queue != null ? _queue.hashCode() : 0;
}
}
private static final class CompoundFilter implements MessageFilter
{
private MessageFilter _noLocalFilter;
private MessageFilter _jmsSelectorFilter;
public CompoundFilter(MessageFilter filter, MessageFilter jmsSelectorFilter)
{
_noLocalFilter = filter;
_jmsSelectorFilter = jmsSelectorFilter;
}
public boolean matches(Filterable message)
{
return _noLocalFilter.matches(message) && _jmsSelectorFilter.matches(message);
}
@Override
public boolean equals(Object o)
{
if (this == o)
{
return true;
}
if (o == null || getClass() != o.getClass())
{
return false;
}
CompoundFilter that = (CompoundFilter) o;
if (_jmsSelectorFilter != null ? !_jmsSelectorFilter.equals(that._jmsSelectorFilter) : that._jmsSelectorFilter != null)
{
return false;
}
if (_noLocalFilter != null ? !_noLocalFilter.equals(that._noLocalFilter) : that._noLocalFilter != null)
{
return false;
}
return true;
}
@Override
public int hashCode()
{
int result = _noLocalFilter != null ? _noLocalFilter.hashCode() : 0;
result = 31 * result + (_jmsSelectorFilter != null ? _jmsSelectorFilter.hashCode() : 0);
return result;
}
}
}