blob: c5c84648c31f8f165d6167a4e4aab027816c35c1 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.smoke.brokerConnection;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import org.apache.activemq.artemis.tests.smoke.common.SmokeTestBase;
import org.apache.activemq.artemis.tests.smoke.common.ContainerService;
import org.jboss.logging.Logger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* You need to build the Artemis Centos image before this test is executed.
* Follow the instructions under artemis-docker and build the Docker-centos image.
*/
public class QpidDispatchPeerTest extends SmokeTestBase {

   private static final Logger logger = Logger.getLogger(QpidDispatchPeerTest.class);

   /** Host directory exposed to the broker container as its home and to the router container as /routerlog. */
   private static final String QDR_HOME = basedir + "/target/brokerConnect/qdr";

   /** Port exposed on the Qpid Dispatch router container; producers connect here. */
   private static final int ROUTER_PORT = 5672;

   /** Port exposed on the Artemis broker container; consumers connect here. */
   private static final int BROKER_PORT = 61616;

   /** URI options appended to every AMQP connection factory created by this test. */
   private static final String AMQP_OPTIONS = "?amqpIdleTimeout=1000";

   /** How long a consumer waits for each individual message, in milliseconds. */
   private static final long RECEIVE_TIMEOUT_MS = 5000;

   /** Number of connection attempts made against the router before giving up. */
   private static final int CONNECT_ATTEMPTS = 100;

   /** Pause between two connection attempts, in milliseconds. */
   private static final long CONNECT_RETRY_DELAY_MS = 10;

   static Object network;
   static Object qpidServer;
   static Object artemisServer;
   static ContainerService service = ContainerService.getService();

   @Before
   public void disableThreadcheck() {
      disableCheckThread();
   }

   /**
    * Creates the container network, then configures and starts the router and broker containers.
    * The router is started first; the broker directory is recreated right before the broker starts.
    *
    * @throws Exception if any container cannot be configured or started
    */
   @BeforeClass
   public static void startServers() throws Exception {
      ValidateContainer.assumeArtemisContainer();

      Assert.assertNotNull(basedir);

      network = service.newNetwork();

      artemisServer = service.newBrokerImage();
      service.setNetwork(artemisServer, network);
      service.exposePorts(artemisServer, BROKER_PORT);
      service.prepareInstance(QDR_HOME);
      service.exposeBrokerHome(artemisServer, QDR_HOME);
      service.startLogging(artemisServer, "ArtemisServer:");

      qpidServer = service.newInterconnectImage();
      service.setNetwork(qpidServer, network);
      service.exposePorts(qpidServer, ROUTER_PORT);
      service.exposeHosts(qpidServer, "qdr");
      service.exposeFile(qpidServer, basedir + "/src/main/resources/servers/brokerConnect/qdr/qdrouterd.conf", "/tmp/qdrouterd.conf");
      service.exposeFolder(qpidServer, QDR_HOME, "/routerlog");
      service.startLogging(qpidServer, "qpid-dispatch:");
      service.start(qpidServer);

      recreateBrokerDirectory(QDR_HOME);

      service.start(artemisServer);
   }

   /**
    * Stops both containers. Each container is only stopped if it was actually created, because
    * JUnit runs this method even when {@link #startServers()} was skipped or failed part-way.
    * The router is stopped even if stopping the broker throws.
    */
   @AfterClass
   public static void stopServer() {
      try {
         if (artemisServer != null) {
            service.stop(artemisServer);
         }
      } finally {
         if (qpidServer != null) {
            service.stop(qpidServer);
         }
      }
   }

   /**
    * For five destinations in turn, sends 100 persistent messages through the router and
    * then receives 100 messages from the same queue name on the broker.
    *
    * @throws Exception on any JMS failure
    */
   @Test
   public void testSendReceive() throws Exception {
      int numberOfMessages = 100;

      for (int dest = 0; dest < 5; dest++) {
         String queueName = "queue.test" + dest;
         sendMessages(queueName, numberOfMessages);
         receiveMessages(queueName, numberOfMessages);
      }
   }

