blob: 7797ae3863d1b0d7351372f2996176682e6aba35 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker.virtual;
import java.util.ArrayList;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.EmbeddedBrokerTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.DestinationInterceptor;
import org.apache.activemq.broker.region.virtual.CompositeQueue;
import org.apache.activemq.broker.region.virtual.FilteredDestination;
import org.apache.activemq.broker.region.virtual.VirtualDestination;
import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.spring.ConsumerBean;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class CompositeDestinationSendWhenNotMatchedTest extends EmbeddedBrokerTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(CompositeQueueTest.class);
protected int total = 10;
protected Connection connection;
@Test
public void testSendWhenNotMatched() throws Exception {
if (connection == null) {
connection = createConnection();
}
connection.start();
ConsumerBean messageList1 = new ConsumerBean();
ConsumerBean messageList2 = new ConsumerBean();
messageList1.setVerbose(true);
messageList2.setVerbose(true);
// messageList1.waitForMessagesToArrive(0);
// messageList2.waitForMessagesToArrive(1);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination producerDestination = new ActiveMQQueue("A.B");
Destination destination1 =new ActiveMQQueue("A.B");
Destination destination2 = new ActiveMQQueue("A.C");
LOG.info("Sending to: " + producerDestination);
LOG.info("Consuming from: " + destination1 + " and " + destination2);
MessageConsumer c1 = session.createConsumer(destination1);
MessageConsumer c2 = session.createConsumer(destination2);
c1.setMessageListener(messageList1);
c2.setMessageListener(messageList2);
// create topic producer
MessageProducer producer = session.createProducer(producerDestination);
assertNotNull(producer);
producer.send(createMessage(session, "tet13"));
messageList1.assertMessagesArrived(1);
messageList2.assertMessagesArrived(0);
}
@Test
public void testSendWhenMatched() throws Exception {
if (connection == null) {
connection = createConnection();
}
connection.start();
ConsumerBean messageList1 = new ConsumerBean();
ConsumerBean messageList2 = new ConsumerBean();
messageList1.setVerbose(true);
messageList2.setVerbose(true);
// messageList1.waitForMessagesToArrive(0);
// messageList2.waitForMessagesToArrive(1);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination producerDestination = new ActiveMQQueue("A.B");
Destination destination1 =new ActiveMQQueue("A.B");
Destination destination2 = new ActiveMQQueue("A.C");
LOG.info("Sending to: " + producerDestination);
LOG.info("Consuming from: " + destination1 + " and " + destination2);
MessageConsumer c1 = session.createConsumer(destination1);
MessageConsumer c2 = session.createConsumer(destination2);
c1.setMessageListener(messageList1);
c2.setMessageListener(messageList2);
// create topic producer
MessageProducer producer = session.createProducer(producerDestination);
assertNotNull(producer);
producer.send(createMessage(session, "test13"));
messageList2.assertMessagesArrived(1);
messageList1.assertMessagesArrived(0);
}
@Test
public void testForwardOnlyFalseSendWhenMatchedTrue1() throws Exception {
if (connection == null) {
connection = createConnection();
}
connection.start();
ConsumerBean messageList1 = new ConsumerBean();
ConsumerBean messageList2 = new ConsumerBean();
messageList1.setVerbose(true);
messageList2.setVerbose(true);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination producerDestination = new ActiveMQQueue("A.D");
Destination destination1 =new ActiveMQQueue("A.D");
Destination destination2 = new ActiveMQQueue("A.E");
LOG.info("Sending to: " + producerDestination);
LOG.info("Consuming from: " + destination1 + " and " + destination2);
MessageConsumer c1 = session.createConsumer(destination1);
MessageConsumer c2 = session.createConsumer(destination2);
c1.setMessageListener(messageList1);
c2.setMessageListener(messageList2);
// create topic producer
MessageProducer producer = session.createProducer(producerDestination);
assertNotNull(producer);
producer.send(createMessage(session, "tes13"));
messageList1.assertMessagesArrived(1);
messageList2.assertMessagesArrived(0);
}
public void testForwardOnlyFalseSendWhenMatchedTrue2() throws Exception {
if (connection == null) {
connection = createConnection();
}
connection.start();
ConsumerBean messageList1 = new ConsumerBean();
ConsumerBean messageList2 = new ConsumerBean();
messageList1.setVerbose(true);
messageList2.setVerbose(true);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination producerDestination = new ActiveMQQueue("A.D");
Destination destination1 =new ActiveMQQueue("A.D");
Destination destination2 = new ActiveMQQueue("A.E");
LOG.info("Sending to: " + producerDestination);
LOG.info("Consuming from: " + destination1 + " and " + destination2);
MessageConsumer c1 = session.createConsumer(destination1);
MessageConsumer c2 = session.createConsumer(destination2);
c1.setMessageListener(messageList1);
c2.setMessageListener(messageList2);
// create topic producer
MessageProducer producer = session.createProducer(producerDestination);
assertNotNull(producer);
producer.send(createMessage(session, "test13"));
Thread.sleep(1*1000);
messageList2.assertMessagesArrived(1);
messageList1.assertMessagesArrived(0);
}
@Test
public void testForwardOnlyFalseBackwardCompatability1() throws Exception {
if (connection == null) {
connection = createConnection();
}
connection.start();
ConsumerBean messageList1 = new ConsumerBean();
ConsumerBean messageList2 = new ConsumerBean();
messageList1.setVerbose(true);
messageList2.setVerbose(true);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination producerDestination = new ActiveMQQueue("A.X");
Destination destination1 =new ActiveMQQueue("A.X");
Destination destination2 = new ActiveMQQueue("A.Y");
LOG.info("Sending to: " + producerDestination);
LOG.info("Consuming from: " + destination1 + " and " + destination2);
MessageConsumer c1 = session.createConsumer(destination1);
MessageConsumer c2 = session.createConsumer(destination2);
c1.setMessageListener(messageList1);
c2.setMessageListener(messageList2);
// create topic producer
MessageProducer producer = session.createProducer(producerDestination);
assertNotNull(producer);
producer.send(createMessage(session, "test13"));
messageList2.assertMessagesArrived(1);
messageList1.assertMessagesArrived(1);
}
@Test
public void testForwardOnlyFalseBackwardCompatability2() throws Exception {
if (connection == null) {
connection = createConnection();
}
connection.start();
ConsumerBean messageList1 = new ConsumerBean();
ConsumerBean messageList2 = new ConsumerBean();
messageList1.setVerbose(true);
messageList2.setVerbose(true);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination producerDestination = new ActiveMQQueue("A.X");
Destination destination1 =new ActiveMQQueue("A.X");
Destination destination2 = new ActiveMQQueue("A.Y");
LOG.info("Sending to: " + producerDestination);
LOG.info("Consuming from: " + destination1 + " and " + destination2);
MessageConsumer c1 = session.createConsumer(destination1);
MessageConsumer c2 = session.createConsumer(destination2);
c1.setMessageListener(messageList1);
c2.setMessageListener(messageList2);
// create topic producer
MessageProducer producer = session.createProducer(producerDestination);
assertNotNull(producer);
producer.send(createMessage(session, "tet13"));
messageList1.assertMessagesArrived(1);
messageList2.assertMessagesArrived(0);
}
@Test
public void testForwardOnlyTrueBackwardCompatability1() throws Exception {
if (connection == null) {
connection = createConnection();
}
connection.start();
ConsumerBean messageList1 = new ConsumerBean();
ConsumerBean messageList2 = new ConsumerBean();
messageList1.setVerbose(true);
messageList2.setVerbose(true);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination producerDestination = new ActiveMQQueue("A.W");
Destination destination1 =new ActiveMQQueue("A.W");
Destination destination2 = new ActiveMQQueue("A.V");
LOG.info("Sending to: " + producerDestination);
LOG.info("Consuming from: " + destination1 + " and " + destination2);
MessageConsumer c1 = session.createConsumer(destination1);
MessageConsumer c2 = session.createConsumer(destination2);
c1.setMessageListener(messageList1);
c2.setMessageListener(messageList2);
// create topic producer
MessageProducer producer = session.createProducer(producerDestination);
assertNotNull(producer);
producer.send(createMessage(session, "test13"));
messageList2.assertMessagesArrived(1);
messageList1.assertMessagesArrived(0);
}
@Test
public void testForwardOnlyTrueBackwardCompatability2() throws Exception {
if (connection == null) {
connection = createConnection();
}
connection.start();
ConsumerBean messageList1 = new ConsumerBean();
ConsumerBean messageList2 = new ConsumerBean();
messageList1.setVerbose(true);
messageList2.setVerbose(true);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination producerDestination = new ActiveMQQueue("A.W");
Destination destination1 =new ActiveMQQueue("A.W");
Destination destination2 = new ActiveMQQueue("A.V");
LOG.info("Sending to: " + producerDestination);
LOG.info("Consuming from: " + destination1 + " and " + destination2);
MessageConsumer c1 = session.createConsumer(destination1);
MessageConsumer c2 = session.createConsumer(destination2);
c1.setMessageListener(messageList1);
c2.setMessageListener(messageList2);
// create topic producer
MessageProducer producer = session.createProducer(producerDestination);
assertNotNull(producer);
producer.send(createMessage(session, "tet13"));
Thread.sleep(2*1000);
messageList1.assertMessagesArrived(0);
messageList2.assertMessagesArrived(0);
}
@Test
public void testForwardOnlySendWhenNotMatchedSetToFalse() throws Exception {
if (connection == null) {
connection = createConnection();
}
connection.start();
ConsumerBean messageList1 = new ConsumerBean();
ConsumerBean messageList2 = new ConsumerBean();
messageList1.setVerbose(true);
messageList2.setVerbose(true);
// messageList1.waitForMessagesToArrive(0);
// messageList2.waitForMessagesToArrive(1);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination producerDestination = new ActiveMQQueue("X.Y");
Destination destination1 = new ActiveMQQueue("X.Y");
Destination destination2 = new ActiveMQQueue("X.Z");
LOG.info("Sending to: " + producerDestination);
LOG.info("Consuming from: " + destination1 + " and " + destination2);
MessageConsumer c1 = session.createConsumer(destination1);
MessageConsumer c2 = session.createConsumer(destination2);
c1.setMessageListener(messageList1);
c2.setMessageListener(messageList2);
// create topic producer
MessageProducer producer = session.createProducer(producerDestination);
assertNotNull(producer);
producer.send(createMessage(session, "tet13"));
messageList2.assertMessagesArrived(1);
messageList1.assertMessagesArrived(0);
}
@Test
public void testForwardOnlyFalseSendWhenNotMatchedSetToFalse() throws Exception {
if (connection == null) {
connection = createConnection();
}
connection.start();
ConsumerBean messageList1 = new ConsumerBean();
ConsumerBean messageList2 = new ConsumerBean();
messageList1.setVerbose(true);
messageList2.setVerbose(true);
// messageList1.waitForMessagesToArrive(0);
// messageList2.waitForMessagesToArrive(1);
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination producerDestination = new ActiveMQQueue("R.S");
Destination destination1 = new ActiveMQQueue("R.S");
Destination destination2 = new ActiveMQQueue("R.T");
LOG.info("Sending to: " + producerDestination);
LOG.info("Consuming from: " + destination1 + " and " + destination2);
MessageConsumer c1 = session.createConsumer(destination1);
MessageConsumer c2 = session.createConsumer(destination2);
c1.setMessageListener(messageList1);
c2.setMessageListener(messageList2);
// create topic producer
MessageProducer producer = session.createProducer(producerDestination);
assertNotNull(producer);
producer.send(createMessage(session, "tet13"));
messageList1.assertMessagesArrived(1);
messageList2.assertMessagesArrived(1);
}
protected Destination getConsumer1Dsetination() {
return new ActiveMQQueue("A.B");
}
protected Destination getConsumer2Dsetination() {
return new ActiveMQQueue("A.C");
}
protected Destination getProducerDestination() {
return new ActiveMQQueue("A.B");
}
protected TextMessage createMessage(Session session, String testid) throws JMSException {
TextMessage textMessage = session.createTextMessage("testMessage");
textMessage.setStringProperty("testid", testid);
return textMessage;
}
protected BrokerService createBroker() throws Exception {
BrokerService answer = new BrokerService();
answer.setPersistent(isPersistent());
answer.getManagementContext().setCreateConnector(false);
answer.addConnector(bindAddress);
/*
* <destinationInterceptors> <virtualDestinationInterceptor>
* <virtualDestinations> <compositeQueue name="A.B" forwardOnly="false">
* <forwardTo> <filteredDestination selector="testid LIKE 'test%'" queue="A.C"/>
* </forwardTo> </compositeQueue> </virtualDestinations>
* </virtualDestinationInterceptor> </destinationInterceptors>
*/
/*
* SendWhenNotMatched = true A message will be always forwarded to if not matched to filtered destination
* ForwardOnly setting has no impact
*/
CompositeQueue compositeQueue = new CompositeQueue();
compositeQueue.setName("A.B");
compositeQueue.setForwardOnly(true); // By default it is true
compositeQueue.setSendWhenNotMatched(true);// By default it is false
FilteredDestination filteredQueue = new FilteredDestination();
filteredQueue.setQueue("A.C");
filteredQueue.setSelector("testid LIKE 'test%'");
final ArrayList<Object> forwardDestinations = new ArrayList<Object>();
forwardDestinations.add(filteredQueue);
compositeQueue.setForwardTo(forwardDestinations);
CompositeQueue compositeQueue0 = new CompositeQueue();
compositeQueue0.setName("A.D");
compositeQueue0.setForwardOnly(false); // By default it is true
compositeQueue0.setSendWhenNotMatched(true);// By default it is false
FilteredDestination filteredQueue0 = new FilteredDestination();
filteredQueue0.setQueue("A.E");
filteredQueue0.setSelector("testid LIKE 'test%'");
final ArrayList<Object> forwardDestinations0 = new ArrayList<Object>();
forwardDestinations0.add(filteredQueue0);
compositeQueue0.setForwardTo(forwardDestinations0);
//Back compatibility test 1
CompositeQueue compositeQueue01 = new CompositeQueue();
compositeQueue01.setName("A.X");
compositeQueue01.setForwardOnly(false); // By default it is true
//compositeQueue01.setSendWhenNotMatched(false);// By default it is false
FilteredDestination filteredQueue01 = new FilteredDestination();
filteredQueue01.setQueue("A.Y");
filteredQueue01.setSelector("testid LIKE 'test%'");
final ArrayList<Object> forwardDestinations01 = new ArrayList<Object>();
forwardDestinations01.add(filteredQueue01);
compositeQueue01.setForwardTo(forwardDestinations01);
//Back compatibility test 2
CompositeQueue compositeQueue02 = new CompositeQueue();
compositeQueue02.setName("A.W");
//compositeQueue02.setForwardOnly(true); // By default it is true
//compositeQueue01.setSendWhenNotMatched(false);// By default it is false
FilteredDestination filteredQueue02 = new FilteredDestination();
filteredQueue02.setQueue("A.V");
filteredQueue02.setSelector("testid LIKE 'test%'");
final ArrayList<Object> forwardDestinations02 = new ArrayList<Object>();
forwardDestinations02.add(filteredQueue02);
compositeQueue02.setForwardTo(forwardDestinations02);
CompositeQueue compositeQueue1 = new CompositeQueue();
compositeQueue1.setName("X.Y");
compositeQueue1.setForwardOnly(true); // By default it is true
ActiveMQQueue forwardQueue1 =new ActiveMQQueue("X.Z");
final ArrayList<Object> forwardDestinations1 = new ArrayList<Object>();
forwardDestinations1.add(forwardQueue1);
compositeQueue1.setForwardTo(forwardDestinations1);
CompositeQueue compositeQueue2 = new CompositeQueue();
compositeQueue2.setName("R.S");
compositeQueue2.setForwardOnly(false);
ActiveMQQueue forwardQueue2 =new ActiveMQQueue("R.T");
final ArrayList<Object> forwardDestinations2 = new ArrayList<Object>();
forwardDestinations2.add(forwardQueue2);
compositeQueue2.setForwardTo(forwardDestinations2);
VirtualDestinationInterceptor interceptor = new VirtualDestinationInterceptor();
interceptor.setVirtualDestinations(new VirtualDestination[] { compositeQueue,compositeQueue0,compositeQueue01,compositeQueue02,compositeQueue1,compositeQueue2 });
answer.setDestinationInterceptors(new DestinationInterceptor[] { interceptor });
return answer;
}
}