blob: e67760a6c3efa4e1713437b08fddc78fa1b16128 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.test.unit.client.message;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.test.utils.QpidTestCase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
public class ObjectMessageTest extends QpidTestCase implements MessageListener
{
private static final Logger _logger = LoggerFactory.getLogger(ObjectMessageTest.class);
private AMQConnection connection;
private AMQDestination destination;
private AMQSession session;
private MessageProducer producer;
private Serializable[] data;
private volatile boolean waiting;
private int received;
private final ArrayList items = new ArrayList();
private String _broker = "vm://:1";
protected void setUp() throws Exception
{
super.setUp();
connection = (AMQConnection) getConnection("guest", "guest");
destination = new AMQQueue(connection, randomize("LatencyTest"), true);
session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE);
// set up a consumer
session.createConsumer(destination).setMessageListener(this);
connection.start();
// create a publisher
producer = session.createProducer(destination, false, false, true);
A a1 = new A(1, "A");
A a2 = new A(2, "a");
B b = new B(1, "B");
C c = new C();
c.put("A1", a1);
c.put("a2", a2);
c.put("B", b);
c.put("String", "String");
data = new Serializable[] { a1, a2, b, c, "Hello World!", new Integer(1001) };
}
protected void tearDown() throws Exception
{
close();
super.tearDown();
}
public ObjectMessageTest()
{ }
ObjectMessageTest(String broker) throws Exception
{
_broker = broker;
}
public void testSendAndReceive() throws Exception
{
try
{
send();
waitUntilReceived(data.length);
check();
_logger.info("All " + data.length + " items matched.");
}
catch (Exception e)
{
e.printStackTrace();
fail("This Test should succeed but failed due to: " + e);
}
}
public void testSetObjectPropertyForString() throws Exception
{
String testStringProperty = "TestStringProperty";
ObjectMessage msg = session.createObjectMessage(data[0]);
msg.setObjectProperty("TestStringProperty", testStringProperty);
assertEquals(testStringProperty, msg.getObjectProperty("TestStringProperty"));
}
public void testSetObjectPropertyForBoolean() throws Exception
{
ObjectMessage msg = session.createObjectMessage(data[0]);
msg.setObjectProperty("TestBooleanProperty", Boolean.TRUE);
assertEquals(Boolean.TRUE, msg.getObjectProperty("TestBooleanProperty"));
}
public void testSetObjectPropertyForByte() throws Exception
{
ObjectMessage msg = session.createObjectMessage(data[0]);
msg.setObjectProperty("TestByteProperty", Byte.MAX_VALUE);
assertEquals(Byte.MAX_VALUE, msg.getObjectProperty("TestByteProperty"));
}
public void testSetObjectPropertyForShort() throws Exception
{
ObjectMessage msg = session.createObjectMessage(data[0]);
msg.setObjectProperty("TestShortProperty", Short.MAX_VALUE);
assertEquals(Short.MAX_VALUE, msg.getObjectProperty("TestShortProperty"));
}
public void testSetObjectPropertyForInteger() throws Exception
{
ObjectMessage msg = session.createObjectMessage(data[0]);
msg.setObjectProperty("TestIntegerProperty", Integer.MAX_VALUE);
assertEquals(Integer.MAX_VALUE, msg.getObjectProperty("TestIntegerProperty"));
}
public void testSetObjectPropertyForDouble() throws Exception
{
ObjectMessage msg = session.createObjectMessage(data[0]);
msg.setObjectProperty("TestDoubleProperty", Double.MAX_VALUE);
assertEquals(Double.MAX_VALUE, msg.getObjectProperty("TestDoubleProperty"));
}
public void testSetObjectPropertyForFloat() throws Exception
{
ObjectMessage msg = session.createObjectMessage(data[0]);
msg.setObjectProperty("TestFloatProperty", Float.MAX_VALUE);
assertEquals(Float.MAX_VALUE, msg.getObjectProperty("TestFloatProperty"));
}
public void testSetObjectPropertyForByteArray() throws Exception
{
byte[] array = { 1, 2, 3, 4, 5 };
ObjectMessage msg = session.createObjectMessage(data[0]);
msg.setObjectProperty("TestByteArrayProperty", array);
assertTrue(Arrays.equals(array, (byte[]) msg.getObjectProperty("TestByteArrayProperty")));
}
public void testSetObjectForNull() throws Exception
{
ObjectMessage msg = session.createObjectMessage();
msg.setObject(null);
assertNull(msg.getObject());
}
private void send() throws Exception
{
for (int i = 0; i < data.length; i++)
{
ObjectMessage msg;
if ((i % 2) == 0)
{
msg = session.createObjectMessage(data[i]);
}
else
{
msg = session.createObjectMessage();
msg.setObject(data[i]);
}
producer.send(msg);
}
}
public void check() throws Exception
{
Object[] actual = (Object[]) items.toArray();
if (actual.length != data.length)
{
throw new Exception("Expected " + data.length + " objects, got " + actual.length);
}
for (int i = 0; i < data.length; i++)
{
if (actual[i] instanceof Exception)
{
throw new Exception("Error on receive of " + data[i], ((Exception) actual[i]));
}
if (actual[i] == null)
{
throw new Exception("Expected " + data[i] + " got null");
}
if (!data[i].equals(actual[i]))
{
throw new Exception("Expected " + data[i] + " got " + actual[i]);
}
}
}
private void close() throws Exception
{
session.close();
connection.close();
}
private synchronized void waitUntilReceived(int count) throws InterruptedException
{
waiting = true;
while (received < count)
{
wait();
}
waiting = false;
}
public void onMessage(Message message)
{
try
{
if (message instanceof ObjectMessage)
{
items.add(((ObjectMessage) message).getObject());
}
else
{
_logger.error("ERROR: Got " + message.getClass().getName() + " not ObjectMessage");
items.add(message);
}
}
catch (JMSException e)
{
e.printStackTrace();
items.add(e);
}
synchronized (this)
{
received++;
notify();
}
}
public static void main(String[] argv) throws Exception
{
String broker = (argv.length > 0) ? argv[0] : "vm://:1";
if ("-help".equals(broker))
{
System.out.println("Usage: <broker>");
}
new ObjectMessageTest(broker).testSendAndReceive();
}
private static class A implements Serializable
{
private String sValue;
private int iValue;
A(int i, String s)
{
sValue = s;
iValue = i;
}
public int hashCode()
{
return iValue;
}
public boolean equals(Object o)
{
return (o instanceof A) && equals((A) o);
}
protected boolean equals(A a)
{
return areEqual(a.sValue, sValue) && (a.iValue == iValue);
}
}
private static class B extends A
{
private long time;
B(int i, String s)
{
super(i, s);
time = System.currentTimeMillis();
}
protected boolean equals(A a)
{
return super.equals(a) && (a instanceof B) && (time == ((B) a).time);
}
}
private static class C extends HashMap implements Serializable
{ }
private static boolean areEqual(Object a, Object b)
{
return (a == null) ? (b == null) : a.equals(b);
}
private static String randomize(String in)
{
return in + System.currentTimeMillis();
}
}