blob: 3b7e3d88221cf6f389e9ac269990aa25def283df [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <br>
* http://www.apache.org/licenses/LICENSE-2.0
* <br>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.smoke.brokerConnection;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.io.File;
import java.io.FileOutputStream;
import java.io.PrintStream;
import org.apache.activemq.artemis.cli.commands.tools.PrintData;
import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.util.ServerUtil;
import org.jboss.logging.Logger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class DualMirrorNoContainerTest extends SmokeTestBase {
// Change this to true to generate a print-data in certain cases on this test
private static final boolean PRINT_DATA = false;
private static final Logger logger = Logger.getLogger(DualMirrorNoContainerTest.class);
public static final String SERVER_NAME_A = "brokerConnect/mirrorSecurityA";
public static final String SERVER_NAME_B = "brokerConnect/mirrorSecurityB";
Process processB;
Process processA;
@Before
public void beforeClass() throws Exception {
cleanupData(SERVER_NAME_A);
cleanupData(SERVER_NAME_B);
processB = startServer(SERVER_NAME_B, 0, 0);
processA = startServer(SERVER_NAME_A, 0, 0);
ServerUtil.waitForServerToStart(1, "B", "B", 30000);
ServerUtil.waitForServerToStart(0, "A", "A", 30000);
}
@Test
public void testMirrorWithTX() throws Throwable {
testMirrorOverBokerConnection(true);
}
@Test
public void testMirrorWithoutTX() throws Throwable {
testMirrorOverBokerConnection(false);
}
@Test
public void testRollback() throws Throwable {
ConnectionFactory cfA = CFUtil.createConnectionFactory("amqp", "tcp://localhost:61616");
ConnectionFactory cfB = CFUtil.createConnectionFactory("amqp", "tcp://localhost:61617");
try (Connection connectionA = cfA.createConnection("A", "A");
Connection connectionB = cfB.createConnection("B", "B")) {
// Testing things on the direction from mirroring from A to B...
Session sessionA = connectionA.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = sessionA.createQueue("someQueue");
MessageProducer producerA = sessionA.createProducer(queue);
sendMessages(true, sessionA, producerA, 0, 99);
connectionA.start();
MessageConsumer consumerA = sessionA.createConsumer(queue);
receiveMessages(true, sessionA, consumerA, 0, 49);
receiveMessages(false, sessionA, consumerA, 50, 70);
// notice we will leave the messages not committed here, and we will move to the other side.. messages should still be there.
// Switching consumption to the server B
Thread.sleep(1000); // The bridge on acks is asynchronous. We need to wait some time to avoid intermittent failures
// I could replace the wait here with a Wait clause, but I would need to configure JMX or management in order to get the Queue Counts
Session sessionB = connectionB.createSession(true, Session.SESSION_TRANSACTED);
MessageConsumer consumerB = sessionB.createConsumer(queue);
MessageProducer producerB = sessionB.createProducer(queue);
connectionB.start();
receiveMessages(false, sessionB, consumerB, 50, 99);
consumerA.close();
sessionA.rollback(); // this is needed to clear up delivering state
sessionB.commit();
}
}
public void testMirrorOverBokerConnection(boolean tx) throws Throwable {
ConnectionFactory cfA = CFUtil.createConnectionFactory("amqp", "tcp://localhost:61616");
ConnectionFactory cfB = CFUtil.createConnectionFactory("amqp", "tcp://localhost:61617");
try (Connection connectionA = cfA.createConnection("A", "A");
Connection connectionB = cfB.createConnection("B", "B")) {
// Testing things on the direction from mirroring from A to B...
Session sessionA = connectionA.createSession(tx, tx ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
Queue queue = sessionA.createQueue("someQueue");
MessageProducer producerA = sessionA.createProducer(queue);
sendMessages(tx, sessionA, producerA, 0, 9);
connectionA.start();
MessageConsumer consumerA = sessionA.createConsumer(queue);
receiveMessages(tx, sessionA, consumerA, 0, 4);
consumerA.close();
// Switching consumption to the server B
Thread.sleep(1000); // The bridge on acks is asynchronous. We need to wait some time to avoid intermittent failures
// I could replace the wait here with a Wait clause, but I would need to configure JMX or management in order to get the Queue Counts
Session sessionB = connectionB.createSession(tx, tx ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumerB = sessionB.createConsumer(queue);
MessageProducer producerB = sessionB.createProducer(queue);
connectionB.start();
receiveMessages(tx, sessionB, consumerB, 5, 9);
Assert.assertNull(consumerB.receiveNoWait());
sendMessages(tx, sessionB, producerB, 0, 19);
receiveMessages(tx, sessionB, consumerB, 0, 9);
consumerB.close();
// switching over back again to A, some sleep here
Thread.sleep(1000);
consumerA = sessionA.createConsumer(queue);
receiveMessages(tx, sessionA, consumerA, 10, 19);
Assert.assertNull(consumerA.receiveNoWait());
}
}
@Test
public void testReconnectMirror() throws Throwable {
testReconnectMirror(false);
}
@Test
public void testReconnectMirrorLargeMessage() throws Throwable {
testReconnectMirror(true);
}
private void testReconnectMirror(boolean isLarge) throws Throwable {
ConnectionFactory cfA = CFUtil.createConnectionFactory("amqp", "tcp://localhost:61616");
ConnectionFactory cfB = CFUtil.createConnectionFactory("amqp", "tcp://localhost:61617");
String largeBuffer = "";
if (isLarge) {
StringBuffer buffer = new StringBuffer();
while (buffer.length() < 200 * 1024) {
buffer.append("This is large ");
}
largeBuffer = buffer.toString();
}
int NUMBER_OF_MESSAGES = isLarge ? 100 : 1_000;
int FAILURE_INTERVAL = isLarge ? 10 : 100;
try (Connection connectionA = cfA.createConnection("A", "A")) {
// Testing things on the direction from mirroring from A to B...
Session sessionA = connectionA.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = sessionA.createQueue("someQueue");
MessageProducer producerA = sessionA.createProducer(queue);
producerA.setDeliveryMode(DeliveryMode.PERSISTENT);
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
TextMessage message = sessionA.createTextMessage("message " + i + largeBuffer);
message.setStringProperty("color", i % 2 == 0 ? "yellow" : "red");
message.setIntProperty("i", i);
producerA.send(message);
if (i % 1000 == 0 && i > 0) {
System.out.println("Message " + i);
sessionA.commit();
}
if (i % FAILURE_INTERVAL == 0 && i > 0) {
restartB();
}
}
sessionA.commit();
connectionA.start();
}
try (Connection connectionB = cfB.createConnection("B", "B")) {
// Testing things on the direction from mirroring from A to B...
Session sessionB = connectionB.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = sessionB.createQueue("someQueue");
connectionB.start();
MessageConsumer consumerB = sessionB.createConsumer(queue);
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
TextMessage message = (TextMessage)consumerB.receive(5_000);
Assert.assertNotNull("expected message at " + i, message);
Assert.assertEquals("message " + i + largeBuffer, message.getText());
}
Assert.assertNull(consumerB.receiveNoWait());
sessionB.rollback();
}
int restarted = 0;
try (Connection connectionB = cfB.createConnection("B", "B")) {
// Testing things on the direction from mirroring from A to B...
Session sessionB = connectionB.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = sessionB.createQueue("someQueue");
connectionB.start();
MessageConsumer consumerB = sessionB.createConsumer(queue, "color='yellow'");
int op = 0;
for (int i = 0; i < NUMBER_OF_MESSAGES; i += 2) {
//System.out.println("Received message on i=" + i);
TextMessage message = (TextMessage)consumerB.receive(5_000);
Assert.assertNotNull("expected message at " + i, message);
Assert.assertEquals("message " + i + largeBuffer, message.getText());
if (op++ > 0 && op % FAILURE_INTERVAL == 0) {
restartA(++restarted);
}
}
Assert.assertNull(consumerB.receiveNoWait());
}
System.out.println("Restarted serverA " + restarted + " times");
Thread.sleep(1000);
try (Connection connectionA = cfA.createConnection("A", "A")) {
// Testing things on the direction from mirroring from A to B...
Session sessionA = connectionA.createSession(true, Session.SESSION_TRANSACTED);
Queue queue = sessionA.createQueue("someQueue");
connectionA.start();
MessageConsumer consumerA = sessionA.createConsumer(queue);
for (int i = 1; i < NUMBER_OF_MESSAGES; i += 2) {
TextMessage message = (TextMessage)consumerA.receive(5_000);
Assert.assertNotNull("expected message at " + i, message);
// We should only have red left
Assert.assertEquals("Unexpected message at " + i + " with i=" + message.getIntProperty("i"), "red", message.getStringProperty("color"));
Assert.assertEquals("message " + i + largeBuffer, message.getText());
}
sessionA.commit();
Assert.assertNull(consumerA.receiveNoWait());
}
Thread.sleep(5000);
try (Connection connectionB = cfB.createConnection("B", "B")) {
// Testing things on the direction from mirroring from A to B...
Session sessionB = connectionB.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = sessionB.createQueue("someQueue");
connectionB.start();
MessageConsumer consumerB = sessionB.createConsumer(queue);
TextMessage message = (TextMessage)consumerB.receiveNoWait();
if (message != null) {
Assert.fail("was expected null, however received " + message.getText());
}
}
}
private void restartB() throws Exception {
processB.destroyForcibly();
Thread.sleep(500);
/*String localtionServerB = getServerLocation(SERVER_NAME_B);
File fileB = new File(localtionServerB + "/data");
PrintData.printData(new File(fileB, "bindings"), new File(fileB, "journal"), new File(fileB, "paging"), false); */
processB = startServer(SERVER_NAME_B, 0, 0);
ServerUtil.waitForServerToStart(1, "B", "B", 30000);
}
private void restartA(int restartNumber) throws Exception {
System.out.println("Restarting A");
processA.destroyForcibly();
Thread.sleep(1000);
if (PRINT_DATA) {
String localtionServerA = getServerLocation(SERVER_NAME_A);
File fileA = new File(localtionServerA + "/data");
File fileOutput = new File(localtionServerA + "/log/print-data-" + restartNumber + ".txt");
FileOutputStream fileOutputStream = new FileOutputStream(fileOutput);
PrintData.printData(new File(fileA, "bindings"), new File(fileA, "journal"), new File(fileA, "paging"), new PrintStream(fileOutputStream), false, false);
fileOutputStream.close();
}
processA = startServer(SERVER_NAME_A, 0, 0);
ServerUtil.waitForServerToStart(0, "A", "A", 30000);
Thread.sleep(1000);
}
private void receiveMessages(boolean tx, Session session, MessageConsumer consumer, int start, int end) throws JMSException {
for (int i = start; i <= end; i++) {
TextMessage message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
Assert.assertEquals("message " + i, message.getText());
}
if (tx) session.commit();
}
private void sendMessages(boolean tx, Session session, MessageProducer producer, int start, int end) throws JMSException {
for (int i = start; i <= end; i++) {
producer.send(session.createTextMessage("message " + i));
}
if (tx) session.commit();
}
}