blob: 323fdfdff101ab68733f419295eed953c0c85cc5 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.amqp.connect;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.io.PrintStream;
import java.net.URI;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.cli.commands.tools.PrintData;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectConfiguration;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPMirrorBrokerConnectionElement;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.logs.AssertionLoggerHandler;
import org.apache.activemq.artemis.tests.integration.amqp.AmqpClientTestSupport;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.utils.StringPrintStream;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.transport.amqp.client.AmqpClient;
import org.apache.activemq.transport.amqp.client.AmqpConnection;
import org.apache.activemq.transport.amqp.client.AmqpMessage;
import org.apache.activemq.transport.amqp.client.AmqpReceiver;
import org.apache.activemq.transport.amqp.client.AmqpSender;
import org.apache.activemq.transport.amqp.client.AmqpSession;
import org.jboss.logging.Logger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
public class BrokerInSyncTest extends AmqpClientTestSupport {
public static final int TIME_BEFORE_RESTART = 1000;
protected static final int AMQP_PORT_2 = 5673;
protected static final int AMQP_PORT_3 = 5674;
private static final Logger logger = Logger.getLogger(BrokerInSyncTest.class);
ActiveMQServer server_2;
@Before
public void startLogging() {
AssertionLoggerHandler.startCapture();
}
@After
public void stopLogging() {
try {
Assert.assertFalse(AssertionLoggerHandler.findText("AMQ222214"));
} finally {
AssertionLoggerHandler.stopCapture();
}
}
@Override
protected ActiveMQServer createServer() throws Exception {
return createServer(AMQP_PORT, false);
}
@Test
public void testSyncOnCreateQueues() throws Exception {
server.setIdentity("Server1");
{
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("connectTowardsServer2", "tcp://localhost:" + AMQP_PORT_2).setReconnectAttempts(3).setRetryInterval(100);
amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true));
server.getConfiguration().addAMQPConnection(amqpConnection);
}
server.start();
server_2 = createServer(AMQP_PORT_2, false);
server_2.setIdentity("Server2");
{
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("connectTowardsServer1", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true));
server_2.getConfiguration().addAMQPConnection(amqpConnection);
}
server_2.start();
server_2.addAddressInfo(new AddressInfo("sometest").setAutoCreated(false));
server_2.createQueue(new QueueConfiguration("sometest").setDurable(true));
Wait.assertTrue(() -> server_2.locateQueue("sometest") != null);
Wait.assertTrue(() -> server.locateQueue("sometest") != null);
server.addAddressInfo(new AddressInfo("OnServer1").setAutoCreated(false));
server.createQueue(new QueueConfiguration("OnServer1").setDurable(true));
Wait.assertTrue(() -> server.locateQueue("OnServer1") != null);
Wait.assertTrue("Sync is not working on the way back", () -> server_2.locateQueue("OnServer1") != null, 2000);
Wait.assertTrue(() -> server_2.locateQueue("sometest") != null);
Wait.assertTrue(() -> server.locateQueue("sometest") != null);
for (int i = 0; i < 10; i++) {
final int queueID = i;
server_2.createQueue(new QueueConfiguration("test2_" + i).setDurable(true));
server.createQueue(new QueueConfiguration("test1_" + i).setDurable(true));
Wait.assertTrue(() -> server.locateQueue("test2_" + queueID) != null);
Wait.assertTrue(() -> server.locateQueue("test1_" + queueID) != null);
Wait.assertTrue(() -> server_2.locateQueue("test2_" + queueID) != null);
Wait.assertTrue(() -> server_2.locateQueue("test1_" + queueID) != null);
}
server_2.stop();
server.stop();
}
@Test
public void testSingleMessage() throws Exception {
server.setIdentity("Server1");
{
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("connectTowardsServer2", "tcp://localhost:" + AMQP_PORT_2).setReconnectAttempts(3).setRetryInterval(100);
amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true));
server.getConfiguration().addAMQPConnection(amqpConnection);
}
server.start();
server_2 = createServer(AMQP_PORT_2, false);
server_2.setIdentity("Server2");
{
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("connectTowardsServer1", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true));
server_2.getConfiguration().addAMQPConnection(amqpConnection);
}
server_2.start();
server_2.addAddressInfo(new AddressInfo(getQueueName()).setAutoCreated(false).addRoutingType(RoutingType.ANYCAST));
server_2.createQueue(new QueueConfiguration(getQueueName()).setDurable(true).setRoutingType(RoutingType.ANYCAST));
Wait.assertTrue(() -> server_2.locateQueue(getQueueName()) != null);
Wait.assertTrue(() -> server.locateQueue(getQueueName()) != null);
ConnectionFactory cf1 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
Connection connection1 = cf1.createConnection();
Session session1 = connection1.createSession(true, Session.SESSION_TRANSACTED);
connection1.start();
ConnectionFactory cf2 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT_2);
Connection connection2 = cf2.createConnection();
Session session2 = connection2.createSession(true, Session.SESSION_TRANSACTED);
connection2.start();
Queue queue = session1.createQueue(getQueueName());
MessageProducer producerServer1 = session1.createProducer(queue);
MessageProducer producerServer2 = session2.createProducer(queue);
TextMessage message = session1.createTextMessage("test");
message.setIntProperty("i", 0);
message.setStringProperty("server", server.getIdentity());
producerServer1.send(message);
session1.commit();
org.apache.activemq.artemis.core.server.Queue queueOnServer1 = server.locateQueue(getQueueName());
org.apache.activemq.artemis.core.server.Queue queueOnServer2 = server_2.locateQueue(getQueueName());
Assert.assertNotNull(queueOnServer1);
Assert.assertNotNull(queueOnServer2);
Wait.assertEquals(1, queueOnServer1::getMessageCount);
Wait.assertEquals(1, queueOnServer2::getMessageCount);
message = session1.createTextMessage("test");
message.setIntProperty("i", 1);
message.setStringProperty("server", server_2.getIdentity());
producerServer2.send(message);
session2.commit();
if (logger.isDebugEnabled() && !Wait.waitFor(() -> queueOnServer1.getMessageCount() == 2)) {
debugData();
}
Wait.assertEquals(2, queueOnServer1::getMessageCount);
Wait.assertEquals(2, queueOnServer2::getMessageCount);
connection1.close();
connection2.close();
server_2.stop();
server.stop();
}
@Test
public void testSyncData() throws Exception {
int NUMBER_OF_MESSAGES = 100;
server.setIdentity("Server1");
{
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("connectTowardsServer2", "tcp://localhost:" + AMQP_PORT_2).setReconnectAttempts(3).setRetryInterval(100);
amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true));
server.getConfiguration().addAMQPConnection(amqpConnection);
}
server.start();
server_2 = createServer(AMQP_PORT_2, false);
server_2.setIdentity("Server2");
{
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("connectTowardsServer1", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true));
server_2.getConfiguration().addAMQPConnection(amqpConnection);
}
server_2.start();
server_2.addAddressInfo(new AddressInfo(getQueueName()).setAutoCreated(false).addRoutingType(RoutingType.ANYCAST));
server_2.createQueue(new QueueConfiguration(getQueueName()).setDurable(true).setRoutingType(RoutingType.ANYCAST));
Wait.assertTrue(() -> server_2.locateQueue(getQueueName()) != null);
Wait.assertTrue(() -> server.locateQueue(getQueueName()) != null);
ConnectionFactory cf1 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT);
Connection connection1 = cf1.createConnection();
Session session1 = connection1.createSession(true, Session.SESSION_TRANSACTED);
connection1.start();
ConnectionFactory cf2 = CFUtil.createConnectionFactory("AMQP", "tcp://localhost:" + AMQP_PORT_2);
Connection connection2 = cf2.createConnection();
Session session2 = connection2.createSession(true, Session.SESSION_TRANSACTED);
connection2.start();
Queue queue = session1.createQueue(getQueueName());
MessageProducer producerServer1 = session1.createProducer(queue);
MessageProducer producerServer2 = session2.createProducer(queue);
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
TextMessage message = session1.createTextMessage("test " + i);
message.setIntProperty("i", i);
message.setStringProperty("server", server.getIdentity());
producerServer1.send(message);
}
session1.commit();
org.apache.activemq.artemis.core.server.Queue queueOnServer1 = server.locateQueue(getQueueName());
org.apache.activemq.artemis.core.server.Queue queueOnServer2 = server_2.locateQueue(getQueueName());
Assert.assertNotNull(queueOnServer1);
Assert.assertNotNull(queueOnServer2);
Wait.assertEquals(NUMBER_OF_MESSAGES, queueOnServer1::getMessageCount);
Wait.assertEquals(NUMBER_OF_MESSAGES, queueOnServer2::getMessageCount);
for (int i = NUMBER_OF_MESSAGES; i < NUMBER_OF_MESSAGES * 2; i++) {
TextMessage message = session1.createTextMessage("test " + i);
message.setIntProperty("i", i);
message.setStringProperty("server", server_2.getIdentity());
producerServer2.send(message);
}
session2.commit();
if (logger.isDebugEnabled() && !Wait.waitFor(() -> queueOnServer1.getMessageCount() == NUMBER_OF_MESSAGES * 2)) {
debugData();
}
Wait.assertEquals(NUMBER_OF_MESSAGES * 2, queueOnServer1::getMessageCount);
Wait.assertEquals(NUMBER_OF_MESSAGES * 2, queueOnServer2::getMessageCount);
MessageConsumer consumerOn1 = session1.createConsumer(queue);
for (int i = 0; i < NUMBER_OF_MESSAGES * 2; i++) {
TextMessage message = (TextMessage) consumerOn1.receive(5000);
logger.debug("### Client acking message(" + i + ") on server 1, a message that was original sent on " + message.getStringProperty("server") + " text = " + message.getText());
Assert.assertNotNull(message);
Assert.assertEquals(i, message.getIntProperty("i"));
Assert.assertEquals("test " + i, message.getText());
session1.commit();
}
boolean bothConsumed = Wait.waitFor(() -> {
long q1 = queueOnServer1.getMessageCount();
long q2 = queueOnServer2.getMessageCount();
logger.debug("Queue on Server 1 = " + q1);
logger.debug("Queue on Server 2 = " + q2);
return q1 == 0 && q2 == 0;
}, 5_000, 1000);
if (logger.isDebugEnabled() && !bothConsumed) {
debugData();
Assert.fail("q1 = " + queueOnServer1.getMessageCount() + ", q2 = " + queueOnServer2.getMessageCount());
}
Assert.assertEquals(0, queueOnServer1.getMessageCount());
Assert.assertEquals(0, queueOnServer2.getConsumerCount());
System.out.println("Queue on Server 1 = " + queueOnServer1.getMessageCount());
System.out.println("Queue on Server 2 = " + queueOnServer2.getMessageCount());
server_2.stop();
server.stop();
}
private void debugData() throws Exception {
StringPrintStream stringPrintStream = new StringPrintStream();
PrintStream out = stringPrintStream.newStream();
org.apache.activemq.artemis.core.server.Queue queueToDebugOn1 = server.locateQueue(getQueueName());
org.apache.activemq.artemis.core.server.Queue queueToDebugOn2 = server_2.locateQueue(getQueueName());
out.println("*******************************************************************************************************************************");
out.println("Queue on Server 1 with count = " + queueToDebugOn1.getMessageCount());
queueToDebugOn1.forEach((r) -> out.println("Server1 has reference " + r.getMessage()));
out.println("*******************************************************************************************************************************");
out.println("Queue on Server 2 with count = " + queueToDebugOn2.getMessageCount());
queueToDebugOn2.forEach((r) -> out.println("Server2 has reference " + r.getMessage()));
out.println("*******************************************************************************************************************************");
out.println("PrintData Server 1");
PrintData.printMessages(server.getConfiguration().getJournalLocation(), out, false, false, true, false);
out.println("*******************************************************************************************************************************");
out.println("PrintData Server 2");
PrintData.printMessages(server_2.getConfiguration().getJournalLocation(), out, false, false, true, false);
logger.debug("Data Available on Servers:\n" + stringPrintStream.toString());
}
@Test
public void testSyncDataNoSuppliedID() throws Exception {
int NUMBER_OF_MESSAGES = 100;
server.setIdentity("Server1");
{
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("connectTowardsServer2", "tcp://localhost:" + AMQP_PORT_2).setReconnectAttempts(3).setRetryInterval(100);
amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true));
server.getConfiguration().addAMQPConnection(amqpConnection);
}
server.start();
server_2 = createServer(AMQP_PORT_2, false);
server_2.setIdentity("Server2");
{
AMQPBrokerConnectConfiguration amqpConnection = new AMQPBrokerConnectConfiguration("connectTowardsServer1", "tcp://localhost:" + AMQP_PORT).setReconnectAttempts(-1).setRetryInterval(100);
amqpConnection.addElement(new AMQPMirrorBrokerConnectionElement().setDurable(true));
server_2.getConfiguration().addAMQPConnection(amqpConnection);
}
server_2.start();
server_2.addAddressInfo(new AddressInfo(getQueueName()).setAutoCreated(false).addRoutingType(RoutingType.ANYCAST));
server_2.createQueue(new QueueConfiguration(getQueueName()).setDurable(true).setRoutingType(RoutingType.ANYCAST));
Wait.assertTrue(() -> server_2.locateQueue(getQueueName()) != null);
Wait.assertTrue(() -> server.locateQueue(getQueueName()) != null);
AmqpClient cf1 = new AmqpClient(new URI("tcp://localhost:" + AMQP_PORT), null, null);
AmqpConnection connection1 = cf1.createConnection();
connection1.connect();
AmqpSession session1 = connection1.createSession();
AmqpClient cf2 = new AmqpClient(new URI("tcp://localhost:" + AMQP_PORT_2), null, null);
AmqpConnection connection2 = cf2.createConnection();
connection2.connect();
AmqpSession session2 = connection2.createSession();
AmqpSender producerServer1 = session1.createSender(getQueueName());
AmqpSender producerServer2 = session2.createSender(getQueueName());
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
AmqpMessage message = new AmqpMessage();
message.setDurable(true);
message.setApplicationProperty("i", i);
producerServer1.send(message);
}
org.apache.activemq.artemis.core.server.Queue queueOnServer1 = server.locateQueue(getQueueName());
org.apache.activemq.artemis.core.server.Queue queueOnServer2 = server_2.locateQueue(getQueueName());
Assert.assertNotNull(queueOnServer1);
Assert.assertNotNull(queueOnServer2);
Wait.assertEquals(NUMBER_OF_MESSAGES, queueOnServer1::getMessageCount);
Wait.assertEquals(NUMBER_OF_MESSAGES, queueOnServer2::getMessageCount);
for (int i = NUMBER_OF_MESSAGES; i < NUMBER_OF_MESSAGES * 2; i++) {
AmqpMessage message = new AmqpMessage();
message.setDurable(true);
message.setApplicationProperty("i", i);
producerServer2.send(message);
}
Wait.assertEquals(NUMBER_OF_MESSAGES * 2, queueOnServer1::getMessageCount);
Wait.assertEquals(NUMBER_OF_MESSAGES * 2, queueOnServer2::getMessageCount);
AmqpReceiver consumerOn1 = session1.createReceiver(getQueueName());
consumerOn1.flow(NUMBER_OF_MESSAGES * 2 + 1);
for (int i = 0; i < NUMBER_OF_MESSAGES * 2; i++) {
AmqpMessage message = consumerOn1.receive(5, TimeUnit.SECONDS);
Assert.assertNotNull(message);
message.accept();
Assert.assertEquals(i, (int) message.getApplicationProperty("i"));
}
Wait.assertEquals(0, queueOnServer1::getMessageCount);
Wait.assertEquals(0, () -> {
System.out.println(queueOnServer2.getMessageCount());
return queueOnServer2.getMessageCount();
});
connection1.close();
connection2.close();
server_2.stop();
server.stop();
}
}