blob: f4749a1169af4932e50acc65c6f56c423cc694fd [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq;
import java.net.URISyntaxException;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Vector;
import javax.jms.BytesMessage;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageConsumer;
import javax.jms.MessageEOFException;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import junit.framework.Test;
import org.apache.activemq.command.ActiveMQDestination;
/**
* Test cases used to test the JMS message consumer.
*
*
*/
public class JMSMessageTest extends JmsTestSupport {
public ActiveMQDestination destination;
public int deliveryMode = DeliveryMode.NON_PERSISTENT;
public int prefetch;
public int ackMode;
public byte destinationType = ActiveMQDestination.QUEUE_TYPE;
public boolean durableConsumer;
public String connectURL = "vm://localhost?marshal=false";
/**
* Run all these tests in both marshaling and non-marshaling mode.
*/
public void initCombos() {
addCombinationValues("connectURL", new Object[] {"vm://localhost?marshal=false",
"vm://localhost?marshal=true"});
addCombinationValues("deliveryMode", new Object[] {Integer.valueOf(DeliveryMode.NON_PERSISTENT),
Integer.valueOf(DeliveryMode.PERSISTENT)});
addCombinationValues("destinationType", new Object[] {Byte.valueOf(ActiveMQDestination.QUEUE_TYPE)});
}
public void testTextMessage() throws Exception {
// Receive a message with the JMS API
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = createDestination(session, destinationType);
MessageConsumer consumer = session.createConsumer(destination);
MessageProducer producer = session.createProducer(destination);
// Send the message.
{
TextMessage message = session.createTextMessage();
message.setText("Hi");
producer.send(message);
}
// Check the Message
{
TextMessage message = (TextMessage)consumer.receive(1000);
assertNotNull(message);
assertEquals("Hi", message.getText());
}
assertNull(consumer.receiveNoWait());
}
public static Test suite() {
return suite(JMSMessageTest.class);
}
public static void main(String[] args) {
junit.textui.TestRunner.run(suite());
}
@Override
protected ConnectionFactory createConnectionFactory() throws URISyntaxException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectURL);
return factory;
}
public void testBytesMessageLength() throws Exception {
// Receive a message with the JMS API
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = createDestination(session, destinationType);
MessageConsumer consumer = session.createConsumer(destination);
MessageProducer producer = session.createProducer(destination);
// Send the message
{
BytesMessage message = session.createBytesMessage();
message.writeInt(1);
message.writeInt(2);
message.writeInt(3);
message.writeInt(4);
producer.send(message);
}
// Check the message.
{
BytesMessage message = (BytesMessage)consumer.receive(1000);
assertNotNull(message);
assertEquals(16, message.getBodyLength());
}
assertNull(consumer.receiveNoWait());
}
public void testObjectMessage() throws Exception {
// Receive a message with the JMS API
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = createDestination(session, destinationType);
MessageConsumer consumer = session.createConsumer(destination);
MessageProducer producer = session.createProducer(destination);
// send the message.
{
ObjectMessage message = session.createObjectMessage();
message.setObject("Hi");
producer.send(message);
}
// Check the message
{
ObjectMessage message = (ObjectMessage)consumer.receive(1000);
assertNotNull(message);
assertEquals("Hi", message.getObject());
}
assertNull(consumer.receiveNoWait());
}
public void testBytesMessage() throws Exception {
// Receive a message with the JMS API
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = createDestination(session, destinationType);
MessageConsumer consumer = session.createConsumer(destination);
MessageProducer producer = session.createProducer(destination);
// Send the message
{
BytesMessage message = session.createBytesMessage();
message.writeBoolean(true);
producer.send(message);
}
// Check the message
{
BytesMessage message = (BytesMessage)consumer.receive(1000);
assertNotNull(message);
assertTrue(message.readBoolean());
try {
message.readByte();
fail("Expected exception not thrown.");
} catch (MessageEOFException e) {
}
}
assertNull(consumer.receiveNoWait());
}
public void testStreamMessage() throws Exception {
// Receive a message with the JMS API
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = createDestination(session, destinationType);
MessageConsumer consumer = session.createConsumer(destination);
MessageProducer producer = session.createProducer(destination);
// Send the message.
{
StreamMessage message = session.createStreamMessage();
message.writeString("This is a test to see how it works.");
producer.send(message);
}
// Check the message.
{
StreamMessage message = (StreamMessage)consumer.receive(1000);
assertNotNull(message);
// Invalid conversion should throw exception and not move the stream
// position.
try {
message.readByte();
fail("Should have received NumberFormatException");
} catch (NumberFormatException e) {
}
assertEquals("This is a test to see how it works.", message.readString());
// Invalid conversion should throw exception and not move the stream
// position.
try {
message.readByte();
fail("Should have received MessageEOFException");
} catch (MessageEOFException e) {
}
}
assertNull(consumer.receiveNoWait());
}
public void testMapMessage() throws Exception {
// Receive a message with the JMS API
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = createDestination(session, destinationType);
MessageConsumer consumer = session.createConsumer(destination);
MessageProducer producer = session.createProducer(destination);
// send the message.
{
MapMessage message = session.createMapMessage();
message.setBoolean("boolKey", true);
producer.send(message);
}
// get the message.
{
MapMessage message = (MapMessage)consumer.receive(1000);
assertNotNull(message);
assertTrue(message.getBoolean("boolKey"));
}
assertNull(consumer.receiveNoWait());
}
static class ForeignMessage implements TextMessage {
public int deliveryMode;
private String messageId;
private long timestamp;
private String correlationId;
private Destination replyTo;
private Destination destination;
private boolean redelivered;
private String type;
private long expiration;
private int priority;
private String text;
private final HashMap<String, Object> props = new HashMap<String, Object>();
@Override
public String getJMSMessageID() throws JMSException {
return messageId;
}
@Override
public void setJMSMessageID(String arg0) throws JMSException {
messageId = arg0;
}
@Override
public long getJMSTimestamp() throws JMSException {
return timestamp;
}
@Override
public void setJMSTimestamp(long arg0) throws JMSException {
timestamp = arg0;
}
@Override
public byte[] getJMSCorrelationIDAsBytes() throws JMSException {
return null;
}
@Override
public void setJMSCorrelationIDAsBytes(byte[] arg0) throws JMSException {
}
@Override
public void setJMSCorrelationID(String arg0) throws JMSException {
correlationId = arg0;
}
@Override
public String getJMSCorrelationID() throws JMSException {
return correlationId;
}
@Override
public Destination getJMSReplyTo() throws JMSException {
return replyTo;
}
@Override
public void setJMSReplyTo(Destination arg0) throws JMSException {
replyTo = arg0;
}
@Override
public Destination getJMSDestination() throws JMSException {
return destination;
}
@Override
public void setJMSDestination(Destination arg0) throws JMSException {
destination = arg0;
}
@Override
public int getJMSDeliveryMode() throws JMSException {
return deliveryMode;
}
@Override
public void setJMSDeliveryMode(int arg0) throws JMSException {
deliveryMode = arg0;
}
@Override
public boolean getJMSRedelivered() throws JMSException {
return redelivered;
}
@Override
public void setJMSRedelivered(boolean arg0) throws JMSException {
redelivered = arg0;
}
@Override
public String getJMSType() throws JMSException {
return type;
}
@Override
public void setJMSType(String arg0) throws JMSException {
type = arg0;
}
@Override
public long getJMSExpiration() throws JMSException {
return expiration;
}
@Override
public void setJMSExpiration(long arg0) throws JMSException {
expiration = arg0;
}
@Override
public long getJMSDeliveryTime() throws JMSException {
return 0;
}
@Override
public void setJMSDeliveryTime(long l) throws JMSException {
}
@Override
public int getJMSPriority() throws JMSException {
return priority;
}
@Override
public void setJMSPriority(int arg0) throws JMSException {
priority = arg0;
}
@Override
public void clearProperties() throws JMSException {
}
@Override
public boolean propertyExists(String arg0) throws JMSException {
return false;
}
@Override
public boolean getBooleanProperty(String arg0) throws JMSException {
return false;
}
@Override
public byte getByteProperty(String arg0) throws JMSException {
return 0;
}
@Override
public short getShortProperty(String arg0) throws JMSException {
return 0;
}
@Override
public int getIntProperty(String arg0) throws JMSException {
return 0;
}
@Override
public long getLongProperty(String arg0) throws JMSException {
return 0;
}
@Override
public float getFloatProperty(String arg0) throws JMSException {
return 0;
}
@Override
public double getDoubleProperty(String arg0) throws JMSException {
return 0;
}
@Override
public String getStringProperty(String arg0) throws JMSException {
return (String)props.get(arg0);
}
@Override
public Object getObjectProperty(String arg0) throws JMSException {
return props.get(arg0);
}
@Override
public Enumeration<?> getPropertyNames() throws JMSException {
return new Vector<String>(props.keySet()).elements();
}
@Override
public void setBooleanProperty(String arg0, boolean arg1) throws JMSException {
}
@Override
public void setByteProperty(String arg0, byte arg1) throws JMSException {
}
@Override
public void setShortProperty(String arg0, short arg1) throws JMSException {
}
@Override
public void setIntProperty(String arg0, int arg1) throws JMSException {
}
@Override
public void setLongProperty(String arg0, long arg1) throws JMSException {
}
@Override
public void setFloatProperty(String arg0, float arg1) throws JMSException {
}
@Override
public void setDoubleProperty(String arg0, double arg1) throws JMSException {
}
@Override
public void setStringProperty(String arg0, String arg1) throws JMSException {
props.put(arg0, arg1);
}
@Override
public void setObjectProperty(String arg0, Object arg1) throws JMSException {
props.put(arg0, arg1);
}
@Override
public void acknowledge() throws JMSException {
}
@Override
public void clearBody() throws JMSException {
}
@Override
public <T> T getBody(Class<T> aClass) throws JMSException {
return null;
}
@Override
public boolean isBodyAssignableTo(Class aClass) throws JMSException {
return true;
}
@Override
public void setText(String arg0) throws JMSException {
text = arg0;
}
@Override
public String getText() throws JMSException {
return text;
}
}
public void testForeignMessage() throws Exception {
// Receive a message with the JMS API
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = createDestination(session, destinationType);
MessageConsumer consumer = session.createConsumer(destination);
MessageProducer producer = session.createProducer(destination);
// Send the message.
{
ForeignMessage message = new ForeignMessage();
message.text = "Hello";
message.setStringProperty("test", "value");
long timeToLive = 10000L;
long start = System.currentTimeMillis();
producer.send(message, deliveryMode, 7, timeToLive);
long end = System.currentTimeMillis();
//validate jms spec 1.1 section 3.4.11 table 3.1
// JMSDestination, JMSDeliveryMode, JMSExpiration, JMSPriority, JMSMessageID, and JMSTimestamp
//must be set by sending a message.
assertNotNull(message.getJMSDestination());
assertEquals(deliveryMode, message.getJMSDeliveryMode());
assertTrue(start + timeToLive <= message.getJMSExpiration());
assertTrue(end + timeToLive >= message.getJMSExpiration());
assertEquals(7, message.getJMSPriority());
assertNotNull(message.getJMSMessageID());
assertTrue(start <= message.getJMSTimestamp());
assertTrue(end >= message.getJMSTimestamp());
}
// Validate message is OK.
{
TextMessage message = (TextMessage)consumer.receive(1000);
assertNotNull(message);
assertEquals("Hello", message.getText());
assertEquals("value", message.getStringProperty("test"));
}
assertNull(consumer.receiveNoWait());
}
}