blob: 6799639a7aed1caba4b40a72b079eef076ead602 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.smoke.brokerConnection;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.artemis.tests.smoke.common.ContainerService;
import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
import org.jboss.logging.Logger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
 * Smoke test that starts three containerized brokers (root, A and B) on one shared
 * container network and verifies that a single queue stays consistent across all of
 * them: messages produced on the root broker are consumed in slices from each broker
 * in turn, and at the end the queue must be empty everywhere.
 *
 * <p>NOTE(review): the mirroring topology itself is not configured in this class; it
 * presumably comes from the broker instances prepared under
 * {@code target/brokerConnect/chainedMirror} - confirm against the module's build setup.
 */
public class ChainedMirrorTest extends SmokeTestBase {

   private static final Logger logger = Logger.getLogger(ChainedMirrorTest.class);

   /** Number of messages produced on the root broker by {@link #testChained()}. */
   private static final int MESSAGE_COUNT = 40;

   /** Timeout, in milliseconds, for each blocking receive. */
   private static final long RECEIVE_TIMEOUT_MILLIS = 1000;

   /** Pause, in milliseconds, between test phases. */
   private static final long SETTLE_MILLIS = 5000;

   /** Container network shared by the three brokers; created in {@link #beforeStart()}. */
   Object network;

   /** Container handle of broker A; created in {@link #beforeStart()}. */
   public Object serverMainA;

   /** Container handle of broker B; created in {@link #beforeStart()}. */
   public Object serverMainB;

   /** Container handle of the root broker; created in {@link #beforeStart()}. */
   public Object serverRoot;

   ContainerService service = ContainerService.getService();

   private final String SERVER_ROOT = basedir + "/target/brokerConnect/chainedMirror/serverRoot";
   private final String SERVER_A = basedir + "/target/brokerConnect/chainedMirror/serverA";
   private final String SERVER_B = basedir + "/target/brokerConnect/chainedMirror/serverB";

   /**
    * Recreates the three broker directories, builds the three broker containers on a
    * common network and starts them.
    *
    * @throws Exception if any preparation or container start-up step fails
    */
   @Before
   public void beforeStart() throws Exception {
      disableCheckThread();
      // NOTE(review): presumably aborts (skips) the test when no Artemis container
      // image is available - confirm in ValidateContainer.
      ValidateContainer.assumeArtemisContainer();

      Assert.assertNotNull(basedir);
      recreateBrokerDirectory(SERVER_ROOT);
      recreateBrokerDirectory(SERVER_A);
      recreateBrokerDirectory(SERVER_B);

      network = service.newNetwork();

      serverMainA = service.newBrokerImage();
      serverMainB = service.newBrokerImage();
      serverRoot = service.newBrokerImage();

      service.setNetwork(serverMainA, network);
      service.setNetwork(serverMainB, network);
      service.setNetwork(serverRoot, network);

      service.exposePorts(serverMainA, 61616);
      service.exposePorts(serverMainB, 61616);
      service.exposePorts(serverRoot, 61616);

      service.prepareInstance(SERVER_ROOT);
      service.prepareInstance(SERVER_A);
      service.prepareInstance(SERVER_B);

      service.exposeBrokerHome(serverMainA, SERVER_A);
      service.exposeBrokerHome(serverMainB, SERVER_B);
      service.exposeBrokerHome(serverRoot, SERVER_ROOT);

      service.exposeHosts(serverRoot, "artemisTestRoot");
      service.exposeHosts(serverMainA, "artemisTestA");
      service.exposeHosts(serverMainB, "artemisTestB");

      // Only broker B has its output logged.
      service.startLogging(serverMainB, "ServerB:");

      // Brokers are started in the order B, A, root.
      service.start(serverMainB);
      service.start(serverMainA);
      service.start(serverRoot);
   }

   /**
    * Stops the three broker containers in the order root, A, B.
    *
    * <p>The calls are chained with {@code finally} so that a failure while stopping one
    * container does not leave the remaining containers running.
    */
   @After
   public void afterStop() {
      try {
         service.stop(serverRoot);
      } finally {
         try {
            service.stop(serverMainA);
         } finally {
            service.stop(serverMainB);
         }
      }
   }

   private String getQueueName() {
      return "someQueue";
   }

   /**
    * Sends {@value #MESSAGE_COUNT} text messages to the root broker, then consumes them
    * in four slices of ten (root, A, B, root), pausing between phases, and finally
    * asserts that the queue is empty on all three brokers.
    *
    * @throws Throwable if any JMS operation or assertion fails
    */
   @Test
   public void testChained() throws Throwable {
      ConnectionFactory factory = service.createCF(serverRoot, "amqp");
      ConnectionFactory factory2 = service.createCF(serverMainA, "amqp");
      ConnectionFactory factory3 = service.createCF(serverMainB, "amqp");

      sendMessages(factory, MESSAGE_COUNT);

      Thread.sleep(SETTLE_MILLIS); // some time to allow eventual loops

      consumeRange(factory, 0, 10);

      Thread.sleep(SETTLE_MILLIS); // some time to allow eventual loops

      consumeRange(factory2, 10, 20);

      Thread.sleep(SETTLE_MILLIS); // some time to allow eventual loops

      consumeRange(factory3, 20, 30);

      Thread.sleep(SETTLE_MILLIS); // some time to allow eventual loops

      consumeRange(factory, 30, 40);

      Thread.sleep(SETTLE_MILLIS); // some time to allow eventual loops

      assertNoMessages(factory);
      assertNoMessages(factory2);
      assertNoMessages(factory3);
   }

   /** Sends {@code count} text messages "message 0" .. "message count-1" to the test queue. */
   private void sendMessages(ConnectionFactory factory, int count) throws Exception {
      try (Connection conn = factory.createConnection()) {
         Session session = conn.createSession();
         MessageProducer producer = session.createProducer(session.createQueue(getQueueName()));
         for (int i = 0; i < count; i++) {
            producer.send(session.createTextMessage("message " + i));
         }
      }
   }

   /** Receives messages {@code from} (inclusive) to {@code to} (exclusive) and checks each body. */
   private void consumeRange(ConnectionFactory factory, int from, int to) throws Exception {
      try (Connection conn = factory.createConnection()) {
         Session session = conn.createSession();
         conn.start();
         MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName()));
         for (int i = from; i < to; i++) {
            TextMessage message = (TextMessage) consumer.receive(RECEIVE_TIMEOUT_MILLIS);
            Assert.assertNotNull("message " + i + " was not received", message);
            Assert.assertEquals("message " + i, message.getText());
         }
         consumer.close();
      }
   }

   /** Asserts that a non-blocking receive on the test queue returns nothing. */
   private void assertNoMessages(ConnectionFactory factory) throws Exception {
      try (Connection conn = factory.createConnection()) {
         Session session = conn.createSession();
         conn.start();
         MessageConsumer consumer = session.createConsumer(session.createQueue(getQueueName()));
         Assert.assertNull(consumer.receiveNoWait());
         consumer.close();
      }
   }
}