blob: 319ce3fd828df69e66fd8af639565f3471b930f2 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.qpid.jms.producer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import javax.jms.CompletionListener;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.apache.qpid.jms.JmsConnectionTestSupport;
import org.apache.qpid.jms.JmsDestination;
import org.apache.qpid.jms.JmsMessageProducer;
import org.apache.qpid.jms.JmsQueue;
import org.apache.qpid.jms.JmsSession;
import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
import org.apache.qpid.jms.provider.mock.MockRemotePeer;
import org.apache.qpid.jms.test.Wait;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Test basic functionality around JmsConnection
*/
public class JmsMessageProducerTest extends JmsConnectionTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(JmsMessageProducerTest.class);
private final MyCompletionListener completionListener = new MyCompletionListener();
private JmsSession session;
private final MockRemotePeer remotePeer = new MockRemotePeer();
@Override
@Before
public void setUp() throws Exception {
super.setUp();
remotePeer.start();
connection = createConnectionToMockProvider();
session = (JmsSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
}
@Override
@After
public void tearDown() throws Exception {
remotePeer.shutdown();
super.tearDown();
}
@Test(timeout = 10000)
public void testMultipleCloseCallsNoErrors() throws Exception {
MessageProducer producer = session.createProducer(null);
producer.close();
producer.close();
}
@Test(timeout = 10000)
public void testCreateProducerWithNullDestination() throws Exception {
MessageProducer producer = session.createProducer(null);
assertNull(producer.getDestination());
}
@Test(timeout = 10000)
public void testGetDisableMessageID() throws Exception {
MessageProducer producer = session.createProducer(null);
assertFalse(producer.getDisableMessageID());
producer.setDisableMessageID(true);
assertTrue(producer.getDisableMessageID());
}
@Test(timeout = 10000)
public void testGetDisableTimeStamp() throws Exception {
MessageProducer producer = session.createProducer(null);
assertFalse(producer.getDisableMessageTimestamp());
producer.setDisableMessageTimestamp(true);
assertTrue(producer.getDisableMessageTimestamp());
}
@Test(timeout = 10000)
public void testPriorityConfiguration() throws Exception {
MessageProducer producer = session.createProducer(null);
assertEquals(Message.DEFAULT_PRIORITY, producer.getPriority());
producer.setPriority(9);
assertEquals(9, producer.getPriority());
}
@Test(timeout = 10000)
public void testTimeToLiveConfiguration() throws Exception {
MessageProducer producer = session.createProducer(null);
assertEquals(Message.DEFAULT_TIME_TO_LIVE, producer.getTimeToLive());
producer.setTimeToLive(1000);
assertEquals(1000, producer.getTimeToLive());
}
@Test(timeout = 10000)
public void testDeliveryModeConfiguration() throws Exception {
MessageProducer producer = session.createProducer(null);
assertEquals(Message.DEFAULT_DELIVERY_MODE, producer.getDeliveryMode());
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
assertEquals(DeliveryMode.NON_PERSISTENT, producer.getDeliveryMode());
}
@Test(timeout = 10000)
public void testDeliveryDelayConfiguration() throws Exception {
MessageProducer producer = session.createProducer(null);
assertEquals(Message.DEFAULT_DELIVERY_DELAY, producer.getDeliveryDelay());
producer.setDeliveryDelay(2000);
assertEquals(2000, producer.getDeliveryDelay());
}
@Test(timeout = 10000)
public void testAnonymousProducerThrowsUOEWhenExplictDestinationNotProvided() throws Exception {
JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(null);
Message message = Mockito.mock(Message.class);
try {
producer.send(message);
fail("Expected exception not thrown");
} catch (UnsupportedOperationException uoe) {
// expected
}
try {
producer.send(message, completionListener);
fail("Expected exception not thrown");
} catch (UnsupportedOperationException uoe) {
// expected
}
try {
producer.send(message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
fail("Expected exception not thrown");
} catch (UnsupportedOperationException uoe) {
// expected
}
try {
producer.send(message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE, completionListener);
fail("Expected exception not thrown");
} catch (UnsupportedOperationException uoe) {
// expected
}
}
@Test(timeout = 10000)
public void testExplicitProducerThrowsUOEWhenExplictDestinationIsProvided() throws Exception {
JmsDestination dest = new JmsQueue("explicitDestination");
JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(dest);
Message message = Mockito.mock(Message.class);
try {
producer.send(dest, message);
fail("Expected exception not thrown");
} catch (UnsupportedOperationException uoe) {
// expected
}
try {
producer.send(dest, message, completionListener);
fail("Expected exception not thrown");
} catch (UnsupportedOperationException uoe) {
// expected
}
try {
producer.send(dest, message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
fail("Expected exception not thrown");
} catch (UnsupportedOperationException uoe) {
// expected
}
try {
producer.send(dest, message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE, completionListener);
fail("Expected exception not thrown");
} catch (UnsupportedOperationException uoe) {
// expected
}
}
@Test(timeout = 10000)
public void testAnonymousDestinationProducerThrowsIDEWhenNullDestinationIsProvided() throws Exception {
JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(null);
Message message = Mockito.mock(Message.class);
try {
producer.send(null, message);
fail("Expected exception not thrown");
} catch (InvalidDestinationException ide) {
// expected
}
try {
producer.send(null, message, completionListener);
fail("Expected exception not thrown");
} catch (InvalidDestinationException ide) {
// expected
}
try {
producer.send(null, message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE);
fail("Expected exception not thrown");
} catch (InvalidDestinationException ide) {
// expected
}
try {
producer.send(null, message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE, completionListener);
fail("Expected exception not thrown");
} catch (InvalidDestinationException ide) {
// expected
}
}
@Test(timeout = 10000)
public void testAnonymousProducerThrowsIAEWhenNullCompletionListenerProvided() throws Exception {
JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(null);
JmsDestination dest = new JmsQueue("explicitDestination");
Message message = Mockito.mock(Message.class);
try {
producer.send(dest, message, null);
fail("Expected exception not thrown");
} catch (IllegalArgumentException iae) {
// expected
}
try {
producer.send(dest, message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE, null);
fail("Expected exception not thrown");
} catch (IllegalArgumentException iae) {
// expected
}
}
@Test(timeout = 10000)
public void testExplicitProducerThrowsIAEWhenNullCompletionListenerIsProvided() throws Exception {
JmsDestination dest = new JmsQueue("explicitDestination");
JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(dest);
Message message = Mockito.mock(Message.class);
try {
producer.send(message, null);
fail("Expected exception not thrown");
} catch (IllegalArgumentException iae) {
// expected
}
try {
producer.send(message, Message.DEFAULT_DELIVERY_MODE, Message.DEFAULT_PRIORITY, Message.DEFAULT_TIME_TO_LIVE, null);
fail("Expected exception not thrown");
} catch (IllegalArgumentException iae) {
// expected
}
}
@Test(timeout = 10000)
public void testInOrderSendAcksCompletionsReturnInOrder() throws Exception {
final int MESSAGE_COUNT = 3;
final MockRemotePeer remotePoor = MockRemotePeer.INSTANCE;
JmsConnectionFactory factory = new JmsConnectionFactory(
"mock://localhost?mock.delayCompletionCalls=true");
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Destination destination = new JmsQueue("explicitDestination");
JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(destination);
final MyCompletionListener listener = new MyCompletionListener();
sendMessages(MESSAGE_COUNT, producer, listener);
assertTrue("Not all sends made it to the remote", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return remotePoor.getPendingCompletions(destination).size() == MESSAGE_COUNT;
}
}));
remotePoor.completeAllPendingSends(destination);
assertTrue("Not all completions triggered", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return listener.getCompletedSends().size() == MESSAGE_COUNT;
}
}));
assertMessageCompletedInOrder(MESSAGE_COUNT, listener);
connection.close();
}
@Test(timeout = 10000)
public void testReversedOrderSendAcksCompletionsReturnInOrder() throws Exception {
final int MESSAGE_COUNT = 3;
final MockRemotePeer remotePoor = MockRemotePeer.INSTANCE;
JmsConnectionFactory factory = new JmsConnectionFactory(
"mock://localhost?mock.delayCompletionCalls=true");
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Destination destination = new JmsQueue("explicitDestination");
JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(destination);
final MyCompletionListener listener = new MyCompletionListener();
sendMessages(MESSAGE_COUNT, producer, listener);
assertTrue("Not all sends made it to the remote", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return remotePoor.getPendingCompletions(destination).size() == MESSAGE_COUNT;
}
}));
List<JmsOutboundMessageDispatch> pending = remotePoor.getPendingCompletions(destination);
assertEquals(MESSAGE_COUNT, pending.size());
Collections.reverse(pending);
for (JmsOutboundMessageDispatch envelope : pending) {
LOG.info("Trigger completion of message: {}", envelope.getMessage().getJMSMessageID());
remotePoor.completePendingSend(envelope);
}
assertTrue("Not all completions triggered", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return listener.getCompletedSends().size() == MESSAGE_COUNT;
}
}));
assertMessageCompletedInOrder(MESSAGE_COUNT, listener);
connection.close();
}
@Test(timeout = 10000)
public void testInOrderSendFailuresCompletionsReturnInOrder() throws Exception {
final int MESSAGE_COUNT = 3;
final MockRemotePeer remotePoor = MockRemotePeer.INSTANCE;
JmsConnectionFactory factory = new JmsConnectionFactory(
"mock://localhost?mock.delayCompletionCalls=true");
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Destination destination = new JmsQueue("explicitDestination");
JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(destination);
final MyCompletionListener listener = new MyCompletionListener();
sendMessages(MESSAGE_COUNT, producer, listener);
assertTrue("Not all messages sent", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return remotePoor.getPendingCompletions(destination).size() == MESSAGE_COUNT;
}
}));
remotePoor.failAllPendingSends(destination, new JMSException("Could not send message"));
assertTrue("Not all completions triggered", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return listener.getFailedSends().size() == MESSAGE_COUNT;
}
}));
assertMessageFailedInOrder(MESSAGE_COUNT, listener);
connection.close();
}
@Test(timeout = 10000)
public void testReversedOrderSendAcksFailuresReturnInOrder() throws Exception {
final int MESSAGE_COUNT = 3;
final MockRemotePeer remotePoor = MockRemotePeer.INSTANCE;
JmsConnectionFactory factory = new JmsConnectionFactory(
"mock://localhost?mock.delayCompletionCalls=true");
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Destination destination = new JmsQueue("explicitDestination");
JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(destination);
final MyCompletionListener listener = new MyCompletionListener();
sendMessages(MESSAGE_COUNT, producer, listener);
assertTrue("Not all sends made it to the remote", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return remotePoor.getPendingCompletions(destination).size() == MESSAGE_COUNT;
}
}));
List<JmsOutboundMessageDispatch> pending = remotePoor.getPendingCompletions(destination);
assertEquals(MESSAGE_COUNT, pending.size());
Collections.reverse(pending);
for (JmsOutboundMessageDispatch envelope : pending) {
LOG.info("Trigger failure of message: {}", envelope.getMessage().getJMSMessageID());
remotePoor.failPendingSend(envelope, new JMSException("Failed to send message"));
}
assertTrue("Not all failures triggered", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return listener.getFailedSends().size() == MESSAGE_COUNT;
}
}));
assertMessageFailedInOrder(MESSAGE_COUNT, listener);
connection.close();
}
@Test(timeout = 10000)
public void testInterleavedCompletionsReturnedInOrder() throws Exception {
final int MESSAGE_COUNT = 3;
final MockRemotePeer remotePoor = MockRemotePeer.INSTANCE;
JmsConnectionFactory factory = new JmsConnectionFactory(
"mock://localhost?mock.delayCompletionCalls=true");
Connection connection = factory.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Destination destination = new JmsQueue("explicitDestination");
JmsMessageProducer producer = (JmsMessageProducer) session.createProducer(destination);
final MyCompletionListener listener = new MyCompletionListener();
sendMessages(MESSAGE_COUNT, producer, listener);
assertTrue("Not all sends made it to the remote", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return remotePoor.getPendingCompletions(destination).size() == MESSAGE_COUNT;
}
}));
List<JmsOutboundMessageDispatch> pending = remotePoor.getPendingCompletions(destination);
assertEquals(MESSAGE_COUNT, pending.size());
Collections.reverse(pending);
for (JmsOutboundMessageDispatch envelope : pending) {
int sequence = envelope.getMessage().getIntProperty("sequence");
if (sequence % 2 == 0) {
LOG.info("Trigger completion of message: {}", envelope.getMessage().getJMSMessageID());
remotePoor.completePendingSend(envelope);
} else {
LOG.info("Trigger failure of message: {}", envelope.getMessage().getJMSMessageID());
remotePoor.failPendingSend(envelope, new JMSException("Failed to send message"));
}
}
assertTrue("Not all completions triggered", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return listener.getCombinedSends().size() == MESSAGE_COUNT;
}
}));
assertTotalCompletionOrder(MESSAGE_COUNT, listener);
connection.close();
}
private void sendMessages(int count, JmsMessageProducer producer, MyCompletionListener listener) throws Exception {
for (int i = 0; i < count; ++i) {
Message message = session.createMessage();
message.setIntProperty("sequence", i);
producer.send(message, listener);
}
}
private void assertMessageCompletedInOrder(int expected, MyCompletionListener listener) throws Exception {
assertEquals("Did not get expected number of completions", expected, listener.completed.size());
for (int i = 0; i < listener.completed.size(); ++i) {
int sequence = listener.completed.get(i).getIntProperty("sequence");
assertEquals("Did not complete expected message: " + i + " got: " + sequence, i, sequence);
}
}
private void assertMessageFailedInOrder(int expected, MyCompletionListener listener) throws Exception {
assertEquals("Did not get expected number of failures", expected, listener.failed.size());
for (int i = 0; i < listener.failed.size(); ++i) {
int sequence = listener.failed.get(i).getIntProperty("sequence");
assertEquals("Did not fail expected message: " + i + " got: " + sequence, i, sequence);
}
}
private void assertTotalCompletionOrder(int expected, MyCompletionListener listener) throws Exception {
assertEquals("Did not get expected number of failures", expected, listener.combinedResult.size());
for (int i = 0; i < listener.combinedResult.size(); ++i) {
int sequence = listener.combinedResult.get(i).getIntProperty("sequence");
assertEquals("Did not fail expected message: " + i + " got: " + sequence, i, sequence);
}
}
private class MyCompletionListener implements CompletionListener {
private final List<Message> completed = new ArrayList<Message>();
private final List<Message> failed = new ArrayList<Message>();
private final List<Message> combinedResult = new ArrayList<Message>();
@Override
public void onCompletion(Message message) {
try {
LOG.debug("Recording completed send: {}", message.getJMSMessageID());
} catch (JMSException e) {
}
completed.add(message);
combinedResult.add(message);
}
@Override
public void onException(Message message, Exception exception) {
try {
LOG.debug("Recording failed send: {} -> error {}", message.getJMSMessageID(), exception.getMessage());
} catch (JMSException e) {
}
failed.add(message);
combinedResult.add(message);
}
public List<Message> getCombinedSends() {
return combinedResult;
}
public List<Message> getCompletedSends() {
return completed;
}
public List<Message> getFailedSends() {
return failed;
}
}
}