| /** |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * <p> |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * <p> |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.activemq.artemis.tests.smoke.brokerConnection; |
| |
| import javax.jms.Connection; |
| import javax.jms.ConnectionFactory; |
| import javax.jms.MessageConsumer; |
| import javax.jms.MessageProducer; |
| import javax.jms.Queue; |
| import javax.jms.Session; |
| import javax.jms.TextMessage; |
| |
| import org.apache.activemq.artemis.tests.smoke.common.ContainerService; |
| import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase; |
| import org.jboss.logging.Logger; |
| import org.junit.After; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Test; |
| |
| public class SplitMirrorTest extends SmokeTestBase { |
| |
| private static final Logger logger = Logger.getLogger(SplitMirrorTest.class); |
| |
| Object network; |
| |
| public Object serverMainA; |
| |
| public Object serverMainB; |
| |
| public Object serverRoot; |
| |
| ContainerService service = ContainerService.getService(); |
| |
| private final String SERVER_ROOT = basedir + "/target/brokerConnect/splitMirror/serverRoot"; |
| private final String SERVER_A = basedir + "/target/brokerConnect/splitMirror/serverA"; |
| private final String SERVER_B = basedir + "/target/brokerConnect/splitMirror/serverB"; |
| |
| @Before |
| public void beforeStart() throws Exception { |
| disableCheckThread(); |
| ValidateContainer.assumeArtemisContainer(); |
| |
| Assert.assertNotNull(basedir); |
| recreateBrokerDirectory(SERVER_ROOT); |
| recreateBrokerDirectory(SERVER_A); |
| recreateBrokerDirectory(SERVER_B); |
| network = service.newNetwork(); |
| serverMainA = service.newBrokerImage(); |
| serverMainB = service.newBrokerImage(); |
| serverRoot = service.newBrokerImage(); |
| service.setNetwork(serverMainA, network); |
| service.setNetwork(serverMainB, network); |
| service.setNetwork(serverRoot, network); |
| service.exposePorts(serverMainA, 61616); |
| service.exposePorts(serverMainB, 61616); |
| service.exposePorts(serverRoot, 61616); |
| service.prepareInstance(SERVER_ROOT); |
| service.prepareInstance(SERVER_A); |
| service.prepareInstance(SERVER_B); |
| service.exposeBrokerHome(serverMainA, SERVER_A); |
| service.exposeBrokerHome(serverMainB, SERVER_B); |
| service.exposeBrokerHome(serverRoot, SERVER_ROOT); |
| service.exposeHosts(serverRoot, "artemisRoot"); |
| service.exposeHosts(serverMainA, "artemisA"); |
| service.exposeHosts(serverMainB, "artemisB"); |
| |
| service.start(serverMainA); |
| service.start(serverMainB); |
| service.start(serverRoot); |
| } |
| |
| @After |
| public void afterStop() { |
| service.stop(serverRoot); |
| service.stop(serverMainA); |
| service.stop(serverMainB); |
| } |
| |
| @Test |
| public void testSplitMirror() throws Throwable { |
| ConnectionFactory cfRoot = service.createCF(serverRoot, "amqp"); |
| ConnectionFactory cfA = service.createCF(serverMainA, "amqp"); |
| ConnectionFactory cfB = service.createCF(serverMainB, "amqp"); |
| try (Connection connection = cfRoot.createConnection()) { |
| Session session = connection.createSession(); |
| Queue queue = session.createQueue("someQueue"); |
| MessageProducer producer = session.createProducer(queue); |
| for (int i = 0; i < 10; i++) { |
| producer.send(session.createTextMessage("sessionRoot " + i)); |
| } |
| connection.start(); |
| |
| MessageConsumer consumer = session.createConsumer(queue); |
| for (int i = 0; i < 5; i++) { |
| Assert.assertNotNull(consumer.receive(5000)); |
| } |
| consumer.close(); |
| } |
| |
| Thread.sleep(1000); |
| |
| try (Connection connection = cfA.createConnection()) { |
| Session session = connection.createSession(); |
| Queue queue = session.createQueue("someQueue"); |
| MessageConsumer consumer = session.createConsumer(queue); |
| connection.start(); |
| for (int i = 5; i < 10; i++) { |
| TextMessage message = (TextMessage) consumer.receive(5000); |
| Assert.assertNotNull(message); |
| } |
| Assert.assertNull(consumer.receiveNoWait()); |
| } |
| |
| try (Connection connection = cfB.createConnection()) { |
| Session session = connection.createSession(); |
| Queue queue = session.createQueue("someQueue"); |
| MessageConsumer consumer = session.createConsumer(queue); |
| connection.start(); |
| for (int i = 5; i < 10; i++) { |
| TextMessage message = (TextMessage) consumer.receive(5000); |
| Assert.assertNotNull(message); |
| } |
| Assert.assertNull(consumer.receiveNoWait()); |
| } |
| |
| try (Connection connection = cfRoot.createConnection()) { |
| Session session = connection.createSession(); |
| Queue queue = session.createQueue("someQueue"); |
| MessageConsumer consumer = session.createConsumer(queue); |
| connection.start(); |
| for (int i = 5; i < 10; i++) { |
| TextMessage message = (TextMessage) consumer.receive(5000); |
| Assert.assertNotNull(message); |
| } |
| Assert.assertNull(consumer.receiveNoWait()); |
| consumer.close(); |
| service.kill(serverMainA); |
| MessageProducer producer = session.createProducer(queue); |
| for (int i = 0; i < 33; i++) { |
| producer.send(session.createTextMessage("afterKill " + i)); |
| } |
| } |
| |
| Thread.sleep(1000); |
| |
| try (Connection connection = cfB.createConnection()) { |
| Session session = connection.createSession(); |
| Queue queue = session.createQueue("someQueue"); |
| MessageConsumer consumer = session.createConsumer(queue); |
| connection.start(); |
| for (int i = 0; i < 33; i++) { |
| TextMessage message = (TextMessage) consumer.receive(5000); |
| Assert.assertNotNull(message); |
| System.out.println("message.getText() = " + message.getText() + " i = " + i); |
| Assert.assertEquals("afterKill " + i, message.getText()); |
| } |
| Assert.assertNull(consumer.receiveNoWait()); |
| } |
| |
| service.start(serverMainA); |
| |
| cfA = service.createCF(serverMainA, "amqp"); |
| |
| try (Connection connection = cfA.createConnection()) { |
| Session session = connection.createSession(); |
| Queue queue = session.createQueue("someQueue"); |
| MessageConsumer consumer = session.createConsumer(queue); |
| connection.start(); |
| for (int i = 0; i < 33; i++) { |
| TextMessage message = (TextMessage) consumer.receive(5000); |
| Assert.assertNotNull(message); |
| Assert.assertEquals("afterKill " + i, message.getText()); |
| } |
| Assert.assertNull(consumer.receiveNoWait()); |
| } |
| } |
| |
| } |