   /**
    * Sends 100 persistent messages to "queue.dist" through the router and receives 100 messages
    * from "queue.dist::distinct.dist" on the broker.
    * NOTE(review): the consumer name looks like an address::queue fully-qualified queue name;
    * confirm against the broker configuration used by this test.
    *
    * @throws Exception on any JMS failure
    */
   @Test
   public void testSendReceiveDistinct() throws Exception {
      int numberOfMessages = 100;

      sendMessages("queue.dist", numberOfMessages);
      receiveMessages("queue.dist::distinct.dist", numberOfMessages);
   }

   /**
    * Connects to the router container and sends {@code numberOfMessages} persistent text
    * messages ("hello 0", "hello 1", ...) to the given queue. The connection is always closed,
    * including when a send fails.
    *
    * @param queueName        name of the queue to produce to
    * @param numberOfMessages how many messages to send
    * @throws Exception on any JMS failure
    */
   private void sendMessages(String queueName, int numberOfMessages) throws Exception {
      ConnectionFactory factoryProducer = service.createCF(qpidServer, "amqp", ROUTER_PORT, AMQP_OPTIONS);

      try (Connection connection = createConnectionDumbRetry(factoryProducer)) {
         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
         Queue queue = session.createQueue(queueName);
         MessageProducer producer = session.createProducer(queue);
         producer.setDeliveryMode(DeliveryMode.PERSISTENT);

         for (int i = 0; i < numberOfMessages; i++) {
            logger.debug("Sending " + i);
            producer.send(session.createTextMessage("hello " + i));
         }
      }
   }

   /**
    * Connects to the broker container as artemis/artemis and asserts that
    * {@code numberOfMessages} messages arrive on the given queue, waiting up to
    * {@link #RECEIVE_TIMEOUT_MS} for each one. The connection is always closed, including
    * when an assertion fails.
    *
    * @param queueName        name of the queue to consume from
    * @param numberOfMessages how many messages are expected
    * @throws Exception on any JMS failure
    */
   private void receiveMessages(String queueName, int numberOfMessages) throws Exception {
      ConnectionFactory factoryConsumer = service.createCF(artemisServer, "amqp", BROKER_PORT, AMQP_OPTIONS);

      try (Connection connectionConsumer = factoryConsumer.createConnection("artemis", "artemis")) {
         Session sessionConsumer = connectionConsumer.createSession(false, Session.AUTO_ACKNOWLEDGE);
         Queue queueConsumer = sessionConsumer.createQueue(queueName);
         MessageConsumer consumer = sessionConsumer.createConsumer(queueConsumer);
         connectionConsumer.start();

         for (int i = 0; i < numberOfMessages; i++) {
            Message message = consumer.receive(RECEIVE_TIMEOUT_MS);
            Assert.assertNotNull("Message " + i + " of " + numberOfMessages + " was not received from " + queueName, message);
         }
      }
   }

   /**
    * Tries to open a connection up to {@link #CONNECT_ATTEMPTS} times, sleeping
    * {@link #CONNECT_RETRY_DELAY_MS} between attempts.
    *
    * @param factoryProducer factory used to create the connection
    * @return an open connection, never {@code null}
    * @throws InterruptedException  if the thread is interrupted while waiting between attempts
    * @throws IllegalStateException if every attempt failed; the last failure is attached as the cause
    */
   private Connection createConnectionDumbRetry(ConnectionFactory factoryProducer) throws InterruptedException {
      Exception lastFailure = null;

      for (int attempt = 1; attempt <= CONNECT_ATTEMPTS; attempt++) {
         try {
            return factoryProducer.createConnection();
         } catch (Exception e) {
            // Keep the failure so the caller sees the real cause if no attempt succeeds.
            lastFailure = e;
            logger.debug("Connection attempt " + attempt + " of " + CONNECT_ATTEMPTS + " failed, retrying", e);
            Thread.sleep(CONNECT_RETRY_DELAY_MS);
         }
      }

      // Fail here with the cause instead of returning null and letting the caller hit an NPE.
      throw new IllegalStateException("Could not create a connection after " + CONNECT_ATTEMPTS + " attempts", lastFailure);
   }
}