blob: 2fe17f607f646c1bdf533899dfe35ddc0e5fb405 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.smoke.brokerConnection;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.artemis.tests.smoke.common.ContainerService;
import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class DualMirrorWithContainerTest extends SmokeTestBase {
Object network;
public Object serverA;
public Object serverB;
private final String SERVER_A_LOCATION = basedir + "/target/brokerConnect/serverA";
private final String SERVER_B_LOCATION = basedir + "/target/brokerConnect/serverB";
@Before
public void beforeStart() throws Exception {
disableCheckThread();
ValidateContainer.assumeArtemisContainer();
Assert.assertNotNull(basedir);
recreateBrokerDirectory(SERVER_B_LOCATION);
recreateBrokerDirectory(SERVER_A_LOCATION);
ContainerService service = ContainerService.getService();
network = service.newNetwork();
serverA = service.newBrokerImage();
serverB = service.newBrokerImage();
service.setNetwork(serverA, network);
service.setNetwork(serverB, network);
service.exposePorts(serverA, 61616);
service.exposePorts(serverB, 61616);
service.prepareInstance(SERVER_A_LOCATION);
service.prepareInstance(SERVER_B_LOCATION);
service.exposeBrokerHome(serverA, SERVER_A_LOCATION);
service.exposeBrokerHome(serverB, SERVER_B_LOCATION);
service.exposeHosts(serverA, "serverA");
service.exposeHosts(serverB, "serverB");
service.start(serverA);
service.start(serverB);
service.waitForServerToStart(serverA, "artemis", "artemis", 10_000);
service.waitForServerToStart(serverB, "artemis", "artemis", 10_000);
cfA = service.createCF(serverA, "amqp");
cfB = service.createCF(serverA, "amqp");
}
@After
public void afterStop() {
ContainerService.getService().stop(serverA);
ContainerService.getService().stop(serverB);
}
ConnectionFactory cfA;
ConnectionFactory cfB;
@Test
public void testReconnectMirror() throws Throwable {
testReconnectMirror(false);
}
@Test
public void testReconnectMirrorLarge() throws Throwable {
testReconnectMirror(true);
}
private void testReconnectMirror(boolean largemessage) throws Throwable {
int NUMBER_OF_MESSAGES = 1_000;
int FAILURE_INTERVAL = 500;
String extraBody = "message ";
if (largemessage) {
StringBuffer buffer = new StringBuffer();
while (buffer.length() < 200 * 1024) {
buffer.append("This is large ");
}
extraBody = buffer.toString();
}
try (Connection connectionA = cfA.createConnection("artemis", "artemis")) {
// Testing things on the direction from mirroring from A to B...
Session sessionA = connectionA.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = sessionA.createQueue("someQueue");
MessageProducer producerA = sessionA.createProducer(queue);
producerA.setDeliveryMode(DeliveryMode.PERSISTENT);
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
TextMessage message = sessionA.createTextMessage(extraBody + i);
message.setStringProperty("color", i % 2 == 0 ? "yellow" : "red");
message.setIntProperty("i", i);
producerA.send(message);
if (i % 100 == 0 && i > 0) {
System.out.println("Message " + i);
sessionA.commit();
}
if (i % FAILURE_INTERVAL == 0 && i > 0) {
restartB();
}
}
sessionA.commit();
connectionA.start();
}
try (Connection connectionB = cfB.createConnection("artemis", "artemis")) {
// Testing things on the direction from mirroring from A to B...
Session sessionB = connectionB.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = sessionB.createQueue("someQueue");
connectionB.start();
MessageConsumer consumerB = sessionB.createConsumer(queue);
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
TextMessage message = (TextMessage)consumerB.receive(5_000);
Assert.assertNotNull("expected message at " + i, message);
Assert.assertEquals(extraBody + i, message.getText());
}
Assert.assertNull(consumerB.receiveNoWait());
sessionB.rollback();
}
int restarted = 0;
try (Connection connectionB = cfB.createConnection("artemis", "artemis")) {
// Testing things on the direction from mirroring from A to B...
Session sessionB = connectionB.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = sessionB.createQueue("someQueue");
connectionB.start();
MessageConsumer consumerB = sessionB.createConsumer(queue, "color='yellow'");
int op = 0;
for (int i = 0; i < NUMBER_OF_MESSAGES; i += 2) {
TextMessage message = (TextMessage)consumerB.receive(5_000);
Assert.assertNotNull("expected message at " + i, message);
Assert.assertEquals(extraBody + i, message.getText());
if (op++ > 0 && op % FAILURE_INTERVAL == 0) {
restartA(++restarted);
}
}
Assert.assertNull(consumerB.receiveNoWait());
}
System.out.println("Restarted serverA " + restarted + " times");
Thread.sleep(5000);
try (Connection connectionA = cfA.createConnection("artemis", "artemis")) {
// Testing things on the direction from mirroring from A to B...
Session sessionA = connectionA.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = sessionA.createQueue("someQueue");
connectionA.start();
MessageConsumer consumerA = sessionA.createConsumer(queue);
for (int i = 1; i < NUMBER_OF_MESSAGES; i += 2) {
TextMessage message = (TextMessage)consumerA.receive(5_000);
Assert.assertNotNull("expected message at " + i, message);
// We should only have red left
Assert.assertEquals("Unexpected message at " + i + " with i=" + message.getIntProperty("i"), "red", message.getStringProperty("color"));
Assert.assertEquals(extraBody + i, message.getText());
}
sessionA.commit();
Assert.assertNull(consumerA.receiveNoWait());
}
Thread.sleep(5000);
try (Connection connectionB = cfB.createConnection("artemis", "artemis")) {
// Testing things on the direction from mirroring from A to B...
Session sessionB = connectionB.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = sessionB.createQueue("someQueue");
connectionB.start();
MessageConsumer consumerB = sessionB.createConsumer(queue);
TextMessage message = (TextMessage)consumerB.receiveNoWait();
if (message != null) {
Assert.fail("was expected null, however received " + message.getText());
}
}
}
private void restartB() throws Exception {
ContainerService.getService().restart(serverB);
cfB = ContainerService.getService().createCF(serverB, "amqp");
}
private void restartA(int restartNumber) throws Exception {
ContainerService.getService().restart(serverA);
cfA = ContainerService.getService().createCF(serverB, "amqp");
}
}