blob: 22db7e1faf72b421c0affdc7d77e9ce7d43773a5 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.smoke.brokerConnection;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.artemis.tests.smoke.common.ContainerService;
import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
import org.jboss.logging.Logger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
/**
 * Smoke test that starts four broker containers (main and backup for sites "A" and "B"),
 * sends messages through one site and consumes them from the other, then kills the main
 * brokers one at a time and repeats the round trip against the backups.
 * <p>
 * NOTE(review): the broker configuration itself (replication / mirroring) lives in the
 * prepared instance directories under {@code target/brokerConnect} and is not visible here.
 */
public class DualMirrorFailoverWithContainerTest extends SmokeTestBase {

   private static final Logger logger = Logger.getLogger(DualMirrorFailoverWithContainerTest.class);

   /** Queue used for every round trip. */
   private static final String QUEUE_NAME = "exampleQueue";

   /** Number of messages sent in each direction of a round trip. */
   private static final int NUMBER_OF_MESSAGES = 100;

   Object network;

   public Object serverMainA;

   public Object serverBackupA;

   public Object serverMainB;

   public Object serverBackupB;

   ContainerService service = ContainerService.getService();

   // Assigned at the end of beforeStart(); not read anywhere else in this class.
   ConnectionFactory cfA;

   private final String SERVER_MAIN_A_LOCATION = basedir + "/target/brokerConnect/replicaMainServerA";

   private final String SERVER_BACKUP_A_LOCATION = basedir + "/target/brokerConnect/replicaBackupServerA";

   private final String SERVER_MAIN_B_LOCATION = basedir + "/target/brokerConnect/replicaMainServerB";

   private final String SERVER_BACKUP_B_LOCATION = basedir + "/target/brokerConnect/replicaBackupServerB";

   /**
    * Recreates the four broker directories, builds one container per broker on a shared
    * network, and starts them (mains first, then backups).
    *
    * @throws Exception if any of the preparation or start-up steps fails
    */
   @Before
   public void beforeStart() throws Exception {
      disableCheckThread();
      ValidateContainer.assumeArtemisContainer();

      Assert.assertNotNull(basedir);

      recreateBrokerDirectory(SERVER_MAIN_A_LOCATION);
      recreateBrokerDirectory(SERVER_BACKUP_A_LOCATION);
      recreateBrokerDirectory(SERVER_MAIN_B_LOCATION);
      recreateBrokerDirectory(SERVER_BACKUP_B_LOCATION);

      network = service.newNetwork();

      serverMainA = service.newBrokerImage();
      serverMainB = service.newBrokerImage();
      serverBackupA = service.newBrokerImage();
      serverBackupB = service.newBrokerImage();

      service.setNetwork(serverMainA, network);
      service.setNetwork(serverBackupA, network);
      service.setNetwork(serverMainB, network);
      service.setNetwork(serverBackupB, network);

      service.exposePorts(serverMainA, 61616);
      service.exposePorts(serverMainB, 61616);
      service.exposePorts(serverBackupA, 61616);
      service.exposePorts(serverBackupB, 61616);

      service.prepareInstance(SERVER_MAIN_A_LOCATION);
      service.prepareInstance(SERVER_MAIN_B_LOCATION);
      service.prepareInstance(SERVER_BACKUP_A_LOCATION);
      service.prepareInstance(SERVER_BACKUP_B_LOCATION);

      service.exposeBrokerHome(serverMainA, SERVER_MAIN_A_LOCATION);
      service.exposeBrokerHome(serverMainB, SERVER_MAIN_B_LOCATION);
      service.exposeBrokerHome(serverBackupA, SERVER_BACKUP_A_LOCATION);
      service.exposeBrokerHome(serverBackupB, SERVER_BACKUP_B_LOCATION);

      service.exposeHosts(serverMainA, "mainA");
      service.exposeHosts(serverBackupA, "backupA");
      service.exposeHosts(serverMainB, "mainB");
      service.exposeHosts(serverBackupB, "backupB");

      // Registered before the containers are started.
      service.logWait(serverBackupA, ".*AMQ221024.*"); // replica is synchronized
      service.logWait(serverBackupB, ".*AMQ221024.*");

      service.start(serverMainA);
      service.start(serverMainB);
      service.start(serverBackupA);
      service.start(serverBackupB);

      cfA = service.createCF(serverMainA, "amqp");
   }

