blob: 3ee7d0d7f8f83680c386fa7d7ac350796a4236f6 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import java.net.URI;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerFactory;
import org.apache.activemq.broker.BrokerService;
public class TestBrokerConnectionDuplexExcludedDestinations extends TestCase {
BrokerService receiverBroker;
BrokerService senderBroker;
Connection hubConnection;
Session hubSession;
Connection spokeConnection;
Session spokeSession;
public void setUp() throws Exception {
// Hub broker
String configFileName = "org/apache/activemq/usecases/receiver-duplex.xml";
URI uri = new URI("xbean:" + configFileName);
receiverBroker = BrokerFactory.createBroker(uri);
receiverBroker.setPersistent(false);
receiverBroker.setBrokerName("Hub");
// Spoke broker
configFileName = "org/apache/activemq/usecases/sender-duplex.xml";
uri = new URI("xbean:" + configFileName);
senderBroker = BrokerFactory.createBroker(uri);
senderBroker.setPersistent(false);
senderBroker.setBrokerName("Spoke");
// Start both Hub and Spoke broker
receiverBroker.start();
senderBroker.start();
// create hub session
ConnectionFactory cfHub = new ActiveMQConnectionFactory("tcp://localhost:62002");
hubConnection = cfHub.createConnection();
hubConnection.start();
hubSession = hubConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// create spoke session
ConnectionFactory cfSpoke = new ActiveMQConnectionFactory("tcp://localhost:62001");
spokeConnection = cfSpoke.createConnection();
spokeConnection.start();
spokeSession = spokeConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
public void tearDown() throws Exception {
hubSession.close();
hubConnection.stop();
hubConnection.close();
spokeSession.close();
spokeConnection.stop();
spokeConnection.close();
senderBroker.stop();
receiverBroker.stop();
}
public void testDuplexSendFromHubToSpoke()
throws Exception {
//create hub producer
MessageProducer hubProducer = hubSession.createProducer(null);
hubProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
hubProducer.setDisableMessageID(true);
hubProducer.setDisableMessageTimestamp(true);
Queue excludedQueueHub = hubSession.createQueue("exclude.test.foo");
TextMessage excludedMsgHub = hubSession.createTextMessage();
excludedMsgHub.setText(excludedQueueHub.toString());
Queue includedQueueHub = hubSession.createQueue("include.test.foo");
TextMessage includedMsgHub = hubSession.createTextMessage();
includedMsgHub.setText(includedQueueHub.toString());
// Sending from Hub queue
hubProducer.send(excludedQueueHub, excludedMsgHub);
hubProducer.send(includedQueueHub, includedMsgHub);
Queue excludedQueueSpoke = spokeSession.createQueue("exclude.test.foo");
MessageConsumer excludedConsumerSpoke = spokeSession.createConsumer(excludedQueueSpoke);
Thread.sleep(100);
Queue includedQueueSpoke = spokeSession.createQueue("include.test.foo");
MessageConsumer includedConsumerSpoke = spokeSession.createConsumer(includedQueueSpoke);
// Receiving from excluded Spoke queue
Message msg = excludedConsumerSpoke.receive(200);
assertNull(msg);
// Receiving from included Spoke queue
msg = includedConsumerSpoke.receive(200);
assertEquals(includedMsgHub, msg);
// we should be able to receive excluded queue message on Hub
MessageConsumer excludedConsumerHub = hubSession.createConsumer(excludedQueueHub);
msg = excludedConsumerHub.receive(200);;
assertEquals(excludedMsgHub, msg);
hubProducer.close();
excludedConsumerSpoke.close();
}
}