blob: 196bbc4767331329943a520303bd50690f8c859d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.soak.brokerConnection.mirror;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import java.io.File;
import java.io.StringWriter;
import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.management.SimpleManagement;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.soak.SoakTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.util.ServerUtil;
import org.apache.activemq.artemis.utils.FileUtil;
import org.apache.activemq.artemis.utils.TestParameters;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.artemis.utils.actors.OrderedExecutor;
import org.apache.activemq.artemis.utils.cli.helper.HelperCreate;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class SingleMirrorSoakTest extends SoakTestBase {
// Prefix handed to TestParameters.testProperty for every tunable declared below.
private static final String TEST_NAME = "SINGLE_MIRROR_SOAK";
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
// When true, createServer adds TRACE loggers for the mirror AckManager and for ConfigurationImpl
// to each created server's etc/log4j2.properties.
private static final boolean TRACE_LOGS = Boolean.parseBoolean(TestParameters.testProperty(TEST_NAME, "TRACE_LOGS", "false"));
// Number of messages the producer sends and each background consumer is asked to receive.
private static final int NUMBER_MESSAGES = TestParameters.testProperty(TEST_NAME, "NUMBER_MESSAGES", 2_500);
// Batch size passed to consume(): consumers commit once this many messages are pending.
private static final int RECEIVE_COMMIT = TestParameters.testProperty(TEST_NAME, "RECEIVE_COMMIT", 100);
// The producer commits whenever the message index is a non-zero multiple of this value.
private static final int SEND_COMMIT = TestParameters.testProperty(TEST_NAME, "SEND_COMMIT", 100);
// A restart of DC2 is scheduled whenever the message index is a non-zero multiple of this value.
// NOTE(review): the constant is spelled KILL_INTERNAL while its property key is "KILL_INTERVAL".
private static final int KILL_INTERNAL = TestParameters.testProperty(TEST_NAME, "KILL_INTERVAL", 500);
// Timeout given to Wait.assertEquals while waiting for the mirror SNF queues to drain
// (presumably milliseconds - confirm against Wait).
private static final int SNF_TIMEOUT = TestParameters.testProperty(TEST_NAME, "SNF_TIMEOUT", 60_000);
// Timeout given to Wait.assertEquals while waiting for the subscription queues to drain.
// Note that the property key is "GENERAL_TIMEOUT", not "GENERAL_WAIT_TIMEOUT".
private static final int GENERAL_WAIT_TIMEOUT = TestParameters.testProperty(TEST_NAME, "GENERAL_TIMEOUT", 10_000);
/*
* Milliseconds each consumer sleeps right before committing a batch, which lets some messages accumulate.
* The sleep is skipped when the value is 0 (the default).
*/
private static final int CONSUMER_PROCESSING_TIME = TestParameters.testProperty(TEST_NAME, "CONSUMER_PROCESSING_TIME", 0);
private static final String TOPIC_NAME = "topicTest";
// Text payload shared by every message sent; populated once by the static initializer below.
private static String body;
static {
// Keep appending the sentence until the payload is at least 30 * 1024 characters long.
StringWriter writer = new StringWriter();
while (writer.getBuffer().length() < 30 * 1024) {
writer.append("The sky is blue, ..... watch out for poop from the birds though!...");
}
body = writer.toString();
}
// Server names: used to resolve each server's location on disk and to start it.
public static final String DC1_NODE = "SingleMirrorSoakTest/DC1";
public static final String DC2_NODE = "SingleMirrorSoakTest/DC2";
// Handles of the spawned broker processes. They are volatile because processDC2 is also replaced
// by the restart task that the test runs on an executor thread.
volatile Process processDC1;
volatile Process processDC2;
/**
 * Forcibly destroys both broker processes, if present, waiting up to one minute for each of them
 * to exit, and clears the corresponding fields.
 *
 * @throws Exception if the thread is interrupted while waiting for a process to exit
 */
@After
public void destroyServers() throws Exception {
   // Read each volatile field exactly once. processDC2 is nulled and reassigned by the restart task
   // running on an executor thread, so re-reading the field after the null check could hit a
   // NullPointerException.
   final Process dc1 = processDC1;
   if (dc1 != null) {
      dc1.destroyForcibly();
      dc1.waitFor(1, TimeUnit.MINUTES);
      processDC1 = null;
   }

   final Process dc2 = processDC2;
   if (dc2 != null) {
      dc2.destroyForcibly();
      dc2.waitFor(1, TimeUnit.MINUTES);
      processDC2 = null;
   }
}
// Core acceptor URIs of the two brokers. Each one is used both by the test clients and as the
// mirror target configured on the opposite broker. The ports differ by 2, in line with the
// port offsets (0 for DC1, 2 for DC2) that createRealServers passes when creating the servers.
private static final String DC1_URI = "tcp://localhost:61616";
private static final String DC2_URI = "tcp://localhost:61618";
/**
 * Creates a fresh broker instance on disk and writes the {@code broker.properties} file that
 * configures its AMQP mirror connection.
 * <p>
 * Any previous content of the server location is deleted first. The generated {@code etc/broker.xml}
 * is patched with a redistribution delay of 0 and, when {@link #TRACE_LOGS} is set,
 * {@code etc/log4j2.properties} is patched with additional TRACE loggers.
 *
 * @param serverName     name of the server; resolves the instance location and is used as the broker name
 * @param connectionName name of the AMQP broker connection configured in {@code broker.properties}
 * @param mirrorURI      URI of the broker this server mirrors to
 * @param porOffset      port offset applied to the created instance
 * @param paging         when true, address settings are added that limit every address to one
 *                       message in memory
 * @throws Exception if the instance cannot be created or one of the generated files cannot be patched
 */
private static void createServer(String serverName,
                                 String connectionName,
                                 String mirrorURI,
                                 int porOffset,
                                 boolean paging) throws Exception {
   File serverLocation = getFileServerLocation(serverName);
   deleteDirectory(serverLocation);

   HelperCreate cliCreateServer = new HelperCreate();
   cliCreateServer.setAllowAnonymous(true).setArtemisInstance(serverLocation);
   cliCreateServer.setMessageLoadBalancing("ON_DEMAND");
   cliCreateServer.setClustered(false);
   cliCreateServer.setNoWeb(false);
   // The broker is named after the server being created; previously every server was named DC1_NODE.
   cliCreateServer.setArgs("--no-stomp-acceptor", "--no-hornetq-acceptor", "--no-mqtt-acceptor", "--no-amqp-acceptor", "--max-hops", "1", "--name", serverName);
   cliCreateServer.addArgs("--addresses", TOPIC_NAME);
   cliCreateServer.setPortOffset(porOffset);
   cliCreateServer.createServer();

   // Common key prefix of every property of this broker connection.
   final String connectionPrefix = "AMQPConnections." + connectionName;

   Properties brokerProperties = new Properties();
   brokerProperties.put(connectionPrefix + ".uri", mirrorURI);
   brokerProperties.put(connectionPrefix + ".retryInterval", "1000");
   brokerProperties.put(connectionPrefix + ".type", AMQPBrokerConnectionAddressType.MIRROR.toString());
   brokerProperties.put(connectionPrefix + ".connectionElements.mirror.sync", "false");
   brokerProperties.put("largeMessageSync", "false");
   brokerProperties.put("mirrorAckManagerPageAttempts", "10");
   brokerProperties.put("mirrorAckManagerRetryDelay", "1000");

   if (paging) {
      brokerProperties.put("addressSettings.#.maxSizeMessages", "1");
      brokerProperties.put("addressSettings.#.maxReadPageMessages", "1000");
      brokerProperties.put("addressSettings.#.maxReadPageBytes", "-1");
      brokerProperties.put("addressSettings.#.prefetchPageMessages", "100");
      // un-comment the next line if you would rather use the workaround without the fix on the PostOfficeImpl
      // brokerProperties.put("addressSettings.#.iDCacheSize", "1000");
   }
   // if we don't use pageTransactions we may eventually get a few duplicates
   brokerProperties.put("mirrorPageTransaction", "true");

   File brokerPropertiesFile = new File(serverLocation, "broker.properties");
   saveProperties(brokerProperties, brokerPropertiesFile);

   File brokerXml = new File(serverLocation, "/etc/broker.xml");
   Assert.assertTrue(brokerXml.exists());
   // Adding redistribution delay to broker configuration
   Assert.assertTrue(FileUtil.findReplace(brokerXml, "<address-setting match=\"#\">", "<address-setting match=\"#\">\n\n" + " <redistribution-delay>0</redistribution-delay> <!-- added by SimpleMirrorSoakTest.java --> \n"));

   if (TRACE_LOGS) {
      File log4j = new File(serverLocation, "/etc/log4j2.properties");
      Assert.assertTrue(FileUtil.findReplace(log4j, "logger.artemis_utils.level=INFO", "logger.artemis_utils.level=INFO\n" +
         "\n" + "logger.ack.name=org.apache.activemq.artemis.protocol.amqp.connect.mirror.AckManager\n"
         + "logger.ack.level=TRACE\n"
         + "logger.config.name=org.apache.activemq.artemis.core.config.impl.ConfigurationImpl\n"
         + "logger.config.level=TRACE\n"
         + "appender.console.filter.threshold.type = ThresholdFilter\n"
         + "appender.console.filter.threshold.level = info"));
   }
}
/**
 * Creates the two brokers of the test, each one configured to mirror to the other.
 *
 * @param paging forwarded to {@code createServer} for both brokers
 * @throws Exception if either broker instance cannot be created
 */
public static void createRealServers(boolean paging) throws Exception {
   final String connectionName = "mirror";
   final int dc1PortOffset = 0;
   final int dc2PortOffset = 2;

   // DC1 mirrors to DC2 and DC2 mirrors back to DC1.
   createServer(DC1_NODE, connectionName, DC2_URI, dc1PortOffset, paging);
   createServer(DC2_NODE, connectionName, DC1_URI, dc2PortOffset, paging);
}
/**
 * Launches both broker processes, keeps their handles in {@code processDC1} and {@code processDC2},
 * and then waits for each of them to start.
 *
 * @throws Exception if a process cannot be launched or a broker does not start in time
 */
private void startServers() throws Exception {
   // NOTE(review): the -1 arguments presumably make startServer skip its own start-up wait, since the
   // explicit waits below cover it - confirm against SoakTestBase.
   final File dc1Properties = new File(getServerLocation(DC1_NODE), "broker.properties");
   processDC1 = startServer(DC1_NODE, -1, -1, dc1Properties);

   final File dc2Properties = new File(getServerLocation(DC2_NODE), "broker.properties");
   processDC2 = startServer(DC2_NODE, -1, -1, dc2Properties);

   // Both processes are launched first, then each one is awaited in turn.
   ServerUtil.waitForServerToStart(0, 10_000);
   ServerUtil.waitForServerToStart(2, 10_000);
}
/**
 * Sends {@link #NUMBER_MESSAGES} messages to a topic on DC1 while two durable subscribers consume
 * them on DC1 and the mirror target (DC2) is forcibly restarted every {@link #KILL_INTERNAL} messages.
 * <p>
 * Afterwards the test verifies that the mirror SNF queues and both subscription queues are empty
 * on both brokers, that neither background consumer reported a failure, and that the number of
 * duplicate-ID records left in DC2's journal after compacting does not exceed 1000.
 *
 * @throws Exception on any unexpected failure
 */
@Test
public void testInterruptedMirrorTransfer() throws Exception {
   // Validate the tunables before spending time creating and starting the brokers.
   Assert.assertTrue("KILL_INTERVAL must be greater than SEND_COMMIT", KILL_INTERNAL > SEND_COMMIT);

   createRealServers(true);
   startServers();

   String clientIDA = "nodeA";
   String clientIDB = "nodeB";
   String subscriptionID = "my-order";
   String snfQueue = "$ACTIVEMQ_ARTEMIS_MIRROR_mirror";

   ConnectionFactory connectionFactoryDC1A = CFUtil.createConnectionFactory("amqp", DC1_URI);

   // Consuming zero messages just creates the two durable subscriptions before anything is sent.
   consume(connectionFactoryDC1A, clientIDA, subscriptionID, 0, 0, false, false, RECEIVE_COMMIT);
   consume(connectionFactoryDC1A, clientIDB, subscriptionID, 0, 0, false, false, RECEIVE_COMMIT);

   SimpleManagement managementDC1 = new SimpleManagement(DC1_URI, null, null);
   SimpleManagement managementDC2 = new SimpleManagement(DC2_URI, null, null);
   runAfter(() -> managementDC1.close());
   runAfter(() -> managementDC2.close());

   Wait.assertEquals(0, () -> getCount(managementDC1, clientIDA + "." + subscriptionID));
   Wait.assertEquals(0, () -> getCount(managementDC2, clientIDA + "." + subscriptionID));
   Wait.assertEquals(0, () -> getCount(managementDC1, clientIDB + "." + subscriptionID));
   Wait.assertEquals(0, () -> getCount(managementDC2, clientIDB + "." + subscriptionID));

   ExecutorService executorService = Executors.newFixedThreadPool(3);
   runAfter(executorService::shutdownNow);

   // Failures on the consumer threads are counted so they can fail the test from the main thread.
   // Throwable is caught on purpose: the JUnit assertions inside consume() throw AssertionError,
   // which a catch of Exception would miss.
   AtomicInteger consumerFailures = new AtomicInteger();
   for (String clientID : new String[] {clientIDA, clientIDB}) {
      executorService.execute(() -> {
         try {
            consume(connectionFactoryDC1A, clientID, subscriptionID, 0, NUMBER_MESSAGES, true, false, RECEIVE_COMMIT);
         } catch (Throwable e) {
            consumerFailures.incrementAndGet();
            logger.warn(e.getMessage(), e);
         }
      });
   }

   // Restarts run one at a time, in submission order, on the shared pool.
   OrderedExecutor restartExecutor = new OrderedExecutor(executorService);
   AtomicBoolean running = new AtomicBoolean(true);
   runAfter(() -> running.set(false));

   try (Connection connection = connectionFactoryDC1A.createConnection()) {
      Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
      MessageProducer producer = session.createProducer(session.createTopic(TOPIC_NAME));
      for (int i = 0; i < NUMBER_MESSAGES; i++) {
         TextMessage message = session.createTextMessage(body);
         message.setIntProperty("i", i);
         message.setBooleanProperty("large", false);
         producer.send(message);
         if (i > 0 && i % SEND_COMMIT == 0) {
            logger.info("Sent {} messages", i);
            session.commit();
         }
         if (i > 0 && i % KILL_INTERNAL == 0) {
            restartExecutor.execute(() -> {
               // Restarts still queued once the producer is done are skipped.
               if (!running.get()) {
                  return;
               }
               try {
                  restartTargetServer();
               } catch (Exception e) {
                  logger.warn(e.getMessage(), e);
               }
            });
         }
      }
      session.commit();
      running.set(false);
   }

   Wait.assertEquals(0, () -> getCount(managementDC1, snfQueue), SNF_TIMEOUT);
   Wait.assertEquals(0, () -> getCount(managementDC2, snfQueue), SNF_TIMEOUT);
   Wait.assertEquals(0, () -> getCount(managementDC1, clientIDA + "." + subscriptionID), GENERAL_WAIT_TIMEOUT);
   Wait.assertEquals(0, () -> getCount(managementDC1, clientIDB + "." + subscriptionID), GENERAL_WAIT_TIMEOUT);
   Wait.assertEquals(0, () -> getCount(managementDC2, clientIDA + "." + subscriptionID), GENERAL_WAIT_TIMEOUT);
   Wait.assertEquals(0, () -> getCount(managementDC2, clientIDB + "." + subscriptionID), GENERAL_WAIT_TIMEOUT);

   // Let the consumers and any in-flight restart finish before inspecting their outcome and before
   // the brokers are destroyed, so no background task races with the journal inspection.
   executorService.shutdown();
   Assert.assertTrue("background consumers / restart tasks did not finish in time", executorService.awaitTermination(GENERAL_WAIT_TIMEOUT, TimeUnit.MILLISECONDS));
   Assert.assertEquals("background consumers reported failures, check the log for details", 0, consumerFailures.get());

   destroyServers();

   assertDuplicateRecordsBounded();
}

/**
 * Forcibly stops the DC2 process, if any, and starts it again, waiting for it to come up.
 */
private void restartTargetServer() throws Exception {
   logger.info("Restarting target server (DC2)");
   // Read the volatile field once so a concurrent destroyServers() cannot null it between check and use.
   final Process current = processDC2;
   if (current != null) {
      current.destroyForcibly();
      current.waitFor(1, TimeUnit.MINUTES);
      processDC2 = null;
   }
   processDC2 = startServer(DC2_NODE, 2, 10_000, new File(getServerLocation(DC2_NODE), "broker.properties"));
}

/**
 * Opens DC2's data directories with an embedded server, compacts the journal and asserts that at
 * most 1000 duplicate-ID records remain.
 */
private void assertDuplicateRecordsBounded() throws Exception {
   // counting the number of records on duplicate cache
   // to validate if ARTEMIS-4765 is fixed
   ActiveMQServer server = createServer(true, false);
   server.getConfiguration().setJournalDirectory(getServerLocation(DC2_NODE) + "/data/journal");
   server.getConfiguration().setBindingsDirectory(getServerLocation(DC2_NODE) + "/data/bindings");
   server.getConfiguration().setPagingDirectory(getServerLocation(DC2_NODE) + "/data/paging");
   server.start();
   server.getStorageManager().getMessageJournal().scheduleCompactAndBlock(10_000);

   HashMap<Integer, AtomicInteger> records = countJournal(server.getConfiguration());
   AtomicInteger duplicateRecordsCount = records.get((int) JournalRecordIds.DUPLICATE_ID);
   Assert.assertNotNull(duplicateRecordsCount);
   // 1000 credits by default
   Assert.assertTrue(duplicateRecordsCount.get() <= 1000);
}
/**
 * Receives {@code numberOfMessages} messages from a durable subscription on {@link #TOPIC_NAME}
 * using a transacted session, checking that the int property {@code "i"} of each message matches
 * the expected sequence starting at {@code start}.
 * <p>
 * With {@code numberOfMessages == 0} nothing is received; the call only creates the durable subscription.
 *
 * @param factory          factory used to create the connection
 * @param clientID         client ID set on the connection
 * @param subscriptionID   name of the durable subscription
 * @param start            expected value of {@code "i"} on the first message
 * @param numberOfMessages number of messages to receive
 * @param expectEmpty      when true, asserts that no further message is immediately available afterwards
 * @param assertBody       not used by this method
 * @param batchCommit      the session is committed once this many messages are pending
 * @throws Exception on JMS failures; assertion errors are raised when a message does not arrive
 *                   within 10 seconds or when any message arrived out of sequence
 */
private static void consume(ConnectionFactory factory,
                            String clientID,
                            String subscriptionID,
                            int start,
                            int numberOfMessages,
                            boolean expectEmpty,
                            boolean assertBody,
                            int batchCommit) throws Exception {
   try (Connection connection = factory.createConnection()) {
      connection.setClientID(clientID);
      final Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
      final Topic topic = session.createTopic(TOPIC_NAME);
      connection.start();
      final MessageConsumer consumer = session.createDurableConsumer(topic, subscriptionID);

      final int end = start + numberOfMessages;
      boolean outOfSequence = false;
      int uncommitted = 0;

      for (int expected = start; expected < end; expected++) {
         final TextMessage received = (TextMessage) consumer.receive(10_000);
         Assert.assertNotNull(received);

         final int sequence = received.getIntProperty("i");
         final boolean large = received.getBooleanProperty("large");
         logger.debug("Received message {}, large={}", sequence, large);

         // A mismatch is only recorded here; the assertion happens after the final commit.
         if (sequence != expected) {
            outOfSequence = true;
            logger.warn("Expected message {} but got {}", expected, sequence);
         }
         logger.debug("Consumed {}, large={}", expected, large);

         uncommitted++;
         if (uncommitted < batchCommit) {
            continue;
         }

         // The batch is complete: optionally simulate processing time, then commit it.
         if (CONSUMER_PROCESSING_TIME > 0) {
            Thread.sleep(CONSUMER_PROCESSING_TIME);
         }
         logger.info("received {}", expected);
         session.commit();
         uncommitted = 0;
      }

      // Commit whatever is left from an incomplete batch.
      session.commit();
      Assert.assertFalse(outOfSequence);

      if (expectEmpty) {
         Assert.assertNull(consumer.receiveNoWait());
      }
   }
}
/**
 * Returns the message count of a queue as reported through the given management client.
 * <p>
 * Any exception raised by the query is logged and reported as {@code -1} instead of being
 * propagated, so callers polling the value keep retrying.
 *
 * @param simpleManagement management client of the broker to query
 * @param queue            name of the queue
 * @return the message count, or {@code -1} if the query failed
 */
public long getCount(SimpleManagement simpleManagement, String queue) throws Exception {
   try {
      return simpleManagement.getMessageCountOnQueue(queue);
   } catch (Exception e) {
      logger.warn(e.getMessage(), e);
   }
   return -1;
}
}