   /**
    * Stops all four containers, backups first. Every container is attempted even when an
    * earlier stop throws, so one failing stop does not leave the remaining containers
    * running; the first failure is rethrown afterwards with later ones attached as
    * suppressed exceptions.
    */
   @After
   public void afterStop() {
      RuntimeException failure = null;
      for (Object server : new Object[] {serverBackupA, serverBackupB, serverMainA, serverMainB}) {
         try {
            service.stop(server);
         } catch (RuntimeException e) {
            if (failure == null) {
               failure = e;
            } else {
               failure.addSuppressed(e);
            }
         }
      }
      if (failure != null) {
         throw failure;
      }
   }

   /**
    * Runs a round trip between the two main brokers, kills main B and repeats against
    * backup B, then kills main A and repeats between the two backups.
    *
    * @throws Throwable any assertion error or exception raised along the way
    */
   @Test
   public void testReconnectMirrorFailover() throws Throwable {
      try {
         roundTrip(serverMainA, serverMainB);

         service.kill(serverMainB);
         waitForServerToStart(service.createURI(serverBackupB, 61616), null, null, 10_000);
         roundTrip(serverMainA, serverBackupB);

         service.kill(serverMainA);
         waitForServerToStart(service.createURI(serverBackupA, 61616), null, null, 10_000);
         roundTrip(serverBackupA, serverBackupB);
      } catch (Throwable e) {
         // Log right away so we get instant feedback on an assertion error instead of having
         // to wait for the servers to be torn down, which is useful when debugging the test.
         logger.warn(e.getMessage(), e);
         throw e;
      }
   }

   /**
    * Sends {@link #NUMBER_OF_MESSAGES} messages to {@code serverA} and consumes them from
    * {@code serverB}, then sends the same number back through {@code serverB} and consumes
    * them from {@code serverA} inside a transacted session.
    * <p>
    * Each connection is opened in a try-with-resources block so it is closed even when an
    * assertion fails half-way through.
    *
    * @param serverA container the first batch is produced to and the second batch is consumed from
    * @param serverB container the first batch is consumed from and the second batch is produced to
    * @throws Throwable on any JMS failure, assertion failure or interruption
    */
   private void roundTrip(Object serverA, Object serverB) throws Throwable {
      ConnectionFactory producerFactory = service.createCF(serverA, "amqp");
      try (Connection connection = producerFactory.createConnection()) {
         Session session = connection.createSession();
         Queue queue = session.createQueue(QUEUE_NAME);
         sendMessages(session, queue, "message ");
      }

      // Fixed pause between phases, kept from the original test.
      Thread.sleep(1000);

      ConnectionFactory remoteFactory = service.createCF(serverB, "amqp");
      try (Connection connection = remoteFactory.createConnection()) {
         connection.start();
         Session session = connection.createSession();
         Queue queue = session.createQueue(QUEUE_NAME);
         MessageConsumer consumer = session.createConsumer(queue);

         receiveMessages(consumer, "message ");
         // Nothing beyond the expected batch may be pending on the queue.
         Assert.assertNull(consumer.receiveNoWait());

         // trying the way back
         sendMessages(session, queue, "next-message ");
      }

      Thread.sleep(1000);

      ConnectionFactory returnFactory = service.createCF(serverA, "amqp");
      try (Connection connection = returnFactory.createConnection()) {
         connection.start();
         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
         Queue queue = session.createQueue(QUEUE_NAME);
         MessageConsumer consumer = session.createConsumer(queue);

         receiveMessages(consumer, "next-message ");
         session.commit();
      }

      Thread.sleep(1000);
   }

   /**
    * Produces {@link #NUMBER_OF_MESSAGES} text messages whose text and {@code "body"}
    * string property are both {@code prefix + index}.
    */
   private static void sendMessages(Session session, Queue queue, String prefix) throws Exception {
      MessageProducer producer = session.createProducer(queue);
      for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
         String text = prefix + i;
         TextMessage msg = session.createTextMessage(text);
         msg.setStringProperty("body", text);
         producer.send(msg);
      }
   }

   /**
    * Receives {@link #NUMBER_OF_MESSAGES} text messages, waiting up to five seconds for each,
    * and asserts they arrive in order with text {@code prefix + index}.
    */
   private static void receiveMessages(MessageConsumer consumer, String prefix) throws Exception {
      for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
         TextMessage message = (TextMessage) consumer.receive(5000);
         Assert.assertNotNull("did not receive \"" + prefix + i + "\"", message);
         Assert.assertEquals(prefix + i, message.getText());
      }
   }
}