blob: 522cf28babee3943d97b83c9123744cf75697875 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.systests.jms_1_1.message;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assume.assumeThat;
import java.util.Enumeration;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageFormatException;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.junit.Test;
import org.apache.qpid.systests.JmsTestBase;
public class JMSHeadersAndPropertiesTest extends JmsTestBase
{
@Test
public void resentJMSMessageGetsReplacementJMSMessageID() throws Exception
{
Queue queue = createQueue(getTestName());
Connection connection = getConnection();
try
{
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(queue);
final Message sentMessage = session.createMessage();
producer.send(sentMessage);
final String originalId = sentMessage.getJMSMessageID();
assertNotNull("JMSMessageID must be set after first publish", originalId);
producer.send(sentMessage);
final String firstResendID = sentMessage.getJMSMessageID();
assertNotNull("JMSMessageID must be set after first resend", firstResendID);
assertNotSame("JMSMessageID must be changed second publish", originalId, firstResendID);
}
finally
{
connection.close();
}
}
@Test
public void redelivered() throws Exception
{
Queue queue = createQueue(getTestName());
Connection connection = getConnectionBuilder().setPrefetch(1).build();
try
{
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(queue);
producer.send(session.createTextMessage("A"));
producer.send(session.createTextMessage("B"));
session.commit();
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
Message message = consumer.receive(getReceiveTimeout());
assertTrue("TextMessage should be received", message instanceof TextMessage);
assertFalse("Unexpected JMSRedelivered after first receive", message.getJMSRedelivered());
assertEquals("Unexpected message content", "A", ((TextMessage) message).getText());
session.rollback();
message = consumer.receive(getReceiveTimeout());
assertTrue("TextMessage should be received", message instanceof TextMessage);
assertTrue("Unexpected JMSRedelivered after second receive", message.getJMSRedelivered());
assertEquals("Unexpected message content", "A", ((TextMessage) message).getText());
message = consumer.receive(getReceiveTimeout());
assertTrue("TextMessage should be received", message instanceof TextMessage);
assertFalse("Unexpected JMSRedelivered for second message", message.getJMSRedelivered());
assertEquals("Unexpected message content", "B", ((TextMessage) message).getText());
session.commit();
}
finally
{
connection.close();
}
}
@Test
public void headers() throws Exception
{
final Queue queue = createQueue(getTestName());
final Destination replyTo = createQueue(getTestName() + "_replyTo");
final Connection consumerConnection = getConnection();
try
{
final Session consumerSession = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final MessageConsumer consumer = consumerSession.createConsumer(queue);
final String correlationId = "testCorrelationId";
final String jmsType = "testJmsType";
final int priority = 1;
final long timeToLive = 30 * 60 * 1000;
final Connection producerConnection = getConnection();
try
{
final Session producerSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final MessageProducer producer = producerSession.createProducer(queue);
final Message message = producerSession.createMessage();
message.setJMSCorrelationID(correlationId);
message.setJMSType(jmsType);
message.setJMSReplyTo(replyTo);
long currentTime = System.currentTimeMillis();
producer.send(message, DeliveryMode.NON_PERSISTENT, priority, timeToLive);
consumerConnection.start();
Message receivedMessage = consumer.receive(getReceiveTimeout());
assertNotNull(receivedMessage);
assertEquals("JMSCorrelationID mismatch", correlationId, receivedMessage.getJMSCorrelationID());
assertEquals("JMSType mismatch", message.getJMSType(), receivedMessage.getJMSType());
assertEquals("JMSReply To mismatch", message.getJMSReplyTo(), receivedMessage.getJMSReplyTo());
assertTrue("JMSMessageID does not start 'ID:'", receivedMessage.getJMSMessageID().startsWith("ID:"));
assertEquals("JMSPriority mismatch", priority, receivedMessage.getJMSPriority());
assertTrue(String.format(
"Unexpected JMSExpiration: got '%d', but expected value equals or greater than '%d'",
receivedMessage.getJMSExpiration(),
currentTime + timeToLive),
receivedMessage.getJMSExpiration() >= currentTime + timeToLive
&& receivedMessage.getJMSExpiration() <= System.currentTimeMillis() + timeToLive);
}
finally
{
producerConnection.close();
}
}
finally
{
consumerConnection.close();
}
}
@Test
public void groupIDAndGroupSeq() throws Exception
{
final Connection connection = getConnection();
try
{
assumeThat(isJMSXPropertySupported(connection, "JMSXGroupID"), is(equalTo(true)));
assumeThat(isJMSXPropertySupported(connection, "JMSXGroupSeq"), is(equalTo(true)));
final String groupId = "testGroup";
final int groupSequence = 3;
final Queue queue = createQueue(getTestName());
final Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final MessageProducer producer = session.createProducer(queue);
final MessageConsumer consumer = session.createConsumer(queue);
final Message message = session.createMessage();
message.setStringProperty("JMSXGroupID", groupId);
message.setIntProperty("JMSXGroupSeq", groupSequence);
producer.send(message);
connection.start();
final Message receivedMessage = consumer.receive(getReceiveTimeout());
assertNotNull(receivedMessage);
assertEquals("Unexpected JMSXGroupID", groupId, receivedMessage.getStringProperty("JMSXGroupID"));
assertEquals("Unexpected JMSXGroupSeq", groupSequence, receivedMessage.getIntProperty("JMSXGroupSeq"));
}
finally
{
connection.close();
}
}
@Test
public void propertyValues() throws Exception
{
Queue queue = createQueue(getTestName());
Connection connection = getConnection();
try
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(queue);
Message message = session.createMessage();
message.setBooleanProperty("boolean", true);
message.setByteProperty("byte", Byte.MAX_VALUE);
message.setShortProperty("short", Short.MAX_VALUE);
message.setIntProperty("int", Integer.MAX_VALUE);
message.setFloatProperty("float", Float.MAX_VALUE);
message.setDoubleProperty("double", Double.MAX_VALUE);
producer.send(message);
final MessageConsumer consumer = session.createConsumer(queue);
connection.start();
final Message receivedMessage = consumer.receive(getReceiveTimeout());
assertNotNull(receivedMessage);
assertEquals("Unexpected boolean property value", Boolean.TRUE, message.getBooleanProperty("boolean"));
assertEquals("Unexpected byte property value", Byte.MAX_VALUE, message.getByteProperty("byte"));
assertEquals("Unexpected short property value", Short.MAX_VALUE, message.getShortProperty("short"));
assertEquals("Unexpected int property value", Integer.MAX_VALUE, message.getIntProperty("int"));
assertEquals("Unexpected float property value", Float.MAX_VALUE, message.getFloatProperty("float"), 0f);
assertEquals("Unexpected double property value", Double.MAX_VALUE, message.getDoubleProperty("double"), 0d);
}
finally
{
connection.close();
}
}
@Test
public void unsupportedObjectPropertyValue() throws Exception
{
Queue queue = createQueue(getTestName());
Connection connection = getConnection();
try
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(queue);
Message message = session.createMessage();
try
{
message.setObjectProperty("invalidObject", new Exception());
}
catch (MessageFormatException e)
{
// pass
}
String validValue = "validValue";
message.setObjectProperty("validObject", validValue);
producer.send(message);
final MessageConsumer consumer = session.createConsumer(queue);
connection.start();
final Message receivedMessage = consumer.receive(getReceiveTimeout());
assertNotNull(receivedMessage);
assertFalse("Unexpected property found", message.propertyExists("invalidObject"));
assertEquals("Unexpected property value", validValue, message.getObjectProperty("validObject"));
}
finally
{
connection.close();
}
}
@Test
public void disableJMSMessageId() throws Exception
{
Queue queue = createQueue(getTestName());
Connection connection = getConnection();
try
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(queue);
Message message = session.createMessage();
producer.send(message);
assertNotNull("Produced message is expected to have a JMSMessageID", message.getJMSMessageID());
final MessageConsumer consumer = session.createConsumer(queue);
connection.start();
final Message receivedMessageWithId = consumer.receive(getReceiveTimeout());
assertNotNull(receivedMessageWithId);
assertNotNull("Received message is expected to have a JMSMessageID", receivedMessageWithId.getJMSMessageID());
assertEquals("Received message JMSMessageID should match the sent", message.getJMSMessageID(), receivedMessageWithId.getJMSMessageID());
producer.setDisableMessageID(true);
producer.send(message);
assertNull("Produced message is expected to not have a JMSMessageID", message.getJMSMessageID());
final Message receivedMessageWithoutId = consumer.receive(getReceiveTimeout());
assertNotNull(receivedMessageWithoutId);
assertNull("Received message is not expected to have a JMSMessageID", receivedMessageWithoutId.getJMSMessageID());
}
finally
{
connection.close();
}
}
private boolean isJMSXPropertySupported(final Connection connection, final String propertyName) throws JMSException
{
boolean supported = false;
Enumeration props = connection.getMetaData().getJMSXPropertyNames();
while (props.hasMoreElements())
{
String name = (String) props.nextElement();
if (name.equals(propertyName))
{
supported = true;
}
}
return supported;
}
}