blob: 50b7e2e23bd933038deedd57b75406603c3f6362 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.amqp;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import org.apache.activemq.broker.jmx.BrokerView;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.ConnectorViewMBean;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.transport.amqp.joram.ActiveMQAdmin;
import org.apache.activemq.util.Wait;
import org.junit.Ignore;
import org.junit.Test;
import org.objectweb.jtests.jms.framework.TestConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class JMSClientTest extends JMSClientTestSupport {
// SLF4J logger shared by the tests in this class.
protected static final Logger LOG = LoggerFactory.getLogger(JMSClientTest.class);
// java.util.logging logger obtained under the name "FRM"; presumably the channel the
// client uses for AMQP frame tracing — TODO confirm against the client library.
protected java.util.logging.Logger frameLoggger = java.util.logging.Logger.getLogger("FRM");
/**
 * Sends one text message to a queue, walks the queue with a {@link QueueBrowser}
 * and then consumes the message, checking that everything seen is a
 * {@link TextMessage}.
 */
@SuppressWarnings("rawtypes")
@Test(timeout=30000)
public void testProducerConsume() throws Exception {
    ActiveMQAdmin.enableJMSFrameTracing();
    connection = createConnection();

    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Queue destination = session.createQueue(getDestinationName());

    MessageProducer sender = session.createProducer(destination);
    TextMessage outbound = session.createTextMessage();
    outbound.setText("hello");
    sender.send(outbound);

    // Inspect the queue first; the message is still expected to reach the consumer below.
    QueueBrowser browser = session.createBrowser(destination);
    for (Enumeration browsed = browser.getEnumeration(); browsed.hasMoreElements();) {
        Message seen = (Message) browsed.nextElement();
        assertTrue(seen instanceof TextMessage);
    }

    MessageConsumer receiver = session.createConsumer(destination);
    Message delivered = receiver.receive(TestConfig.TIMEOUT);
    assertNotNull(delivered);
    assertTrue(delivered instanceof TextMessage);
}
/**
 * Uses a producer created without a fixed destination to send the same message
 * to two queues, then consumes one text message from each queue.
 */
@Test(timeout=30000)
public void testAnonymousProducerConsume() throws Exception {
    ActiveMQAdmin.enableJMSFrameTracing();
    connection = createConnection();

    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Queue firstQueue = session.createQueue(getDestinationName() + "1");
    Queue secondQueue = session.createQueue(getDestinationName() + "2");
    Queue[] targets = { firstQueue, secondQueue };

    // A producer created with a null destination is given the destination on each send.
    MessageProducer anonymousProducer = session.createProducer(null);
    TextMessage outbound = session.createTextMessage();
    outbound.setText("hello");
    for (Queue target : targets) {
        anonymousProducer.send(target, outbound);
    }

    // Each queue must hand back one text message; consumers are closed one at a time.
    for (Queue target : targets) {
        MessageConsumer receiver = session.createConsumer(target);
        Message delivered = receiver.receive(TestConfig.TIMEOUT);
        assertNotNull(delivered);
        assertTrue(delivered instanceof TextMessage);
        receiver.close();
    }
}
/**
 * Receives a message inside a transacted session and checks, through the queue
 * MBean, that the queue size only drops to zero once the session is committed.
 */
@Test(timeout=30*1000)
public void testTransactedConsumer() throws Exception {
    ActiveMQAdmin.enableJMSFrameTracing();
    final int expectedOnQueue = 1;

    connection = createConnection();
    Session txSession = connection.createSession(true, Session.SESSION_TRANSACTED);
    Queue destination = txSession.createQueue(getDestinationName());
    sendMessages(connection, destination, expectedOnQueue);

    QueueViewMBean queueStats = getProxyToQueue(getDestinationName());
    LOG.info("Queue size after produce is: {}", queueStats.getQueueSize());
    assertEquals(expectedOnQueue, queueStats.getQueueSize());

    MessageConsumer receiver = txSession.createConsumer(destination);
    Message delivered = receiver.receive(TestConfig.TIMEOUT);
    assertNotNull(delivered);
    assertTrue(delivered instanceof TextMessage);

    // Received but not yet committed: the queue size must be unchanged.
    LOG.info("Queue size before session commit is: {}", queueStats.getQueueSize());
    assertEquals(expectedOnQueue, queueStats.getQueueSize());

    txSession.commit();

    LOG.info("Queue size after session commit is: {}", queueStats.getQueueSize());
    assertEquals(0, queueStats.getQueueSize());
}
/**
 * Receives a message in a transacted session, rolls back, and receives it again.
 * The first delivery must not carry the JMSRedelivered flag, the delivery after the
 * rollback must, and the queue must only become empty after the final commit.
 */
@Test(timeout=30000)
public void testRollbackRececeivedMessage() throws Exception {
    ActiveMQAdmin.enableJMSFrameTracing();
    final int msgCount = 1;
    connection = createConnection();
    Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
    Queue queue = session.createQueue(getDestinationName());
    sendMessages(connection, queue, msgCount);

    QueueViewMBean queueView = getProxyToQueue(getDestinationName());
    LOG.info("Queue size after produce is: {}", queueView.getQueueSize());
    assertEquals(msgCount, queueView.getQueueSize());

    MessageConsumer consumer = session.createConsumer(queue);

    // First receive, then roll back: this delivery must not show redelivered.
    Message msg = consumer.receive(TestConfig.TIMEOUT);
    LOG.info("Test received msg: {}", msg);
    assertNotNull(msg);
    assertTrue(msg instanceof TextMessage);
    assertFalse("First delivery should not be marked redelivered", msg.getJMSRedelivered());
    session.rollback();

    // Second receive, after the rollback: this delivery must show redelivered.
    msg = consumer.receive(TestConfig.TIMEOUT);
    assertNotNull(msg);
    assertTrue(msg instanceof TextMessage);
    assertTrue("Delivery after rollback should be marked redelivered", msg.getJMSRedelivered());

    // Still uncommitted, so the message must remain on the queue.
    LOG.info("Queue size before session commit is: {}", queueView.getQueueSize());
    assertEquals(msgCount, queueView.getQueueSize());

    session.commit();

    LOG.info("Queue size after session commit is: {}", queueView.getQueueSize());
    assertEquals(0, queueView.getQueueSize());
    session.close();
}
/**
 * Consumes part of a batch in a transacted session, rolls back, then consumes the
 * whole batch and commits. Every message number must be seen after the rollback
 * and the queue must be empty after the commit.
 */
@Test(timeout = 60000)
public void testRollbackSomeThenReceiveAndCommit() throws Exception {
    int messagesSent = 5;
    int receivedBeforeRollback = 2;

    connection = createConnection();
    Session txSession = connection.createSession(true, Session.SESSION_TRANSACTED);
    Queue destination = txSession.createQueue(getDestinationName());
    sendMessages(connection, destination, messagesSent);

    QueueViewMBean queueStats = getProxyToQueue(name.getMethodName());
    assertEquals(messagesSent, queueStats.getQueueSize());

    MessageConsumer receiver = txSession.createConsumer(destination);

    // The first deliveries are expected in send order, numbered from 1.
    int expectedNumber = 1;
    while (expectedNumber <= receivedBeforeRollback) {
        Message delivered = receiver.receive(1000);
        assertNotNull(delivered);
        assertEquals("Unexpected message number", expectedNumber, delivered.getIntProperty(AmqpTestSupport.MESSAGE_NUMBER));
        expectedNumber++;
    }

    txSession.rollback();
    assertEquals(messagesSent, queueStats.getQueueSize());

    // After the rollback only the set of numbers is checked, not their order.
    Set<Integer> outstanding = new HashSet<Integer>();
    for (int number = 1; number <= messagesSent; number++) {
        outstanding.add(number);
    }
    for (int received = 0; received < messagesSent; received++) {
        Message redelivered = receiver.receive(1000);
        assertNotNull(redelivered);
        int number = redelivered.getIntProperty(AmqpTestSupport.MESSAGE_NUMBER);
        outstanding.remove(number);
    }

    txSession.commit();

    assertTrue("Did not consume all expected messages, missing messages: " + outstanding, outstanding.isEmpty());
    assertEquals("Queue should have no messages left after commit", 0, queueStats.getQueueSize());
}
/**
 * Sends 300 messages, consumes all of them inside a single transaction and
 * verifies through the queue MBean that the queue is empty after the commit.
 */
@Test(timeout=60000)
public void testTXConsumerAndLargeNumberOfMessages() throws Exception {
    ActiveMQAdmin.enableJMSFrameTracing();
    final int msgCount = 300;
    connection = createConnection();
    Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
    Queue queue = session.createQueue(getDestinationName());
    sendMessages(connection, queue, msgCount);

    QueueViewMBean queueView = getProxyToQueue(getDestinationName());
    LOG.info("Queue size after produce is: {}", queueView.getQueueSize());
    assertEquals(msgCount, queueView.getQueueSize());

    // Consume all in TX and commit.
    MessageConsumer consumer = session.createConsumer(queue);
    for (int i = 0; i < msgCount; ++i) {
        // Progress marker once per hundred messages.
        if ((i % 100) == 0) {
            LOG.info("Attempting receive of Message #{}", i);
        }
        Message msg = consumer.receive(TestConfig.TIMEOUT);
        assertNotNull("Should receive message: " + i, msg);
        assertTrue(msg instanceof TextMessage);
    }
    session.commit();
    consumer.close();
    session.close();

    // Reported after the consume and commit, so the message says so.
    LOG.info("Queue size after consume and commit is: {}", queueView.getQueueSize());
    assertEquals(0, queueView.getQueueSize());
}
/**
 * Sends two persistent messages with priorities 5 and 9, confirms both are visible
 * to a browser, then consumes with the selector {@code JMSPriority > 8} and expects
 * the priority 9 message.
 */
@SuppressWarnings("rawtypes")
@Test(timeout=30000)
public void testSelectors() throws Exception{
    ActiveMQAdmin.enableJMSFrameTracing();
    connection = createConnection();

    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Queue destination = session.createQueue(getDestinationName());
    MessageProducer sender = session.createProducer(destination);

    TextMessage lowPriority = session.createTextMessage();
    lowPriority.setText("hello");
    sender.send(lowPriority, DeliveryMode.PERSISTENT, 5, 0);

    TextMessage highPriority = session.createTextMessage();
    highPriority.setText("hello + 9");
    sender.send(highPriority, DeliveryMode.PERSISTENT, 9, 0);

    // Both messages must be on the queue before the selector is applied.
    QueueBrowser browser = session.createBrowser(destination);
    int browsedCount = 0;
    for (Enumeration browsed = browser.getEnumeration(); browsed.hasMoreElements(); browsedCount++) {
        Message seen = (Message) browsed.nextElement();
        assertTrue(seen instanceof TextMessage);
    }
    assertEquals(2, browsedCount);

    MessageConsumer selective = session.createConsumer(destination, "JMSPriority > 8");
    Message delivered = selective.receive(TestConfig.TIMEOUT);
    assertNotNull(delivered);
    assertTrue(delivered instanceof TextMessage);
    assertEquals("hello + 9", ((TextMessage) delivered).getText());
}
/**
 * Sends one message without a JMSType and one with JMSType {@code myJMSType},
 * confirms both are visible to a browser, then consumes with a JMSType selector
 * and expects the typed message.
 */
@SuppressWarnings("rawtypes")
@Test(timeout=30000)
public void testSelectorsWithJMSType() throws Exception{
    ActiveMQAdmin.enableJMSFrameTracing();
    connection = createConnection();

    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Queue destination = session.createQueue(getDestinationName());
    MessageProducer sender = session.createProducer(destination);

    TextMessage untyped = session.createTextMessage();
    untyped.setText("text");
    sender.send(untyped, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);

    TextMessage typed = session.createTextMessage();
    String type = "myJMSType";
    typed.setJMSType(type);
    typed.setText("text + type");
    sender.send(typed, DeliveryMode.NON_PERSISTENT, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);

    // Both messages must be on the queue before the selector is applied.
    QueueBrowser browser = session.createBrowser(destination);
    int browsedCount = 0;
    for (Enumeration browsed = browser.getEnumeration(); browsed.hasMoreElements(); browsedCount++) {
        Message seen = (Message) browsed.nextElement();
        assertTrue(seen instanceof TextMessage);
    }
    assertEquals(2, browsedCount);

    String selector = "JMSType = '" + type + "'";
    MessageConsumer selective = session.createConsumer(destination, selector);
    Message delivered = selective.receive(TestConfig.TIMEOUT);
    assertNotNull(delivered);
    assertTrue(delivered instanceof TextMessage);
    assertEquals("Unexpected JMSType value", type, delivered.getJMSType());
    assertEquals("Unexpected message content", "text + type", ((TextMessage) delivered).getText());
}
/**
 * A {@link Runnable} that reports failure by storing a description in {@link #msg}.
 * <p>
 * {@link #passed()} is synchronized on this instance. The anonymous subclasses in
 * this file also declare {@code run()} as synchronized, so for those subclasses
 * {@code passed()} cannot complete while {@code run()} still holds the monitor.
 */
abstract class Testable implements Runnable {
    /** Failure description; {@code null} means no failure has been recorded. */
    protected String msg;

    /**
     * Fails the calling test with {@link #msg} if a failure was recorded.
     *
     * @return {@code true} when no failure was recorded
     */
    synchronized boolean passed() {
        if (msg == null) {
            return true;
        }
        fail(msg);
        return true;
    }
}
/**
 * Starts a background thread that keeps sending a persistent message, stops the
 * broker once the first send has been signalled, and expects a later send to throw.
 * <p>
 * Any {@link Exception} from the send loop counts as success; the failure text
 * mentions IllegalStateException but the catch clause does not check the type.
 */
@Test(timeout=30000)
public void testProducerThrowsWhenBrokerStops() throws Exception {
connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(getDestinationName());
connection.start();
final MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
final Message m = session.createTextMessage("Sample text");
Testable t = new Testable() {
@Override
// synchronized on the Testable instance, the same monitor passed() uses
public synchronized void run() {
try {
// up to 30 sends, 100 ms apart
for (int i = 0; i < 30; ++i) {
producer.send(m);
// signal the test thread that a send has completed
synchronized (producer) {
producer.notifyAll();
}
TimeUnit.MILLISECONDS.sleep(100);
}
// reached only if every send succeeded, i.e. no exception was ever thrown
msg = "Should have thrown an IllegalStateException";
} catch (Exception ex) {
LOG.info("Caught exception on send: {}", ex);
}
}
};
// The monitor is held while the thread is started, so the worker cannot notify
// before this thread is inside wait().
synchronized(producer) {
new Thread(t).start();
// wait until we know that the producer was able to send a message
// NOTE(review): a timeout or spurious wakeup is not distinguished from a real notify.
producer.wait(10000);
}
stopBroker();
// passed() needs the monitor that run() holds, so it waits for the send loop to exit
assertTrue(t.passed());
}
/**
 * Starts a background thread that repeatedly creates producers, stops the broker
 * once the first creation has been signalled, and expects a later creation to throw.
 * <p>
 * Any {@link Exception} from the loop counts as success; the catch clause does not
 * check for IllegalStateException specifically.
 */
@Test(timeout=30000)
public void testProducerCreateThrowsWhenBrokerStops() throws Exception {
connection = createConnection();
final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Queue queue = session.createQueue(getDestinationName());
connection.start();
Testable t = new Testable() {
@Override
// synchronized on the Testable instance, the same monitor passed() uses
public synchronized void run() {
try {
// up to 10 creations, one second apart
for (int i = 0; i < 10; ++i) {
MessageProducer producer = session.createProducer(queue);
// signal the test thread that a producer has been created
synchronized (session) {
session.notifyAll();
}
if (producer == null) {
// recorded as a failure; the loop keeps running afterwards
msg = "Producer should not be null";
}
TimeUnit.SECONDS.sleep(1);
}
// reached only if no creation ever threw; replaces any earlier failure text
msg = "Should have thrown an IllegalStateException";
} catch (Exception ex) {
LOG.info("Caught exception on create producer: {}", ex);
}
}
};
// The monitor is held while the thread is started, so the worker cannot notify
// before this thread is inside wait().
synchronized (session) {
new Thread(t).start();
// NOTE(review): a timeout or spurious wakeup is not distinguished from a real notify.
session.wait(10000);
}
stopBroker();
// passed() needs the monitor that run() holds, so it waits for the loop to exit
assertTrue(t.passed());
}
/**
 * Sends one persistent message, stops the broker, and expects creating a consumer
 * on the same session to throw. Any {@link Exception} is accepted.
 */
@Test(timeout=30000)
public void testConsumerCreateThrowsWhenBrokerStops() throws Exception {
    connection = createConnection();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Queue destination = session.createQueue(getDestinationName());
    connection.start();

    MessageProducer sender = session.createProducer(destination);
    sender.setDeliveryMode(DeliveryMode.PERSISTENT);
    sender.send(session.createTextMessage("Sample text"));

    stopBroker();

    try {
        session.createConsumer(destination);
        // fail() throws an Error, so the catch below does not intercept it.
        fail("Should have thrown an IllegalStateException");
    } catch (Exception expected) {
        LOG.info("Caught exception on consumer create: {}", expected);
    }
}
/**
 * Starts a background thread that polls a consumer with receiveNoWait(), stops the
 * broker once the first poll has been signalled, and expects a later poll to throw.
 * <p>
 * Any {@link Exception} from the loop counts as success; the catch clause does not
 * check for IllegalStateException specifically.
 */
@Test(timeout=30000)
public void testConsumerReceiveNoWaitThrowsWhenBrokerStops() throws Exception {
connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(getDestinationName());
connection.start();
final MessageConsumer consumer=session.createConsumer(queue);
Testable t = new Testable() {
@Override
// synchronized on the Testable instance, the same monitor passed() uses
public synchronized void run() {
try {
for (int i = 0; i < 10; ++i) {
// the returned message, if any, is discarded; only a throw matters here
consumer.receiveNoWait();
// signal the test thread that a poll has completed
synchronized (consumer) {
consumer.notifyAll();
}
// pause grows by 100 ms per iteration: 1000, 1100, ... 1900 ms (14.5 s in total)
TimeUnit.MILLISECONDS.sleep(1000 + (i * 100));
}
// reached only if no poll ever threw
msg = "Should have thrown an IllegalStateException";
} catch (Exception ex) {
LOG.info("Caught exception on receiveNoWait: {}", ex);
}
}
};
// The monitor is held while the thread is started, so the worker cannot notify
// before this thread is inside wait().
synchronized (consumer) {
new Thread(t).start();
// NOTE(review): a timeout or spurious wakeup is not distinguished from a real notify.
consumer.wait(10000);
}
stopBroker();
// passed() needs the monitor that run() holds, so it waits for the loop to exit
assertTrue(t.passed());
}
/**
 * Starts a background thread that calls timed receive() on a consumer, stops the
 * broker once the first receive has been signalled, and expects a later receive
 * to throw.
 * <p>
 * Any {@link Exception} from the loop counts as success; the catch clause does not
 * check for IllegalStateException specifically.
 */
@Test(timeout=30000)
public void testConsumerReceiveTimedThrowsWhenBrokerStops() throws Exception {
connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(getDestinationName());
connection.start();
final MessageConsumer consumer=session.createConsumer(queue);
Testable t = new Testable() {
@Override
// synchronized on the Testable instance, the same monitor passed() uses
public synchronized void run() {
try {
for (int i = 0; i < 10; ++i) {
// timeout grows per iteration: 100, 1100, ... 9100 ms. The ten timeouts add up
// to 46 s, more than the 30 s test timeout, so if nothing throws the test times
// out before the failure text below is set.
consumer.receive(100 + (i * 1000));
// signal the test thread that a receive has returned
synchronized (consumer) {
consumer.notifyAll();
}
}
msg = "Should have thrown an IllegalStateException";
} catch (Exception ex) {
// the logged text says receive(1000) although the timeout actually varies
LOG.info("Caught exception on receive(1000): {}", ex);
}
}
};
// The monitor is held while the thread is started, so the worker cannot notify
// before this thread is inside wait().
synchronized (consumer) {
new Thread(t).start();
// NOTE(review): a timeout or spurious wakeup is not distinguished from a real notify.
consumer.wait(10000);
// NOTE(review): no thread visible in this method waits on this monitor, so this
// notifyAll() looks like a no-op — confirm before removing.
consumer.notifyAll();
}
stopBroker();
// passed() needs the monitor that run() holds, so it waits for the loop to exit
assertTrue(t.passed());
}
/**
 * Starts a background thread that blocks in receive() with no timeout, stops the
 * broker, and expects the blocked call to come back with {@code null}. An
 * exception from receive() is logged and also accepted.
 */
@Test(timeout=30000)
public void testConsumerReceiveReturnsBrokerStops() throws Exception {
connection = createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(getDestinationName());
connection.start();
final MessageConsumer consumer=session.createConsumer(queue);
Testable t = new Testable() {
@Override
// synchronized on the Testable instance, the same monitor passed() uses
public synchronized void run() {
try {
// short first receive; nothing is sent in this test, so null is expected
Message m = consumer.receive(1);
synchronized (consumer) {
// signal the test thread that the consumer has been exercised once
consumer.notifyAll();
if (m != null) {
msg = "Should have returned null";
return;
}
}
// blocks with no timeout; the broker is stopped while this call is pending
m = consumer.receive();
if (m != null) {
msg = "Should have returned null";
}
} catch (Exception ex) {
LOG.info("Caught exception on receive(): {}", ex);
}
}
};
// The monitor is held while the thread is started, so the worker cannot notify
// before this thread is inside wait().
synchronized (consumer) {
new Thread(t).start();
// NOTE(review): a timeout or spurious wakeup is not distinguished from a real notify.
consumer.wait(10000);
}
stopBroker();
// passed() needs the monitor that run() holds, so it waits for receive() to return
assertTrue(t.passed());
}
/**
 * Sends one persistent message, restarts the broker, and checks that closing the
 * client connection afterwards neither throws nor exceeds the test timeout.
 */
@Test(timeout=30000)
public void testBrokerRestartWontHangConnectionClose() throws Exception {
    connection = createConnection();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Queue destination = session.createQueue(getDestinationName());
    connection.start();

    MessageProducer sender = session.createProducer(destination);
    sender.setDeliveryMode(DeliveryMode.PERSISTENT);
    sender.send(session.createTextMessage("Sample text"));

    restartBroker();

    try {
        connection.close();
    } catch (Exception unexpected) {
        LOG.error("Should not thrown on disconnected connection close(): {}", unexpected);
        fail("Should not have thrown an exception.");
    }
}
/**
 * Sends 1000 numbered text messages to a queue and consumes them, expecting them
 * back in send order with nothing left on the queue afterwards.
 */
@Test(timeout=30 * 1000)
public void testProduceAndConsumeLargeNumbersOfMessages() throws Exception {
    int total = 1000;

    connection = createConnection();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Queue destination = session.createQueue(getDestinationName());
    connection.start();

    MessageProducer sender = session.createProducer(destination);
    for (int sequence = 0; sequence < total; sequence++) {
        sender.send(session.createTextMessage("Test-Message:" + sequence));
    }

    MessageConsumer receiver = session.createConsumer(destination);
    for (int sequence = 0; sequence < total; sequence++) {
        Message delivered = receiver.receive(5000);
        assertNotNull(delivered);
        String body = ((TextMessage) delivered).getText();
        assertEquals("Test-Message:" + sequence, body);
    }

    // Nothing beyond the messages sent above may be delivered.
    assertNull(receiver.receiveNoWait());
}
/**
 * Sends one persistent message over a connection obtained from
 * {@code createConnection(true)} and checks that it can be received.
 */
@Test(timeout=30000)
public void testSyncSends() throws Exception {
    ActiveMQAdmin.enableJMSFrameTracing();
    connection = createConnection(true);

    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Queue destination = session.createQueue(getDestinationName());
    connection.start();

    MessageProducer sender = session.createProducer(destination);
    sender.setDeliveryMode(DeliveryMode.PERSISTENT);
    Message outbound = session.createTextMessage("Sample text");
    sender.send(outbound);

    MessageConsumer receiver = session.createConsumer(destination);
    Message delivered = receiver.receive(5000);
    assertNotNull(delivered);
}
/**
 * Registers a message listener on a durable topic subscriber, publishes one
 * persistent text message and waits for the listener to be called with it.
 */
@Test(timeout=30000)
public void testDurableConsumerAsync() throws Exception {
    ActiveMQAdmin.enableJMSFrameTracing();
    final CountDownLatch delivered = new CountDownLatch(1);
    final AtomicReference<Message> captured = new AtomicReference<Message>();

    String durableClientId = getDestinationName() + "-ClientId";
    connection = createConnection(durableClientId);

    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic topic = session.createTopic(getDestinationName());
    MessageConsumer subscriber = session.createDurableSubscriber(topic, "DurbaleTopic");

    // Capture the message before releasing the latch so the test thread sees it.
    MessageListener capturingListener = new MessageListener() {
        @Override
        public void onMessage(Message message) {
            captured.set(message);
            delivered.countDown();
        }
    };
    subscriber.setMessageListener(capturingListener);

    MessageProducer publisher = session.createProducer(topic);
    publisher.setDeliveryMode(DeliveryMode.PERSISTENT);
    connection.start();

    TextMessage outbound = session.createTextMessage();
    outbound.setText("hello");
    publisher.send(outbound);

    assertTrue(delivered.await(10, TimeUnit.SECONDS));
    assertNotNull("Should have received a message by now.", captured.get());
    assertTrue("Should be an instance of TextMessage", captured.get() instanceof TextMessage);
}
/**
 * Publishes one persistent text message to a topic with a durable subscriber and
 * polls the subscriber with receiveNoWait() until the message arrives, for at
 * most 25 seconds.
 */
@Test(timeout=30000)
public void testDurableConsumerSync() throws Exception {
    ActiveMQAdmin.enableJMSFrameTracing();
    String durableClientId = getDestinationName() + "-ClientId";
    connection = createConnection(durableClientId);

    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic topic = session.createTopic(getDestinationName());
    final MessageConsumer subscriber = session.createDurableSubscriber(topic, "DurbaleTopic");
    MessageProducer publisher = session.createProducer(topic);
    publisher.setDeliveryMode(DeliveryMode.PERSISTENT);
    connection.start();

    TextMessage outbound = session.createTextMessage();
    outbound.setText("hello");
    publisher.send(outbound);

    final AtomicReference<Message> captured = new AtomicReference<Message>();
    // Each poll stores its result, so the last non-null poll is what gets asserted on.
    Wait.Condition messageArrived = new Wait.Condition() {
        @Override
        public boolean isSatisified() throws Exception {
            Message polled = subscriber.receiveNoWait();
            captured.set(polled);
            return polled != null;
        }
    };
    long maxWait = TimeUnit.SECONDS.toMillis(25);
    long pollInterval = TimeUnit.MILLISECONDS.toMillis(200);
    assertTrue(Wait.waitFor(messageArrived, maxWait, pollInterval));

    assertNotNull("Should have received a message by now.", captured.get());
    assertTrue("Should be an instance of TextMessage", captured.get() instanceof TextMessage);
}
/**
 * Registers a message listener on a non-durable topic consumer, publishes one
 * persistent text message, waits for the listener to be called with it, and
 * closes the connection.
 */
@Test(timeout=30000)
public void testTopicConsumerAsync() throws Exception {
    ActiveMQAdmin.enableJMSFrameTracing();
    final CountDownLatch delivered = new CountDownLatch(1);
    final AtomicReference<Message> captured = new AtomicReference<Message>();

    connection = createConnection();

    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic topic = session.createTopic(getDestinationName());
    MessageConsumer subscriber = session.createConsumer(topic);

    // Capture the message before releasing the latch so the test thread sees it.
    MessageListener capturingListener = new MessageListener() {
        @Override
        public void onMessage(Message message) {
            captured.set(message);
            delivered.countDown();
        }
    };
    subscriber.setMessageListener(capturingListener);

    MessageProducer publisher = session.createProducer(topic);
    publisher.setDeliveryMode(DeliveryMode.PERSISTENT);
    connection.start();

    TextMessage outbound = session.createTextMessage();
    outbound.setText("hello");
    publisher.send(outbound);

    assertTrue(delivered.await(10, TimeUnit.SECONDS));
    assertNotNull("Should have received a message by now.", captured.get());
    assertTrue("Should be an instance of TextMessage", captured.get() instanceof TextMessage);

    connection.close();
}
/**
 * Publishes one persistent text message to a topic with a non-durable consumer and
 * polls the consumer with receiveNoWait() until the message arrives, using the
 * default limits of {@code Wait.waitFor}.
 */
@Test(timeout=45000)
public void testTopicConsumerSync() throws Exception {
    ActiveMQAdmin.enableJMSFrameTracing();
    connection = createConnection();

    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic topic = session.createTopic(getDestinationName());
    final MessageConsumer subscriber = session.createConsumer(topic);
    MessageProducer publisher = session.createProducer(topic);
    publisher.setDeliveryMode(DeliveryMode.PERSISTENT);
    connection.start();

    TextMessage outbound = session.createTextMessage();
    outbound.setText("hello");
    publisher.send(outbound);

    final AtomicReference<Message> captured = new AtomicReference<Message>();
    // Each poll stores its result, so the last non-null poll is what gets asserted on.
    Wait.Condition messageArrived = new Wait.Condition() {
        @Override
        public boolean isSatisified() throws Exception {
            Message polled = subscriber.receiveNoWait();
            captured.set(polled);
            return polled != null;
        }
    };
    assertTrue(Wait.waitFor(messageArrived));

    assertNotNull("Should have received a message by now.", captured.get());
    assertTrue("Should be an instance of TextMessage", captured.get() instanceof TextMessage);
}
/**
 * Opens ten connections, closes them all, and waits until the connector MBean
 * named by {@link #getTargetConnectorName()} reports zero connections.
 */
@Test(timeout=30000)
public void testConnectionsAreClosed() throws Exception {
    ActiveMQAdmin.enableJMSFrameTracing();
    final ConnectorViewMBean connector = getProxyToConnectionView(getTargetConnectorName());
    LOG.info("Current number of Connections is: {}", connector.connectionCount());

    ArrayList<Connection> opened = new ArrayList<Connection>();
    while (opened.size() < 10) {
        opened.add(createConnection(null));
    }
    LOG.info("Current number of Connections is: {}", connector.connectionCount());

    for (Connection each : opened) {
        each.close();
    }

    // The count is polled rather than read once after the closes.
    Wait.Condition noneLeft = new Wait.Condition() {
        @Override
        public boolean isSatisified() throws Exception {
            LOG.info("Current number of Connections is: {}", connector.connectionCount());
            return connector.connectionCount() == 0;
        }
    };
    assertTrue("Should have no connections left.", Wait.waitFor(noneLeft));
}
/**
 * Names the connector whose MBean {@code testConnectionsAreClosed} inspects.
 * Protected, so a subclass can supply a different connector name.
 *
 * @return the connector name, {@code "amqp"} in this class
 */
protected String getTargetConnectorName() {
    final String connectorName = "amqp";
    return connectorName;
}
/**
 * Installs an {@link ExceptionListener} on a started connection, stops the broker,
 * and expects the listener to be invoked within 15 seconds.
 */
@Test(timeout=30000)
public void testExecptionListenerCalledOnBrokerStop() throws Exception {
    ActiveMQAdmin.enableJMSFrameTracing();
    connection = createConnection();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    connection.start();

    final CountDownLatch listenerFired = new CountDownLatch(1);
    ExceptionListener recordingListener = new ExceptionListener() {
        @Override
        public void onException(JMSException exception) {
            LOG.info("Exception listener called: ", exception);
            listenerFired.countDown();
        }
    };
    connection.setExceptionListener(recordingListener);

    // Creating a producer on a temporary queue makes sure the connection is
    // completely up and connected before the broker is taken down.
    Destination scratch = session.createTemporaryQueue();
    MessageProducer sender = session.createProducer(scratch);
    assertNotNull(sender);

    stopBroker();

    assertTrue("No exception listener event fired.", listenerFired.await(15, TimeUnit.SECONDS));
}
/**
 * Sends ten messages in a transacted session and checks, through the queue MBean,
 * that the queue is empty before the commit and holds all ten messages after it.
 */
@Test(timeout=30000)
public void testSessionTransactedCommit() throws Exception {
    ActiveMQAdmin.enableJMSFrameTracing();
    connection = createConnection();
    Session txSession = connection.createSession(true, Session.SESSION_TRANSACTED);
    Queue destination = txSession.createQueue(getDestinationName());
    connection.start();

    // Produce inside the transaction.
    MessageProducer sender = txSession.createProducer(destination);
    for (int sequence = 0; sequence < 10; sequence++) {
        sender.send(txSession.createTextMessage("TestMessage" + sequence));
    }

    // Nothing has been committed yet, so the queue must still be empty.
    QueueViewMBean queueStats = getProxyToQueue(getDestinationName());
    assertEquals(0, queueStats.getQueueSize());

    txSession.commit();

    // The commit makes all ten messages visible on the queue.
    assertEquals(10, queueStats.getQueueSize());
    txSession.close();
}
/**
 * Sends ten messages in a transacted session, rolls the session back, and checks
 * that nothing reached the queue: the queue MBean reports size zero and a consumer
 * receives nothing.
 */
@Test(timeout=30000)
public void testSessionTransactedRollback() throws Exception {
    ActiveMQAdmin.enableJMSFrameTracing();
    connection = createConnection();
    // Same mode constant as the other transacted tests in this class; the
    // acknowledge mode is not meaningful once the session is transacted.
    Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
    Queue queue = session.createQueue(getDestinationName());
    connection.start();

    // transacted producer
    MessageProducer pr = session.createProducer(queue);
    for (int i = 0; i < 10; i++) {
        Message m = session.createTextMessage("TestMessage" + i);
        pr.send(m);
    }
    session.rollback();

    MessageConsumer consumer = session.createConsumer(queue);

    // The sends were rolled back, so no message should have been dispatched.
    QueueViewMBean queueView = getProxyToQueue(getDestinationName());
    assertEquals(0, queueView.getQueueSize());
    assertNull(consumer.receive(100));

    consumer.close();
    session.close();
}
/**
 * Builds a string of the requested length by repeating the digits
 * {@code 1234567890}.
 * <p>
 * Every character is an ASCII digit, so the character count equals the byte count
 * in any ASCII-compatible encoding. The debug line therefore reports the length
 * directly instead of encoding the whole string just to measure it.
 *
 * @param sizeInBytes number of characters to generate; zero or negative yields
 *        an empty string
 * @return the generated string
 */
private String createLargeString(int sizeInBytes) {
    final String digits = "1234567890";
    StringBuilder builder = new StringBuilder(Math.max(sizeInBytes, 0));
    for (int i = 0; i < sizeInBytes; i++) {
        builder.append(digits.charAt(i % digits.length()));
    }
    // Materialise once; the same instance is logged about and returned.
    String result = builder.toString();
    LOG.debug("Created string with size : {} bytes", result.length());
    return result;
}
/**
 * Sends a text message whose body is 1024 * 1024 characters long and checks that
 * the received body has the same length and content.
 */
@Test(timeout = 30 * 1000)
public void testSendLargeMessage() throws JMSException, InterruptedException {
    connection = createConnection();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    String queueName = getDestinationName();
    Queue destination = session.createQueue(queueName);
    MessageProducer sender = session.createProducer(destination);

    int messageSize = 1024 * 1024;
    String sentText = createLargeString(messageSize);
    Message outbound = session.createTextMessage(sentText);
    LOG.debug("Sending message of {} bytes on queue {}", messageSize, queueName);
    sender.send(outbound);

    MessageConsumer receiver = session.createConsumer(destination);
    // No receive timeout here; the @Test timeout is what bounds the wait.
    Message delivered = receiver.receive();
    assertNotNull(delivered);
    assertTrue(delivered instanceof TextMessage);

    String receivedText = ((TextMessage) delivered).getText();
    LOG.debug(">>>> Received message of length {}", receivedText.length());
    assertEquals(messageSize, receivedText.length());
    assertEquals(sentText, receivedText);
}
/**
 * Creates a durable topic subscriber on its own connection, closes that
 * connection, and checks the broker's subscriber counts relative to the counts
 * taken at the start of the test. After creation there must be one more active
 * durable subscriber and no more inactive ones. After the close there must be no
 * more active ones and one more inactive one.
 */
@Test(timeout=30*1000)
public void testDurableTopicStateAfterSubscriberClosed() throws Exception {
    String durableClientId = getDestinationName() + "-ClientId";
    String durableSubscriberName = getDestinationName() + "-SubscriptionName";

    // Baseline counts; every later assertion is on the difference from these.
    BrokerView adminView = this.brokerService.getAdminView();
    int durableSubscribersAtStart = adminView.getDurableTopicSubscribers().length;
    int inactiveSubscribersAtStart = adminView.getInactiveDurableTopicSubscribers().length;
    LOG.debug(">>>> At Start, durable Subscribers {} inactiveDurableSubscribers {}", durableSubscribersAtStart, inactiveSubscribersAtStart);

    TopicConnection subscriberConnection =
        JMSClientContext.INSTANCE.createTopicConnection(getBrokerURI(), "admin", "password");
    subscriberConnection.setClientID(durableClientId);
    TopicSession subscriberSession = subscriberConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic topic = subscriberSession.createTopic(getDestinationName());
    TopicSubscriber messageConsumer = subscriberSession.createDurableSubscriber(topic, durableSubscriberName);
    assertNotNull(messageConsumer);

    int durableSubscribers = adminView.getDurableTopicSubscribers().length;
    int inactiveSubscribers = adminView.getInactiveDurableTopicSubscribers().length;
    LOG.debug(">>>> durable Subscribers after creation {} inactiveDurableSubscribers {}", durableSubscribers, inactiveSubscribers);
    assertEquals("Wrong number of durable subscribers after first subscription", 1, (durableSubscribers - durableSubscribersAtStart));
    assertEquals("Wrong number of inactive durable subscribers after first subscription", 0, (inactiveSubscribers - inactiveSubscribersAtStart));

    subscriberConnection.close();

    durableSubscribers = adminView.getDurableTopicSubscribers().length;
    inactiveSubscribers = adminView.getInactiveDurableTopicSubscribers().length;
    LOG.debug(">>>> durable Subscribers after close {} inactiveDurableSubscribers {}", durableSubscribers, inactiveSubscribers);
    // Compare the post-close count with the baseline, like the other three checks.
    assertEquals("Wrong number of durable subscribers after close", 0, (durableSubscribers - durableSubscribersAtStart));
    assertEquals("Wrong number of inactive durable subscribers after close", 1, (inactiveSubscribers - inactiveSubscribersAtStart));
}
/**
 * Follows a durable subscription through its states as reported by the broker
 * MBean: active once created, inactive once the consumer is closed, and gone
 * once the subscription is unsubscribed.
 */
@Test(timeout=30000)
public void testDurableConsumerUnsubscribe() throws Exception {
    ActiveMQAdmin.enableJMSFrameTracing();
    String durableClientId = getDestinationName() + "-ClientId";
    final BrokerViewMBean broker = getProxyToBroker();

    connection = createConnection(durableClientId);
    connection.start();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic topic = session.createTopic(getDestinationName());
    MessageConsumer subscriber = session.createDurableSubscriber(topic, "DurbaleTopic");

    final long maxWait = TimeUnit.SECONDS.toMillis(20);
    final long pollInterval = TimeUnit.MILLISECONDS.toMillis(250);

    // State 1: one active subscription, none inactive.
    Wait.Condition subscriptionActive = new Wait.Condition() {
        @Override
        public boolean isSatisified() throws Exception {
            if (broker.getInactiveDurableTopicSubscribers().length != 0) {
                return false;
            }
            return broker.getDurableTopicSubscribers().length == 1;
        }
    };
    assertTrue(Wait.waitFor(subscriptionActive, maxWait, pollInterval));

    subscriber.close();

    // State 2: the subscription remains but is now inactive.
    Wait.Condition subscriptionInactive = new Wait.Condition() {
        @Override
        public boolean isSatisified() throws Exception {
            if (broker.getInactiveDurableTopicSubscribers().length != 1) {
                return false;
            }
            return broker.getDurableTopicSubscribers().length == 0;
        }
    };
    assertTrue(Wait.waitFor(subscriptionInactive, maxWait, pollInterval));

    session.unsubscribe("DurbaleTopic");

    // State 3: no subscription of either kind is left.
    Wait.Condition subscriptionGone = new Wait.Condition() {
        @Override
        public boolean isSatisified() throws Exception {
            if (broker.getInactiveDurableTopicSubscribers().length != 0) {
                return false;
            }
            return broker.getDurableTopicSubscribers().length == 0;
        }
    };
    assertTrue(Wait.waitFor(subscriptionGone, maxWait, pollInterval));
}
/**
 * Checks that unsubscribing a durable subscription name that was never created
 * throws a {@link JMSException}. The broker is first confirmed to have no durable
 * subscribers at all, active or inactive.
 */
@Test(timeout=30000)
public void testDurableConsumerUnsubscribeWhileNoSubscription() throws Exception {
    ActiveMQAdmin.enableJMSFrameTracing();
    final BrokerViewMBean broker = getProxyToBroker();
    connection = createConnection();
    connection.start();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

    assertTrue(Wait.waitFor(new Wait.Condition() {
        @Override
        public boolean isSatisified() throws Exception {
            return broker.getInactiveDurableTopicSubscribers().length == 0 &&
                   broker.getDurableTopicSubscribers().length == 0;
        }
    }, TimeUnit.SECONDS.toMillis(20), TimeUnit.MILLISECONDS.toMillis(250)));

    try {
        session.unsubscribe("DurbaleTopic");
        // The failure text names the real condition: there is nothing to unsubscribe.
        fail("Should have thrown as no such subscription exists.");
    } catch (JMSException expected) {
        // Expected outcome; logged so the broker's reason shows up in the test output.
        LOG.info("Caught expected exception on unsubscribe of unknown subscription: {}", expected.getMessage());
    }
}
/**
 * Verifies that an unsubscribe request is rejected with a {@link JMSException}
 * while the durable subscriber for that subscription is still open.
 *
 * @throws Exception if the connection, session or broker view cannot be used.
 */
@Test(timeout=30000)
public void testDurableConsumerUnsubscribeWhileActive() throws Exception {
    ActiveMQAdmin.enableJMSFrameTracing();

    final String clientId = getDestinationName() + "-ClientId";
    final BrokerViewMBean broker = getProxyToBroker();

    connection = createConnection(clientId);
    connection.start();

    final String subscriptionName = "DurbaleTopic";

    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic topic = session.createTopic(getDestinationName());
    // The subscriber is deliberately left open for the rest of the test.
    session.createDurableSubscriber(topic, subscriptionName);

    boolean subscriptionActive = Wait.waitFor(new Wait.Condition() {
        @Override
        public boolean isSatisified() throws Exception {
            int inactive = broker.getInactiveDurableTopicSubscribers().length;
            return inactive == 0 && broker.getDurableTopicSubscribers().length == 1;
        }
    }, TimeUnit.SECONDS.toMillis(20), TimeUnit.MILLISECONDS.toMillis(250));
    assertTrue(subscriptionActive);

    try {
        session.unsubscribe(subscriptionName);
        fail("Should have thrown as subscription is in use.");
    } catch (JMSException expected) {
        // Expected outcome: the subscription still has an open subscriber.
    }
}
/**
 * Sends 99 persistent text messages to a queue and then reads from it with two
 * consumers opened one after the other. Each pass delegates to
 * {@code receiveMessages}, which asserts that no received message is flagged
 * as redelivered.
 *
 * @throws Exception if any JMS operation fails.
 */
@Test(timeout=30000)
public void testRedeliveredHeader() throws Exception {
    connection = createConnection();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Queue queue = session.createQueue(getDestinationName());
    connection.start();

    MessageProducer sender = session.createProducer(queue);
    sender.setDeliveryMode(DeliveryMode.PERSISTENT);

    int sequence = 1;
    while (sequence < 100) {
        sender.send(session.createTextMessage(sequence + ". Sample text"));
        sequence++;
    }

    // Two consecutive consumers on the same queue, each closed before the next opens.
    for (int pass = 0; pass < 2; pass++) {
        MessageConsumer reader = session.createConsumer(queue);
        receiveMessages(reader);
        reader.close();
    }
}
/**
 * Creates a temporary queue through the session and checks that the returned
 * destination is a {@link TemporaryQueue} and that the broker view lists
 * exactly one temporary queue afterwards.
 *
 * @throws Exception if the connection, session or broker view cannot be used.
 */
@Test(timeout=30000)
public void testCreateTemporaryQueue() throws Exception {
    ActiveMQAdmin.enableJMSFrameTracing();

    connection = createConnection();

    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Queue created = session.createTemporaryQueue();
    assertNotNull(created);
    assertTrue(created instanceof TemporaryQueue);

    final BrokerViewMBean brokerView = getProxyToBroker();
    int temporaryQueueCount = brokerView.getTemporaryQueues().length;
    assertEquals(1, temporaryQueueCount);
}
/**
 * Creates a temporary queue, deletes it through {@link TemporaryQueue#delete()}
 * and waits for the broker view to report that no temporary queue remains.
 *
 * The wait limit is kept below the 30 second JUnit timeout so that a failure is
 * reported through the assertion message instead of as a generic test timeout.
 *
 * NOTE(review): the ignore reason mentions TemporaryTopics although this test
 * covers a temporary queue - confirm the wording is intended.
 *
 * @throws Exception if the connection, session or broker view cannot be used.
 */
@Ignore("Legacy QPid client does not support creation of TemporaryTopics correctly")
@Test(timeout=30000)
public void testDeleteTemporaryQueue() throws Exception {
    ActiveMQAdmin.enableJMSFrameTracing();

    connection = createConnection();

    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Queue queue = session.createTemporaryQueue();
    assertNotNull(queue);
    assertTrue(queue instanceof TemporaryQueue);

    final BrokerViewMBean broker = getProxyToBroker();
    assertEquals(1, broker.getTemporaryQueues().length);

    TemporaryQueue tempQueue = (TemporaryQueue) queue;
    tempQueue.delete();

    // 20 seconds leaves headroom under the 30 second test timeout.
    assertTrue("Temp Queue should be deleted.", Wait.waitFor(new Wait.Condition() {
        @Override
        public boolean isSatisified() throws Exception {
            return broker.getTemporaryQueues().length == 0;
        }
    }, TimeUnit.SECONDS.toMillis(20), TimeUnit.MILLISECONDS.toMillis(50)));
}
/**
 * Creates a temporary topic through the session and checks that the returned
 * destination is a {@link TemporaryTopic} and that the broker view lists
 * exactly one temporary topic afterwards.
 *
 * @throws Exception if the connection, session or broker view cannot be used.
 */
@Test(timeout=30000)
public void testCreateTemporaryTopic() throws Exception {
    ActiveMQAdmin.enableJMSFrameTracing();

    connection = createConnection();

    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic created = session.createTemporaryTopic();
    assertNotNull(created);
    assertTrue(created instanceof TemporaryTopic);

    final BrokerViewMBean brokerView = getProxyToBroker();
    int temporaryTopicCount = brokerView.getTemporaryTopics().length;
    assertEquals(1, temporaryTopicCount);
}
/**
 * Creates a temporary topic, deletes it through {@link TemporaryTopic#delete()}
 * and waits for the broker view to report that no temporary topic remains.
 *
 * The wait limit is kept below the 30 second JUnit timeout so that a failure is
 * reported through the assertion message instead of as a generic test timeout.
 *
 * @throws Exception if the connection, session or broker view cannot be used.
 */
@Test(timeout=30000)
public void testDeleteTemporaryTopic() throws Exception {
    ActiveMQAdmin.enableJMSFrameTracing();

    connection = createConnection();

    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic topic = session.createTemporaryTopic();
    assertNotNull(topic);
    assertTrue(topic instanceof TemporaryTopic);

    final BrokerViewMBean broker = getProxyToBroker();
    assertEquals(1, broker.getTemporaryTopics().length);

    TemporaryTopic tempTopic = (TemporaryTopic) topic;
    tempTopic.delete();

    // 20 seconds leaves headroom under the 30 second test timeout.
    assertTrue("Temp Topic should be deleted.", Wait.waitFor(new Wait.Condition() {
        @Override
        public boolean isSatisified() throws Exception {
            return broker.getTemporaryTopics().length == 0;
        }
    }, TimeUnit.SECONDS.toMillis(20), TimeUnit.MILLISECONDS.toMillis(50)));
}
/**
 * Receives ten messages from the given consumer, allowing up to 1000 ms for
 * each, and asserts that every one arrived and is not flagged as redelivered.
 *
 * @param consumer the consumer to read from.
 * @throws Exception if receiving a message or reading its header fails.
 */
protected void receiveMessages(MessageConsumer consumer) throws Exception {
    int remaining = 10;
    while (remaining-- > 0) {
        Message received = consumer.receive(1000);
        assertNotNull(received);
        assertFalse(received.getJMSRedelivered());
    }
}
}