blob: 354f8af7fc16b854a0b2a5959d2bcbd80003b4b5 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.usecases;
import org.apache.activemq.JmsMultipleBrokersTestSupport;
import org.apache.activemq.util.MessageIdList;
import org.springframework.core.io.ClassPathResource;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
public class TopicReplicationTest extends JmsMultipleBrokersTestSupport {
public static final int MSG_COUNT = 10;
public void testReplication() throws Exception {
createBroker(new ClassPathResource("org/apache/activemq/usecases/replication-broker1.xml"));
createBroker(new ClassPathResource("org/apache/activemq/usecases/replication-broker2.xml"));
createBroker(new ClassPathResource("org/apache/activemq/usecases/replication-broker3.xml"));
createBroker(new ClassPathResource("org/apache/activemq/usecases/replication-broker4.xml"));
brokers.get("replication-broker1").broker.waitUntilStarted();
brokers.get("replication-broker2").broker.waitUntilStarted();
brokers.get("replication-broker3").broker.waitUntilStarted();
brokers.get("replication-broker4").broker.waitUntilStarted();
Destination dest = createDestination("replication", true);
// Setup consumers
MessageConsumer clientA = createConsumer("replication-broker2", dest);
MessageConsumer clientB = createConsumer("replication-broker3", dest);
MessageConsumer clientC = createConsumer("replication-broker4", dest);
MessageConsumer clientD = createConsumer("replication-broker4", dest);
//let consumers propogate around the network
Thread.sleep(2000);
// Get message count
MessageIdList msgsA = getConsumerMessages("replication-broker2", clientA);
MessageIdList msgsB = getConsumerMessages("replication-broker3", clientB);
MessageIdList msgsC = getConsumerMessages("replication-broker4", clientC);
MessageIdList msgsD = getConsumerMessages("replication-broker4", clientD);
// send messages to broker1
sendMessages("replication-broker1", dest, MSG_COUNT);
msgsA.waitForMessagesToArrive(MSG_COUNT);
msgsB.waitForMessagesToArrive(MSG_COUNT);
msgsC.waitForMessagesToArrive(MSG_COUNT);
msgsD.waitForMessagesToArrive(MSG_COUNT);
assertEquals(MSG_COUNT, msgsA.getMessageCount());
assertEquals(MSG_COUNT, msgsB.getMessageCount());
assertEquals(MSG_COUNT, msgsC.getMessageCount());
assertEquals(MSG_COUNT, msgsD.getMessageCount());
// send messages to broker4
sendMessages("replication-broker4", dest, MSG_COUNT);
msgsA.waitForMessagesToArrive(2 * MSG_COUNT);
msgsB.waitForMessagesToArrive(2 * MSG_COUNT);
msgsC.waitForMessagesToArrive(2 * MSG_COUNT);
msgsD.waitForMessagesToArrive(2 * MSG_COUNT);
assertEquals(2 * MSG_COUNT, msgsA.getMessageCount());
assertEquals(2 * MSG_COUNT, msgsB.getMessageCount());
assertEquals(2 * MSG_COUNT, msgsC.getMessageCount());
assertEquals(2 * MSG_COUNT, msgsD.getMessageCount());
// send messages to broker3
sendMessages("replication-broker3", dest, MSG_COUNT);
msgsA.waitForMessagesToArrive(3 * MSG_COUNT);
msgsB.waitForMessagesToArrive(3 * MSG_COUNT);
msgsC.waitForMessagesToArrive(3 * MSG_COUNT);
msgsD.waitForMessagesToArrive(3 * MSG_COUNT);
assertEquals(3 * MSG_COUNT, msgsA.getMessageCount());
assertEquals(3 * MSG_COUNT, msgsB.getMessageCount());
assertEquals(3 * MSG_COUNT, msgsC.getMessageCount());
assertEquals(3 * MSG_COUNT, msgsD.getMessageCount());
// send messages to broker2
sendMessages("replication-broker2", dest, MSG_COUNT);
msgsA.waitForMessagesToArrive(4 * MSG_COUNT);
msgsB.waitForMessagesToArrive(4 * MSG_COUNT);
msgsC.waitForMessagesToArrive(4 * MSG_COUNT);
msgsD.waitForMessagesToArrive(4 * MSG_COUNT);
assertEquals(4 * MSG_COUNT, msgsA.getMessageCount());
assertEquals(4 * MSG_COUNT, msgsB.getMessageCount());
assertEquals(4 * MSG_COUNT, msgsC.getMessageCount());
assertEquals(4 * MSG_COUNT, msgsD.getMessageCount());
}
}