blob: f1bf6322358b3fdc63126b9ebd85d1712562219c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*
*/
package org.apache.qpid.server.exchange;
import junit.framework.Assert;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.message.MessageMetaData;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.util.BrokerTestHelper;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.test.utils.QpidTestCase;
public class TopicExchangeTest extends QpidTestCase
{
private TopicExchange _exchange;
private VirtualHost _vhost;
private MessageStore _store;
@Override
public void setUp() throws Exception
{
super.setUp();
BrokerTestHelper.setUp();
_exchange = new TopicExchange();
_vhost = BrokerTestHelper.createVirtualHost(getName());
_store = new MemoryMessageStore();
}
@Override
public void tearDown() throws Exception
{
try
{
if (_vhost != null)
{
_vhost.close();
}
}
finally
{
BrokerTestHelper.tearDown();
super.tearDown();
}
}
public void testNoRoute() throws AMQException
{
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a*#b", false, null, false, false, _vhost, null);
_exchange.registerQueue(new Binding(null, "a.*.#.b",queue, _exchange, null));
IncomingMessage message = createMessage("a.b");
routeMessage(message);
Assert.assertEquals(0, queue.getMessageCount());
}
public void testDirectMatch() throws AMQException
{
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "ab", false, null, false, false, _vhost, null);
_exchange.registerQueue(new Binding(null, "a.b",queue, _exchange, null));
IncomingMessage message = createMessage("a.b");
routeMessage(message);
Assert.assertEquals(1, queue.getMessageCount());
Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
queue.deleteMessageFromTop();
Assert.assertEquals(0, queue.getMessageCount());
message = createMessage("a.c");
int queueCount = routeMessage(message);
Assert.assertEquals("Message should not route to any queues", 0, queueCount);
Assert.assertEquals(0, queue.getMessageCount());
}
public void testStarMatch() throws AMQException
{
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a*", false, null, false, false, _vhost, null);
_exchange.registerQueue(new Binding(null, "a.*",queue, _exchange, null));
IncomingMessage message = createMessage("a.b");
routeMessage(message);
Assert.assertEquals(1, queue.getMessageCount());
Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
queue.deleteMessageFromTop();
Assert.assertEquals(0, queue.getMessageCount());
message = createMessage("a.c");
int queueCount = routeMessage(message);
Assert.assertEquals(1, queue.getMessageCount());
Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
queue.deleteMessageFromTop();
Assert.assertEquals(0, queue.getMessageCount());
message = createMessage("a");
queueCount = routeMessage(message);
Assert.assertEquals("Message should not route to any queues", 0, queueCount);
Assert.assertEquals(0, queue.getMessageCount());
}
public void testHashMatch() throws AMQException
{
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, _vhost, null);
_exchange.registerQueue(new Binding(null, "a.#",queue, _exchange, null));
IncomingMessage message = createMessage("a.b.c");
int queueCount = routeMessage(message);
Assert.assertEquals(1, queue.getMessageCount());
Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
queue.deleteMessageFromTop();
Assert.assertEquals(0, queue.getMessageCount());
message = createMessage("a.b");
queueCount = routeMessage(message);
Assert.assertEquals(1, queue.getMessageCount());
Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
queue.deleteMessageFromTop();
Assert.assertEquals(0, queue.getMessageCount());
message = createMessage("a.c");
queueCount = routeMessage(message);
Assert.assertEquals(1, queue.getMessageCount());
Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
queue.deleteMessageFromTop();
Assert.assertEquals(0, queue.getMessageCount());
message = createMessage("a");
queueCount = routeMessage(message);
Assert.assertEquals(1, queue.getMessageCount());
Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
queue.deleteMessageFromTop();
Assert.assertEquals(0, queue.getMessageCount());
message = createMessage("b");
queueCount = routeMessage(message);
Assert.assertEquals("Message should not route to any queues", 0, queueCount);
Assert.assertEquals(0, queue.getMessageCount());
}
public void testMidHash() throws AMQException
{
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, _vhost, null);
_exchange.registerQueue(new Binding(null, "a.*.#.b",queue, _exchange, null));
IncomingMessage message = createMessage("a.c.d.b");
routeMessage(message);
Assert.assertEquals(1, queue.getMessageCount());
Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
queue.deleteMessageFromTop();
Assert.assertEquals(0, queue.getMessageCount());
message = createMessage("a.c.b");
routeMessage(message);
Assert.assertEquals(1, queue.getMessageCount());
Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
queue.deleteMessageFromTop();
Assert.assertEquals(0, queue.getMessageCount());
}
public void testMatchafterHash() throws AMQException
{
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, _vhost, null);
_exchange.registerQueue(new Binding(null, "a.*.#.b.c",queue, _exchange, null));
IncomingMessage message = createMessage("a.c.b.b");
int queueCount = routeMessage(message);
Assert.assertEquals("Message should not route to any queues", 0, queueCount);
Assert.assertEquals(0, queue.getMessageCount());
message = createMessage("a.a.b.c");
routeMessage(message);
Assert.assertEquals(1, queue.getMessageCount());
Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
queue.deleteMessageFromTop();
Assert.assertEquals(0, queue.getMessageCount());
message = createMessage("a.b.c.b");
queueCount = routeMessage(message);
Assert.assertEquals("Message should not route to any queues", 0, queueCount);
Assert.assertEquals(0, queue.getMessageCount());
message = createMessage("a.b.c.b.c");
routeMessage(message);
Assert.assertEquals(1, queue.getMessageCount());
Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
queue.deleteMessageFromTop();
Assert.assertEquals(0, queue.getMessageCount());
}
public void testHashAfterHash() throws AMQException
{
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, _vhost, null);
_exchange.registerQueue(new Binding(null, "a.*.#.b.c.#.d",queue, _exchange, null));
IncomingMessage message = createMessage("a.c.b.b.c");
int queueCount = routeMessage(message);
Assert.assertEquals("Message should not route to any queues", 0, queueCount);
Assert.assertEquals(0, queue.getMessageCount());
message = createMessage("a.a.b.c.d");
routeMessage(message);
Assert.assertEquals(1, queue.getMessageCount());
Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
queue.deleteMessageFromTop();
Assert.assertEquals(0, queue.getMessageCount());
}
public void testHashHash() throws AMQException
{
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a#", false, null, false, false, _vhost, null);
_exchange.registerQueue(new Binding(null, "a.#.*.#.d",queue, _exchange, null));
IncomingMessage message = createMessage("a.c.b.b.c");
int queueCount = routeMessage(message);
Assert.assertEquals("Message should not route to any queues", 0, queueCount);
Assert.assertEquals(0, queue.getMessageCount());
message = createMessage("a.a.b.c.d");
routeMessage(message);
Assert.assertEquals(1, queue.getMessageCount());
Assert.assertEquals("Wrong message recevied", (Object) message.getMessageNumber(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageNumber());
queue.deleteMessageFromTop();
Assert.assertEquals(0, queue.getMessageCount());
}
public void testSubMatchFails() throws AMQException
{
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, _vhost, null);
_exchange.registerQueue(new Binding(null, "a.b.c.d",queue, _exchange, null));
IncomingMessage message = createMessage("a.b.c");
int queueCount = routeMessage(message);
Assert.assertEquals("Message should not route to any queues", 0, queueCount);
Assert.assertEquals(0, queue.getMessageCount());
}
private int routeMessage(final IncomingMessage message)
throws AMQException
{
MessageMetaData mmd = message.headersReceived(System.currentTimeMillis());
message.setStoredMessage(_store.addMessage(mmd));
message.enqueue(_exchange.route(message));
AMQMessage msg = new AMQMessage(message.getStoredMessage());
for(BaseQueue q : message.getDestinationQueues())
{
q.enqueue(msg);
}
return message.getDestinationQueues().size();
}
public void testMoreRouting() throws AMQException
{
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, _vhost, null);
_exchange.registerQueue(new Binding(null, "a.b",queue, _exchange, null));
IncomingMessage message = createMessage("a.b.c");
int queueCount = routeMessage(message);
Assert.assertEquals("Message should not route to any queues", 0, queueCount);
Assert.assertEquals(0, queue.getMessageCount());
}
public void testMoreQueue() throws AMQException
{
AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateRandomUUID(), "a", false, null, false, false, _vhost, null);
_exchange.registerQueue(new Binding(null, "a.b",queue, _exchange, null));
IncomingMessage message = createMessage("a");
int queueCount = routeMessage(message);
Assert.assertEquals("Message should not route to any queues", 0, queueCount);
Assert.assertEquals(0, queue.getMessageCount());
}
private IncomingMessage createMessage(String s) throws AMQException
{
MessagePublishInfo info = new PublishInfo(new AMQShortString(s));
IncomingMessage message = new IncomingMessage(info);
final ContentHeaderBody chb = new ContentHeaderBody();
BasicContentHeaderProperties props = new BasicContentHeaderProperties();
chb.setProperties(props);
message.setContentHeaderBody(chb);
return message;
}
class PublishInfo implements MessagePublishInfo
{
private AMQShortString _routingkey;
PublishInfo(AMQShortString routingkey)
{
_routingkey = routingkey;
}
public AMQShortString getExchange()
{
return null;
}
public void setExchange(AMQShortString exchange)
{
}
public boolean isImmediate()
{
return false;
}
public boolean isMandatory()
{
return true;
}
public AMQShortString getRoutingKey()
{
return _routingkey;
}
}
}