blob: e4ccb8057c8bc24efcf520d20439cc393e3d348d [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.smoke.brokerConnection;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.artemis.tests.smoke.common.ContainerService;
import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
import org.jboss.logging.Logger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class SplitMirrorTest extends SmokeTestBase {
private static final Logger logger = Logger.getLogger(SplitMirrorTest.class);
Object network;
public Object serverMainA;
public Object serverMainB;
public Object serverRoot;
ContainerService service = ContainerService.getService();
private final String SERVER_ROOT = basedir + "/target/brokerConnect/splitMirror/serverRoot";
private final String SERVER_A = basedir + "/target/brokerConnect/splitMirror/serverA";
private final String SERVER_B = basedir + "/target/brokerConnect/splitMirror/serverB";
@Before
public void beforeStart() throws Exception {
disableCheckThread();
ValidateContainer.assumeArtemisContainer();
Assert.assertNotNull(basedir);
recreateBrokerDirectory(SERVER_ROOT);
recreateBrokerDirectory(SERVER_A);
recreateBrokerDirectory(SERVER_B);
network = service.newNetwork();
serverMainA = service.newBrokerImage();
serverMainB = service.newBrokerImage();
serverRoot = service.newBrokerImage();
service.setNetwork(serverMainA, network);
service.setNetwork(serverMainB, network);
service.setNetwork(serverRoot, network);
service.exposePorts(serverMainA, 61616);
service.exposePorts(serverMainB, 61616);
service.exposePorts(serverRoot, 61616);
service.prepareInstance(SERVER_ROOT);
service.prepareInstance(SERVER_A);
service.prepareInstance(SERVER_B);
service.exposeBrokerHome(serverMainA, SERVER_A);
service.exposeBrokerHome(serverMainB, SERVER_B);
service.exposeBrokerHome(serverRoot, SERVER_ROOT);
service.exposeHosts(serverRoot, "artemisRoot");
service.exposeHosts(serverMainA, "artemisA");
service.exposeHosts(serverMainB, "artemisB");
service.start(serverMainA);
service.start(serverMainB);
service.start(serverRoot);
}
@After
public void afterStop() {
service.stop(serverRoot);
service.stop(serverMainA);
service.stop(serverMainB);
}
@Test
public void testSplitMirror() throws Throwable {
ConnectionFactory cfRoot = service.createCF(serverRoot, "amqp");
ConnectionFactory cfA = service.createCF(serverMainA, "amqp");
ConnectionFactory cfB = service.createCF(serverMainB, "amqp");
try (Connection connection = cfRoot.createConnection()) {
Session session = connection.createSession();
Queue queue = session.createQueue("someQueue");
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < 10; i++) {
producer.send(session.createTextMessage("sessionRoot " + i));
}
connection.start();
MessageConsumer consumer = session.createConsumer(queue);
for (int i = 0; i < 5; i++) {
Assert.assertNotNull(consumer.receive(5000));
}
consumer.close();
}
Thread.sleep(1000);
try (Connection connection = cfA.createConnection()) {
Session session = connection.createSession();
Queue queue = session.createQueue("someQueue");
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
for (int i = 5; i < 10; i++) {
TextMessage message = (TextMessage) consumer.receive(5000);
Assert.assertNotNull(message);
}
Assert.assertNull(consumer.receiveNoWait());
}
try (Connection connection = cfB.createConnection()) {
Session session = connection.createSession();
Queue queue = session.createQueue("someQueue");
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
for (int i = 5; i < 10; i++) {
TextMessage message = (TextMessage) consumer.receive(5000);
Assert.assertNotNull(message);
}
Assert.assertNull(consumer.receiveNoWait());
}
try (Connection connection = cfRoot.createConnection()) {
Session session = connection.createSession();
Queue queue = session.createQueue("someQueue");
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
for (int i = 5; i < 10; i++) {
TextMessage message = (TextMessage) consumer.receive(5000);
Assert.assertNotNull(message);
}
Assert.assertNull(consumer.receiveNoWait());
consumer.close();
service.kill(serverMainA);
MessageProducer producer = session.createProducer(queue);
for (int i = 0; i < 33; i++) {
producer.send(session.createTextMessage("afterKill " + i));
}
}
Thread.sleep(1000);
try (Connection connection = cfB.createConnection()) {
Session session = connection.createSession();
Queue queue = session.createQueue("someQueue");
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
for (int i = 0; i < 33; i++) {
TextMessage message = (TextMessage) consumer.receive(5000);
Assert.assertNotNull(message);
System.out.println("message.getText() = " + message.getText() + " i = " + i);
Assert.assertEquals("afterKill " + i, message.getText());
}
Assert.assertNull(consumer.receiveNoWait());
}
service.start(serverMainA);
cfA = service.createCF(serverMainA, "amqp");
try (Connection connection = cfA.createConnection()) {
Session session = connection.createSession();
Queue queue = session.createQueue("someQueue");
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
for (int i = 0; i < 33; i++) {
TextMessage message = (TextMessage) consumer.receive(5000);
Assert.assertNotNull(message);
Assert.assertEquals("afterKill " + i, message.getText());
}
Assert.assertNull(consumer.receiveNoWait());
}
}
}