blob: cf02d9214b63709a020cc0e19e22a08819b09a37 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.soak.retention;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.io.File;
import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
import org.apache.activemq.artemis.api.core.management.SimpleManagement;
import org.apache.activemq.artemis.tests.soak.SoakTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.artemis.utils.cli.helper.HelperCreate;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
// The server used by this test has the journal retention configured.
// The server should not enter into a deadlock state just because retention is being used.
// The focus of this test is to make sure all messages are sent and received normally
public class LargeMessageRetentionTest extends SoakTestBase {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final String JMX_SERVER_HOSTNAME = "localhost";
private static final int JMX_SERVER_PORT_0 = 1099;
static String liveURI = "service:jmx:rmi:///jndi/rmi://" + JMX_SERVER_HOSTNAME + ":" + JMX_SERVER_PORT_0 + "/jmxrmi";
static ObjectNameBuilder nameBuilder = ObjectNameBuilder.create(ActiveMQDefaultConfiguration.getDefaultJmxDomain(), "replay", true);
public static final String SERVER_NAME_0 = "replay/large-message";
@BeforeClass
public static void createServers() throws Exception {
{
File serverLocation = getFileServerLocation(SERVER_NAME_0);
deleteDirectory(serverLocation);
HelperCreate cliCreateServer = new HelperCreate();
cliCreateServer.setRole("amq").setUser("artemis").setPassword("artemis").setAllowAnonymous(true).setNoWeb(false);
cliCreateServer.setAllowAnonymous(true).setNoWeb(true).setArtemisInstance(serverLocation);
cliCreateServer.setConfiguration("./src/main/resources/servers/replay/large-message");
cliCreateServer.setArgs("--java-options", "-Djava.rmi.server.hostname=localhost", "--journal-retention", "1", "--queues", "RetentionTest", "--name", "large-message");
cliCreateServer.createServer();
}
}
@Before
public void before() throws Exception {
cleanupData(SERVER_NAME_0);
startServer(SERVER_NAME_0, 0, 30000);
disableCheckThread();
}
@Test
public void testRetentionOpenWire() throws Throwable {
testRetention("OPENWIRE", 100, 10, 200 * 1024, 10);
}
@Test
public void testRetentionAMQP() throws Throwable {
testRetention("AMQP", 100, 10, 50 * 1024, 10);
}
@Test
public void testRetentionAMQPRealLarge() throws Throwable {
testRetention("AMQP", 100, 10, 300 * 1024, 10);
}
// in this case messages are not really > min-large-message-size, but they will be converted because of the journal small buffer size
@Test
public void testRetentionCore() throws Throwable {
testRetention("CORE", 100, 10, 50 * 1024, 10);
}
// in this case the messages are actually large
@Test
public void testRetentionCoreRealLarge() throws Throwable {
testRetention("CORE", 100, 10, 300 * 1024, 10);
}
private void testRetention(String protocol, int NUMBER_OF_MESSAGES, int backlog, int bodySize, int producers) throws Throwable {
Assert.assertTrue(NUMBER_OF_MESSAGES % producers == 0); // checking that it is a multiple
ActiveMQServerControl serverControl = getServerControl(liveURI, nameBuilder, 5000);
final Semaphore consumerCredits = new Semaphore(-backlog);
final String queueName = "RetentionTest";
final AtomicInteger errors = new AtomicInteger(0);
final CountDownLatch latchReceiver = new CountDownLatch(1);
final CountDownLatch latchSender = new CountDownLatch(producers);
String bufferStr;
{
StringBuffer buffer = new StringBuffer();
for (int i = 0; i < bodySize; i++) {
buffer.append("*");
}
bufferStr = RandomUtil.randomString() + buffer;
}
ConnectionFactory factory = CFUtil.createConnectionFactory(protocol, "tcp://localhost:61616");
ExecutorService executor = Executors.newFixedThreadPool(1 * producers);
runAfter(executor::shutdownNow);
executor.execute(() -> {
try (Connection consumerConnection = factory.createConnection()) {
HashMap<Integer, AtomicInteger> messageSequences = new HashMap<>();
Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = consumerSession.createQueue(queueName);
consumerConnection.start();
MessageConsumer consumer = consumerSession.createConsumer(queue);
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
logger.debug("Acquiring semop at {}", i);
Assert.assertTrue(consumerCredits.tryAcquire(1, TimeUnit.MINUTES));
TextMessage message = (TextMessage) consumer.receive(60_000);
Assert.assertNotNull(message);
int producerI = message.getIntProperty("producerI");
AtomicInteger messageSequence = messageSequences.get(producerI);
if (messageSequence == null) {
messageSequence = new AtomicInteger(0);
messageSequences.put(producerI, messageSequence);
}
Assert.assertEquals(messageSequence.getAndIncrement(), message.getIntProperty("messageI"));
logger.info("Received message {}", i);
Assert.assertEquals(bufferStr, message.getText());
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
errors.incrementAndGet();
} finally {
latchReceiver.countDown();
}
});
for (int producerID = 0; producerID < producers; producerID++) {
int theProducerID = producerID; // to be used within the executor's inner method
executor.execute(() -> {
try (Connection producerConnection = factory.createConnection()) {
Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = producerSession.createQueue(queueName);
MessageProducer producer = producerSession.createProducer(queue);
for (int messageI = 0; messageI < NUMBER_OF_MESSAGES / producers; messageI++) {
logger.info("Sending message {} from producerID", messageI, theProducerID);
Message message = producerSession.createTextMessage(bufferStr);
message.setIntProperty("messageI", messageI);
message.setIntProperty("producerI", theProducerID);
producer.send(message);
consumerCredits.release();
}
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
errors.incrementAndGet();
} finally {
latchSender.countDown();
}
});
}
Assert.assertTrue(latchSender.await(10, TimeUnit.MINUTES));
consumerCredits.release(backlog);
Assert.assertTrue(latchReceiver.await(10, TimeUnit.MINUTES));
Assert.assertEquals(0, errors.get());
try (Connection consumerConnection = factory.createConnection()) {
HashMap<Integer, AtomicInteger> messageSequences = new HashMap<>();
Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = consumerSession.createQueue(queueName);
consumerConnection.start();
MessageConsumer consumer = consumerSession.createConsumer(queue);
Assert.assertNull(consumer.receiveNoWait());
serverControl.replay(queueName, queueName, "producerI=0 AND messageI>=0 AND messageI<10");
SimpleManagement simpleManagement = new SimpleManagement("tcp://localhost:61616", null, null);
Wait.assertEquals(10, () -> simpleManagement.getMessageCountOnQueue(queueName), 5000);
for (int i = 0; i < 10; i++) {
TextMessage message = (TextMessage) consumer.receive(300_000);
Assert.assertNotNull(message);
logger.info("Received replay message {}", i);
Assert.assertEquals(0, message.getIntProperty("producerI"));
Assert.assertEquals(i, message.getIntProperty("messageI"));
Assert.assertEquals(bufferStr, message.getText());
}
Assert.assertNull(consumer.receiveNoWait());
}
}
}