| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * <br> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <br> |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.activemq.artemis.tests.smoke.brokerConnection; |
| |
| import javax.jms.Connection; |
| import javax.jms.ConnectionFactory; |
| import javax.jms.DeliveryMode; |
| import javax.jms.JMSException; |
| import javax.jms.MessageConsumer; |
| import javax.jms.MessageProducer; |
| import javax.jms.Queue; |
| import javax.jms.Session; |
| import javax.jms.TextMessage; |
| |
| import java.io.File; |
| import java.io.FileOutputStream; |
| import java.io.PrintStream; |
| |
| import org.apache.activemq.artemis.cli.commands.tools.PrintData; |
| import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase; |
| import org.apache.activemq.artemis.tests.util.CFUtil; |
| import org.apache.activemq.artemis.util.ServerUtil; |
| import org.jboss.logging.Logger; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| public class DualMirrorNoContainerTest extends SmokeTestBase { |
| |
| // Change this to true to generate a print-data in certain cases on this test |
| private static final boolean PRINT_DATA = false; |
| |
| private static final Logger logger = Logger.getLogger(DualMirrorNoContainerTest.class); |
| |
| public static final String SERVER_NAME_A = "brokerConnect/mirrorSecurityA"; |
| public static final String SERVER_NAME_B = "brokerConnect/mirrorSecurityB"; |
| |
| Process processB; |
| Process processA; |
| |
| @Before |
| public void beforeClass() throws Exception { |
| cleanupData(SERVER_NAME_A); |
| cleanupData(SERVER_NAME_B); |
| processB = startServer(SERVER_NAME_B, 0, 0); |
| processA = startServer(SERVER_NAME_A, 0, 0); |
| |
| ServerUtil.waitForServerToStart(1, "B", "B", 30000); |
| ServerUtil.waitForServerToStart(0, "A", "A", 30000); |
| } |
| |
| @Test |
| public void testMirrorWithTX() throws Throwable { |
| testMirrorOverBokerConnection(true); |
| } |
| |
| @Test |
| public void testMirrorWithoutTX() throws Throwable { |
| testMirrorOverBokerConnection(false); |
| } |
| |
| @Test |
| public void testRollback() throws Throwable { |
| ConnectionFactory cfA = CFUtil.createConnectionFactory("amqp", "tcp://localhost:61616"); |
| ConnectionFactory cfB = CFUtil.createConnectionFactory("amqp", "tcp://localhost:61617"); |
| |
| |
| try (Connection connectionA = cfA.createConnection("A", "A"); |
| Connection connectionB = cfB.createConnection("B", "B")) { |
| |
| // Testing things on the direction from mirroring from A to B... |
| Session sessionA = connectionA.createSession(true, Session.SESSION_TRANSACTED); |
| Queue queue = sessionA.createQueue("someQueue"); |
| MessageProducer producerA = sessionA.createProducer(queue); |
| sendMessages(true, sessionA, producerA, 0, 99); |
| |
| connectionA.start(); |
| |
| MessageConsumer consumerA = sessionA.createConsumer(queue); |
| receiveMessages(true, sessionA, consumerA, 0, 49); |
| receiveMessages(false, sessionA, consumerA, 50, 70); |
| // notice we will leave the messages not committed here, and we will move to the other side.. messages should still be there. |
| |
| |
| // Switching consumption to the server B |
| |
| Thread.sleep(1000); // The bridge on acks is asynchronous. We need to wait some time to avoid intermittent failures |
| // I could replace the wait here with a Wait clause, but I would need to configure JMX or management in order to get the Queue Counts |
| |
| Session sessionB = connectionB.createSession(true, Session.SESSION_TRANSACTED); |
| MessageConsumer consumerB = sessionB.createConsumer(queue); |
| MessageProducer producerB = sessionB.createProducer(queue); |
| connectionB.start(); |
| |
| receiveMessages(false, sessionB, consumerB, 50, 99); |
| consumerA.close(); |
| sessionA.rollback(); // this is needed to clear up delivering state |
| |
| sessionB.commit(); |
| } |
| |
| } |
| |
| public void testMirrorOverBokerConnection(boolean tx) throws Throwable { |
| ConnectionFactory cfA = CFUtil.createConnectionFactory("amqp", "tcp://localhost:61616"); |
| ConnectionFactory cfB = CFUtil.createConnectionFactory("amqp", "tcp://localhost:61617"); |
| |
| |
| try (Connection connectionA = cfA.createConnection("A", "A"); |
| Connection connectionB = cfB.createConnection("B", "B")) { |
| |
| // Testing things on the direction from mirroring from A to B... |
| Session sessionA = connectionA.createSession(tx, tx ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE); |
| Queue queue = sessionA.createQueue("someQueue"); |
| MessageProducer producerA = sessionA.createProducer(queue); |
| sendMessages(tx, sessionA, producerA, 0, 9); |
| |
| connectionA.start(); |
| |
| MessageConsumer consumerA = sessionA.createConsumer(queue); |
| receiveMessages(tx, sessionA, consumerA, 0, 4); |
| consumerA.close(); |
| |
| |
| // Switching consumption to the server B |
| |
| Thread.sleep(1000); // The bridge on acks is asynchronous. We need to wait some time to avoid intermittent failures |
| // I could replace the wait here with a Wait clause, but I would need to configure JMX or management in order to get the Queue Counts |
| |
| Session sessionB = connectionB.createSession(tx, tx ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE); |
| MessageConsumer consumerB = sessionB.createConsumer(queue); |
| MessageProducer producerB = sessionB.createProducer(queue); |
| connectionB.start(); |
| |
| receiveMessages(tx, sessionB, consumerB, 5, 9); |
| Assert.assertNull(consumerB.receiveNoWait()); |
| |
| sendMessages(tx, sessionB, producerB, 0, 19); |
| receiveMessages(tx, sessionB, consumerB, 0, 9); |
| |
| consumerB.close(); |
| |
| // switching over back again to A, some sleep here |
| Thread.sleep(1000); |
| |
| consumerA = sessionA.createConsumer(queue); |
| |
| receiveMessages(tx, sessionA, consumerA, 10, 19); |
| |
| Assert.assertNull(consumerA.receiveNoWait()); |
| } |
| } |
| |
| @Test |
| public void testReconnectMirror() throws Throwable { |
| testReconnectMirror(false); |
| } |
| |
| @Test |
| public void testReconnectMirrorLargeMessage() throws Throwable { |
| testReconnectMirror(true); |
| } |
| |
| private void testReconnectMirror(boolean isLarge) throws Throwable { |
| ConnectionFactory cfA = CFUtil.createConnectionFactory("amqp", "tcp://localhost:61616"); |
| ConnectionFactory cfB = CFUtil.createConnectionFactory("amqp", "tcp://localhost:61617"); |
| |
| String largeBuffer = ""; |
| |
| if (isLarge) { |
| StringBuffer buffer = new StringBuffer(); |
| while (buffer.length() < 200 * 1024) { |
| buffer.append("This is large "); |
| } |
| largeBuffer = buffer.toString(); |
| } |
| |
| int NUMBER_OF_MESSAGES = isLarge ? 100 : 1_000; |
| int FAILURE_INTERVAL = isLarge ? 10 : 100; |
| |
| try (Connection connectionA = cfA.createConnection("A", "A")) { |
| |
| // Testing things on the direction from mirroring from A to B... |
| Session sessionA = connectionA.createSession(true, Session.SESSION_TRANSACTED); |
| Queue queue = sessionA.createQueue("someQueue"); |
| MessageProducer producerA = sessionA.createProducer(queue); |
| producerA.setDeliveryMode(DeliveryMode.PERSISTENT); |
| |
| for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { |
| TextMessage message = sessionA.createTextMessage("message " + i + largeBuffer); |
| message.setStringProperty("color", i % 2 == 0 ? "yellow" : "red"); |
| message.setIntProperty("i", i); |
| producerA.send(message); |
| |
| if (i % 1000 == 0 && i > 0) { |
| System.out.println("Message " + i); |
| sessionA.commit(); |
| } |
| |
| if (i % FAILURE_INTERVAL == 0 && i > 0) { |
| restartB(); |
| } |
| } |
| |
| sessionA.commit(); |
| |
| connectionA.start(); |
| |
| } |
| |
| try (Connection connectionB = cfB.createConnection("B", "B")) { |
| |
| // Testing things on the direction from mirroring from A to B... |
| Session sessionB = connectionB.createSession(true, Session.SESSION_TRANSACTED); |
| Queue queue = sessionB.createQueue("someQueue"); |
| |
| connectionB.start(); |
| |
| MessageConsumer consumerB = sessionB.createConsumer(queue); |
| |
| for (int i = 0; i < NUMBER_OF_MESSAGES; i++) { |
| TextMessage message = (TextMessage)consumerB.receive(5_000); |
| Assert.assertNotNull("expected message at " + i, message); |
| Assert.assertEquals("message " + i + largeBuffer, message.getText()); |
| } |
| Assert.assertNull(consumerB.receiveNoWait()); |
| sessionB.rollback(); |
| } |
| |
| int restarted = 0; |
| |
| try (Connection connectionB = cfB.createConnection("B", "B")) { |
| |
| // Testing things on the direction from mirroring from A to B... |
| Session sessionB = connectionB.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Queue queue = sessionB.createQueue("someQueue"); |
| |
| connectionB.start(); |
| |
| MessageConsumer consumerB = sessionB.createConsumer(queue, "color='yellow'"); |
| |
| int op = 0; |
| for (int i = 0; i < NUMBER_OF_MESSAGES; i += 2) { |
| //System.out.println("Received message on i=" + i); |
| TextMessage message = (TextMessage)consumerB.receive(5_000); |
| Assert.assertNotNull("expected message at " + i, message); |
| Assert.assertEquals("message " + i + largeBuffer, message.getText()); |
| |
| if (op++ > 0 && op % FAILURE_INTERVAL == 0) { |
| restartA(++restarted); |
| } |
| } |
| |
| Assert.assertNull(consumerB.receiveNoWait()); |
| } |
| |
| System.out.println("Restarted serverA " + restarted + " times"); |
| |
| Thread.sleep(1000); |
| |
| try (Connection connectionA = cfA.createConnection("A", "A")) { |
| |
| // Testing things on the direction from mirroring from A to B... |
| Session sessionA = connectionA.createSession(true, Session.SESSION_TRANSACTED); |
| Queue queue = sessionA.createQueue("someQueue"); |
| |
| connectionA.start(); |
| |
| MessageConsumer consumerA = sessionA.createConsumer(queue); |
| |
| for (int i = 1; i < NUMBER_OF_MESSAGES; i += 2) { |
| TextMessage message = (TextMessage)consumerA.receive(5_000); |
| Assert.assertNotNull("expected message at " + i, message); |
| // We should only have red left |
| Assert.assertEquals("Unexpected message at " + i + " with i=" + message.getIntProperty("i"), "red", message.getStringProperty("color")); |
| Assert.assertEquals("message " + i + largeBuffer, message.getText()); |
| } |
| |
| sessionA.commit(); |
| |
| Assert.assertNull(consumerA.receiveNoWait()); |
| } |
| |
| Thread.sleep(5000); |
| |
| try (Connection connectionB = cfB.createConnection("B", "B")) { |
| |
| // Testing things on the direction from mirroring from A to B... |
| Session sessionB = connectionB.createSession(false, Session.AUTO_ACKNOWLEDGE); |
| Queue queue = sessionB.createQueue("someQueue"); |
| |
| connectionB.start(); |
| |
| MessageConsumer consumerB = sessionB.createConsumer(queue); |
| |
| TextMessage message = (TextMessage)consumerB.receiveNoWait(); |
| |
| if (message != null) { |
| Assert.fail("was expected null, however received " + message.getText()); |
| } |
| } |
| |
| } |
| |
| private void restartB() throws Exception { |
| processB.destroyForcibly(); |
| |
| Thread.sleep(500); |
| |
| /*String localtionServerB = getServerLocation(SERVER_NAME_B); |
| File fileB = new File(localtionServerB + "/data"); |
| PrintData.printData(new File(fileB, "bindings"), new File(fileB, "journal"), new File(fileB, "paging"), false); */ |
| |
| processB = startServer(SERVER_NAME_B, 0, 0); |
| ServerUtil.waitForServerToStart(1, "B", "B", 30000); |
| } |
| |
| private void restartA(int restartNumber) throws Exception { |
| |
| System.out.println("Restarting A"); |
| processA.destroyForcibly(); |
| |
| Thread.sleep(1000); |
| |
| if (PRINT_DATA) { |
| String localtionServerA = getServerLocation(SERVER_NAME_A); |
| File fileA = new File(localtionServerA + "/data"); |
| File fileOutput = new File(localtionServerA + "/log/print-data-" + restartNumber + ".txt"); |
| FileOutputStream fileOutputStream = new FileOutputStream(fileOutput); |
| PrintData.printData(new File(fileA, "bindings"), new File(fileA, "journal"), new File(fileA, "paging"), new PrintStream(fileOutputStream), false, false); |
| fileOutputStream.close(); |
| } |
| |
| processA = startServer(SERVER_NAME_A, 0, 0); |
| ServerUtil.waitForServerToStart(0, "A", "A", 30000); |
| Thread.sleep(1000); |
| } |
| |
| private void receiveMessages(boolean tx, Session session, MessageConsumer consumer, int start, int end) throws JMSException { |
| for (int i = start; i <= end; i++) { |
| TextMessage message = (TextMessage) consumer.receive(1000); |
| Assert.assertNotNull(message); |
| Assert.assertEquals("message " + i, message.getText()); |
| } |
| if (tx) session.commit(); |
| } |
| |
| private void sendMessages(boolean tx, Session session, MessageProducer producer, int start, int end) throws JMSException { |
| for (int i = start; i <= end; i++) { |
| producer.send(session.createTextMessage("message " + i)); |
| } |
| if (tx) session.commit(); |
| } |
| |
| } |