blob: 0404762a7cdd2d0190eb304a6dd2c15ec1fde7e9 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.transport.stomp;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.io.StringReader;
import java.net.SocketTimeoutException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.management.ObjectName;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.BrokerViewMBean;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.broker.region.AbstractSubscription;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTextMessage;
import org.apache.activemq.util.Wait;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.thoughtworks.xstream.XStream;
import com.thoughtworks.xstream.io.HierarchicalStreamReader;
import com.thoughtworks.xstream.io.json.JettisonMappedXmlDriver;
import com.thoughtworks.xstream.io.xml.XppReader;
import com.thoughtworks.xstream.io.xml.xppdom.XppFactory;
public class StompTest extends StompTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(StompTest.class);
protected Connection connection;
protected Session session;
protected ActiveMQQueue queue;
protected XStream xstream;
private final String xmlObject = "<pojo>\n"
+ " <name>Dejan</name>\n"
+ " <city>Belgrade</city>\n"
+ "</pojo>";
private String xmlMap = "<map>\n"
+ " <entry>\n"
+ " <string>name</string>\n"
+ " <string>Dejan</string>\n"
+ " </entry>\n"
+ " <entry>\n"
+ " <string>city</string>\n"
+ " <string>Belgrade</string>\n"
+ " </entry>\n"
+ "</map>\n";
private final String jsonObject = "{\"pojo\":{"
+ "\"name\":\"Dejan\","
+ "\"city\":\"Belgrade\""
+ "}}";
private String jsonMap = "{\"map\":{"
+ "\"entry\":["
+ "{\"string\":[\"name\",\"Dejan\"]},"
+ "{\"string\":[\"city\",\"Belgrade\"]}"
+ "]"
+ "}}";
@Override
public void setUp() throws Exception {
// The order of the entries is different when using ibm jdk 5.
if (System.getProperty("java.vendor").equals("IBM Corporation")
&& System.getProperty("java.version").startsWith("1.5")) {
xmlMap = "<map>\n"
+ " <entry>\n"
+ " <string>city</string>\n"
+ " <string>Belgrade</string>\n"
+ " </entry>\n"
+ " <entry>\n"
+ " <string>name</string>\n"
+ " <string>Dejan</string>\n"
+ " </entry>\n"
+ "</map>\n";
jsonMap = "{\"map\":{"
+ "\"entry\":["
+ "{\"string\":[\"city\",\"Belgrade\"]},"
+ "{\"string\":[\"name\",\"Dejan\"]}"
+ "]"
+ "}}";
}
queue = new ActiveMQQueue(getQueueName());
super.setUp();
stompConnect();
connection = cf.createConnection("system", "manager");
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
xstream = new XStream();
xstream.processAnnotations(SamplePojo.class);
xstream.allowTypes(new Class[] { SamplePojo.class });
}
@Override
public void applyBrokerPolicies() {
PolicyMap policyMap = new PolicyMap();
PolicyEntry persistRedelivery = new PolicyEntry();
persistRedelivery.setPersistJMSRedelivered(true);
policyMap.put(queue, persistRedelivery);
brokerService.setDestinationPolicy(policyMap);
}
@Override
public void tearDown() throws Exception {
try {
connection.close();
} catch(Exception e) {
// Some tests explicitly disconnect from stomp so can ignore
} finally {
super.tearDown();
}
}
public void sendMessage(String msg) throws Exception {
sendMessage(msg, "foo", "xyz");
}
public void sendMessage(String msg, String propertyName, String propertyValue) throws JMSException {
MessageProducer producer = session.createProducer(queue);
TextMessage message = session.createTextMessage(msg);
message.setStringProperty(propertyName, propertyValue);
producer.send(message);
}
public void sendBytesMessage(byte[] msg) throws Exception {
MessageProducer producer = session.createProducer(queue);
BytesMessage message = session.createBytesMessage();
message.writeBytes(msg);
producer.send(message);
}
@Test(timeout = 60000)
public void testConnect() throws Exception {
String connectFrame = "CONNECT\n" + "login:system\n" + "passcode:manager\n" + "request-id:1\n" + "\n" + Stomp.NULL;
stompConnection.sendFrame(connectFrame);
String f = stompConnection.receiveFrame();
assertTrue(f.startsWith("CONNECTED"));
assertTrue(f.indexOf("response-id:1") >= 0);
}
@Test(timeout = 60000)
public void testSendMessage() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
stompConnection.sendFrame(frame);
TextMessage message = (TextMessage)consumer.receive(2500);
assertNotNull(message);
assertEquals("Hello World", message.getText());
// Make sure that the timestamp is valid - should
// be very close to the current time.
long tnow = System.currentTimeMillis();
long tmsg = message.getJMSTimestamp();
assertTrue(Math.abs(tnow - tmsg) < 1000);
}
@Test(timeout = 60000)
public void testJMSXGroupIdCanBeSet() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "JMSXGroupID:TEST\n\n" + "Hello World" + Stomp.NULL;
stompConnection.sendFrame(frame);
TextMessage message = (TextMessage)consumer.receive(2500);
assertNotNull(message);
assertEquals("TEST", ((ActiveMQTextMessage)message).getGroupID());
}
@Test(timeout = 60000)
public void testSendMessageWithCustomHeadersAndSelector() throws Exception {
MessageConsumer consumer = session.createConsumer(queue, "foo = 'abc'");
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SEND\n" + "foo:abc\n" + "bar:123\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
stompConnection.sendFrame(frame);
TextMessage message = (TextMessage)consumer.receive(2500);
assertNotNull(message);
assertEquals("Hello World", message.getText());
assertEquals("foo", "abc", message.getStringProperty("foo"));
assertEquals("bar", "123", message.getStringProperty("bar"));
}
@Test(timeout = 60000)
public void testSendMessageWithDelay() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SEND\n" + "AMQ_SCHEDULED_DELAY:2000\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
stompConnection.sendFrame(frame);
TextMessage message = (TextMessage)consumer.receive(1000);
assertNull(message);
message = (TextMessage)consumer.receive(2500);
assertNotNull(message);
}
@Test(timeout = 60000)
public void testSendMessageWithStandardHeaders() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SEND\n" + "correlation-id:c123\n" + "priority:3\n" + "type:t345\n" + "JMSXGroupID:abc\n" + "foo:abc\n" + "bar:123\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Hello World"
+ Stomp.NULL;
stompConnection.sendFrame(frame);
TextMessage message = (TextMessage)consumer.receive(2500);
assertNotNull(message);
assertEquals("Hello World", message.getText());
assertEquals("JMSCorrelationID", "c123", message.getJMSCorrelationID());
assertEquals("getJMSType", "t345", message.getJMSType());
assertEquals("getJMSPriority", 3, message.getJMSPriority());
assertEquals("foo", "abc", message.getStringProperty("foo"));
assertEquals("bar", "123", message.getStringProperty("bar"));
assertEquals("JMSXGroupID", "abc", message.getStringProperty("JMSXGroupID"));
ActiveMQTextMessage amqMessage = (ActiveMQTextMessage)message;
assertEquals("GroupID", "abc", amqMessage.getGroupID());
}
@Test(timeout = 60000)
public void testSendMessageWithNoPriorityReceivesDefault() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SEND\n" + "correlation-id:c123\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Hello World"
+ Stomp.NULL;
stompConnection.sendFrame(frame);
TextMessage message = (TextMessage)consumer.receive(2500);
assertNotNull(message);
assertEquals("Hello World", message.getText());
assertEquals("getJMSPriority", 4, message.getJMSPriority());
}
@Test(timeout = 60000)
public void testSendFrameWithInvalidAction() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
final int connectionCount = getProxyToBroker().getCurrentConnectionsCount();
frame = "SED\n" + "AMQ_SCHEDULED_DELAY:2000\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("ERROR"));
assertTrue("Should drop connection", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return connectionCount > getProxyToBroker().getCurrentConnectionsCount();
}
}));
}
@Test(timeout = 60000)
public void testReceipts() throws Exception {
StompConnection receiver = new StompConnection();
receiver.open(createSocket());
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
receiver.sendFrame(frame);
frame = receiver.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
receiver.sendFrame(frame);
frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "receipt: msg-1\n" + "\n\n" + "Hello World" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = receiver.receiveFrame();
assertTrue(frame.startsWith("MESSAGE"));
assertTrue("Stomp Message does not contain receipt request", frame.indexOf(Stomp.Headers.RECEIPT_REQUESTED) == -1);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("RECEIPT"));
assertTrue("Receipt contains correct receipt-id", frame.indexOf(Stomp.Headers.Response.RECEIPT_ID) >= 0);
frame = "DISCONNECT\n" + "receipt: dis-1\n" + "\n\n" + Stomp.NULL;
receiver.sendFrame(frame);
frame = receiver.receiveFrame();
assertTrue(frame.startsWith("RECEIPT"));
assertTrue("Receipt contains correct receipt-id", frame.indexOf(Stomp.Headers.Response.RECEIPT_ID) >= 0);
receiver.close();
MessageConsumer consumer = session.createConsumer(queue);
frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "receipt: msg-1\n" + "\n\n" + "Hello World" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("RECEIPT"));
assertTrue("Receipt contains correct receipt-id", frame.indexOf(Stomp.Headers.Response.RECEIPT_ID) >= 0);
TextMessage message = (TextMessage)consumer.receive(10000);
assertNotNull(message);
assertNull("JMS Message does not contain receipt request", message.getStringProperty(Stomp.Headers.RECEIPT_REQUESTED));
}
@Test(timeout = 60000)
public void testSubscriptionReceipts() throws Exception {
final int done = 20;
int count = 0;
int receiptId = 0;
do {
StompConnection sender = new StompConnection();
sender.open(createSocket());
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
sender.sendFrame(frame);
frame = sender.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "receipt: " + (receiptId++) + "\n" + "Hello World:" + (count++) + "\n\n" + Stomp.NULL;
sender.sendFrame(frame);
frame = sender.receiveFrame();
assertTrue("" + frame, frame.startsWith("RECEIPT"));
sender.disconnect();
StompConnection receiver = new StompConnection();
receiver.open(createSocket());
frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
receiver.sendFrame(frame);
frame = receiver.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "receipt: " + (receiptId++) + "\n\n" + Stomp.NULL;
receiver.sendFrame(frame);
frame = receiver.receiveFrame();
assertTrue("" + frame, frame.startsWith("RECEIPT"));
assertTrue("Receipt contains receipt-id", frame.indexOf(Stomp.Headers.Response.RECEIPT_ID) >= 0);
frame = receiver.receiveFrame();
assertTrue("" + frame, frame.startsWith("MESSAGE"));
// remove suscription so we don't hang about and get next message
frame = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "receipt: " + (receiptId++) + "\n\n" + Stomp.NULL;
receiver.sendFrame(frame);
frame = receiver.receiveFrame();
assertTrue("" + frame, frame.startsWith("RECEIPT"));
receiver.disconnect();
} while (count < done);
}
@Test(timeout = 60000)
public void testSubscribeWithAutoAck() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
sendMessage(name.getMethodName());
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("MESSAGE"));
}
@Test(timeout = 60000)
public void testSubscribeWithAutoAckAndBytesMessage() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
sendBytesMessage(new byte[] {
1, 2, 3, 4, 5
});
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("MESSAGE"));
Pattern cl = Pattern.compile("Content-length:\\s*(\\d+)", Pattern.CASE_INSENSITIVE);
Matcher clMmatcher = cl.matcher(frame);
assertTrue(clMmatcher.find());
assertEquals("5", clMmatcher.group(1));
assertFalse(Pattern.compile("type:\\s*null", Pattern.CASE_INSENSITIVE).matcher(frame).find());
}
@Test(timeout = 60000)
public void testBytesMessageWithNulls() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\ncontent-length:5" + " \n\n" + "\u0001\u0002\u0000\u0004\u0005" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
StompFrame message = stompConnection.receive();
assertTrue(message.getAction().startsWith("MESSAGE"));
String length = message.getHeaders().get("content-length");
assertEquals("5", length);
assertEquals(5, message.getContent().length);
}
@Test(timeout = 60000)
public void testSendMultipleBytesMessages() throws Exception {
final int MSG_COUNT = 50;
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
for( int ix = 0; ix < MSG_COUNT; ix++) {
frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\ncontent-length:5" + " \n\n" + "\u0001\u0002\u0000\u0004\u0005" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
for( int ix = 0; ix < MSG_COUNT; ix++) {
StompFrame message = stompConnection.receive();
assertTrue(message.getAction().startsWith("MESSAGE"));
String length = message.getHeaders().get("content-length");
assertEquals("5", length);
assertEquals(5, message.getContent().length);
}
}
@Test(timeout = 60000)
public void testSubscribeWithMessageSentWithProperties() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
MessageProducer producer = session.createProducer(queue);
TextMessage message = session.createTextMessage("Hello World");
message.setStringProperty("s", "value");
message.setBooleanProperty("n", false);
message.setByteProperty("byte", (byte)9);
message.setDoubleProperty("d", 2.0);
message.setFloatProperty("f", (float)6.0);
message.setIntProperty("i", 10);
message.setLongProperty("l", 121);
message.setShortProperty("s", (short)12);
producer.send(message);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("MESSAGE"));
}
@Test(timeout = 60000)
public void testMessagesAreInOrder() throws Exception {
int ctr = 10;
String[] data = new String[ctr];
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
for (int i = 0; i < ctr; ++i) {
data[i] = getName() + i;
sendMessage(data[i]);
}
for (int i = 0; i < ctr; ++i) {
frame = stompConnection.receiveFrame();
assertTrue("Message not in order", frame.indexOf(data[i]) >= 0);
}
// sleep a while before publishing another set of messages
TimeUnit.MILLISECONDS.sleep(500);
for (int i = 0; i < ctr; ++i) {
data[i] = getName() + ":second:" + i;
sendMessage(data[i]);
}
for (int i = 0; i < ctr; ++i) {
frame = stompConnection.receiveFrame();
assertTrue("Message not in order", frame.indexOf(data[i]) >= 0);
}
}
@Test(timeout = 60000)
public void testSubscribeWithAutoAckAndSelector() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "selector: foo = 'zzz'\n" + "ack:auto\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
sendMessage("Ignored message", "foo", "1234");
sendMessage("Real message", "foo", "zzz");
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("MESSAGE"));
assertTrue("Should have received the real message but got: " + frame, frame.indexOf("Real message") > 0);
}
@Test(timeout = 60000)
public void testSubscribeWithAutoAckAndNumericSelector() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "selector: foo = 42\n" + "ack:auto\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
// Ignored
frame = "SEND\n" + "foo:abc\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Ignored Message" + Stomp.NULL;
stompConnection.sendFrame(frame);
// Matches
frame = "SEND\n" + "foo:42\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Real Message" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("MESSAGE"));
assertTrue("Should have received the real message but got: " + frame, frame.indexOf("Real Message") > 0);
}
@Test(timeout = 60000)
public void testSubscribeWithAutoAckAndBooleanSelector() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "selector: foo = true\n" + "ack:auto\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
// Ignored
frame = "SEND\n" + "foo:false\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Ignored Message" + Stomp.NULL;
stompConnection.sendFrame(frame);
// Matches
frame = "SEND\n" + "foo:true\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Real Message" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("MESSAGE"));
assertTrue("Should have received the real message but got: " + frame, frame.indexOf("Real Message") > 0);
}
@Test(timeout = 60000)
public void testSubscribeWithAutoAckAnFloatSelector() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "selector: foo = 3.14159\n" + "ack:auto\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
// Ignored
frame = "SEND\n" + "foo:6.578\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Ignored Message" + Stomp.NULL;
stompConnection.sendFrame(frame);
// Matches
frame = "SEND\n" + "foo:3.14159\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Real Message" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("MESSAGE"));
assertTrue("Should have received the real message but got: " + frame, frame.indexOf("Real Message") > 0);
}
@Test(timeout = 60000)
public void testSubscribeWithClientAck() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:client\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
sendMessage(getName());
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("MESSAGE"));
stompDisconnect();
// message should be received since message was not acknowledged
MessageConsumer consumer = session.createConsumer(queue);
TextMessage message = (TextMessage)consumer.receive(2500);
assertNotNull(message);
assertTrue(message.getJMSRedelivered());
}
@Test(timeout = 60000)
public void testSubscribeWithClientAckedAndContentLength() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:client\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
sendMessage(getName());
StompFrame msg = stompConnection.receive();
assertTrue(msg.getAction().equals("MESSAGE"));
HashMap<String, String> ackHeaders = new HashMap<String, String>();
ackHeaders.put("message-id", msg.getHeaders().get("message-id"));
ackHeaders.put("content-length", "8511");
StompFrame ack = new StompFrame("ACK", ackHeaders);
stompConnection.sendFrame(ack.format());
final QueueViewMBean queueView = getProxyToQueue(getQueueName());
assertTrue("dequeue complete", Wait.waitFor(new Wait.Condition(){
@Override
public boolean isSatisified() throws Exception {
LOG.info("queueView, enqueue:" + queueView.getEnqueueCount() +", dequeue:" + queueView.getDequeueCount() + ", inflight:" + queueView.getInFlightCount());
return queueView.getDequeueCount() == 1;
}
}, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25)));
stompDisconnect();
// message should not be received since it was acknowledged
MessageConsumer consumer = session.createConsumer(queue);
TextMessage message = (TextMessage)consumer.receive(500);
assertNull(message);
}
@Test(timeout = 60000)
public void testUnsubscribe() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
// send a message to our queue
sendMessage("first message");
// receive message from socket
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("MESSAGE"));
// remove suscription
frame = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "receipt:1" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue("" + frame, frame.startsWith("RECEIPT"));
// send a message to our queue
sendMessage("second message");
try {
frame = stompConnection.receiveFrame(500);
LOG.info("Received frame: " + frame);
fail("No message should have been received since subscription was removed");
} catch (SocketTimeoutException e) {
}
}
@Test(timeout = 60000)
public void testTransactionCommit() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
String f = stompConnection.receiveFrame();
assertTrue(f.startsWith("CONNECTED"));
frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transaction: tx1\n" + "\n\n" + "Hello World" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = "COMMIT\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
TextMessage message = (TextMessage)consumer.receive(10000);
assertNotNull("Should have received a message", message);
}
@Test(timeout = 60000)
public void testTransactionRollback() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
String f = stompConnection.receiveFrame();
assertTrue(f.startsWith("CONNECTED"));
frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transaction: tx1\n" + "\n" + "first message" + Stomp.NULL;
stompConnection.sendFrame(frame);
// rollback first message
frame = "ABORT\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = "BEGIN\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transaction: tx1\n" + "\n" + "second message" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = "COMMIT\n" + "transaction: tx1\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
// only second msg should be received since first msg was rolled back
TextMessage message = (TextMessage)consumer.receive(10000);
assertNotNull(message);
assertEquals("second message", message.getText().trim());
}
@Test(timeout = 60000)
public void testDisconnectedClientsAreRemovedFromTheBroker() throws Exception {
assertClients(1);
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
assertClients(2);
// now lets kill the stomp connection
stompConnection.close();
assertClients(1);
}
@Test(timeout = 60000)
public void testConnectNotAuthenticatedWrongUser() throws Exception {
String frame = "CONNECT\n" + "login: dejanb\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
try {
String f = stompConnection.receiveFrame();
assertTrue(f.startsWith("ERROR"));
assertFalse("no stack trace impl leak:" + f, f.contains("at "));
} catch (IOException socketMayBeClosedFirstByBroker) {}
}
@Test(timeout = 60000)
public void testConnectNotAuthenticatedWrongPassword() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode: dejanb\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
try {
String f = stompConnection.receiveFrame();
assertTrue(f.startsWith("ERROR"));
assertFalse("no stack trace impl leak:" + f, f.contains("at "));
} catch (IOException socketMayBeClosedFirstByBroker) {}
}
@Test(timeout = 60000)
public void testSendNotAuthorized() throws Exception {
String frame = "CONNECT\n" + "login:guest\n" + "passcode:password\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SEND\n" + "destination:/queue/USERS." + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
stompConnection.sendFrame(frame);
String f = stompConnection.receiveFrame();
assertTrue(f.startsWith("ERROR"));
assertFalse("no stack trace impl leak:" + f, f.contains("at "));
}
@Test(timeout = 60000)
public void testSubscribeNotAuthorized() throws Exception {
String frame = "CONNECT\n" + "login:guest\n" + "passcode:password\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("ERROR"));
assertFalse("no stack trace impl leak:" + frame, frame.contains("at "));
}
@Test(timeout = 60000)
public void testSubscribeWithReceiptNotAuthorized() throws Exception {
String frame = "CONNECT\n" + "login:guest\n" + "passcode:password\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" +
"ack:auto\n" + "receipt:1\n" + "\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("ERROR"));
assertTrue("Error Frame did not contain receipt-id", frame.indexOf(Stomp.Headers.Response.RECEIPT_ID) >= 0);
assertFalse("no stack trace impl leak:" + frame, frame.contains("at "));
}
@Test(timeout = 60000)
public void testSubscribeWithInvalidSelector() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "selector:foo.bar = 1\n" + "ack:auto\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("ERROR"));
}
@Test(timeout = 60000)
public void testTransformationUnknownTranslator() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:test" + "\n\n" + "Hello World" + Stomp.NULL;
stompConnection.sendFrame(frame);
TextMessage message = (TextMessage)consumer.receive(2500);
assertNotNull(message);
assertEquals("Hello World", message.getText());
}
@Test(timeout = 60000)
public void testTransformationFailed() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:" + Stomp.Transformations.JMS_OBJECT_XML + "\n\n" + "Hello World" + Stomp.NULL;
stompConnection.sendFrame(frame);
TextMessage message = (TextMessage)consumer.receive(2500);
assertNotNull(message);
assertNotNull(message.getStringProperty(Stomp.Headers.TRANSFORMATION_ERROR));
assertEquals("Hello World", message.getText());
}
@Test(timeout = 60000)
public void testTransformationSendXMLObject() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:" + Stomp.Transformations.JMS_OBJECT_XML + "\n\n" + xmlObject + Stomp.NULL;
stompConnection.sendFrame(frame);
Message message = consumer.receive(2500);
assertNotNull(message);
LOG.info("Broke sent: {}", message);
assertTrue(message instanceof ObjectMessage);
ObjectMessage objectMessage = (ObjectMessage)message;
SamplePojo object = (SamplePojo)objectMessage.getObject();
assertEquals("Dejan", object.getName());
}
@Test(timeout = 60000)
public void testTransformationSendJSONObject() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:" + Stomp.Transformations.JMS_OBJECT_JSON + "\n\n" + jsonObject + Stomp.NULL;
stompConnection.sendFrame(frame);
ObjectMessage message = (ObjectMessage)consumer.receive(2500);
assertNotNull(message);
SamplePojo object = (SamplePojo)message.getObject();
assertEquals("Dejan", object.getName());
}
@Test(timeout = 60000)
public void testTransformationSubscribeXML() throws Exception {
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade"));
producer.send(message);
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:" + Stomp.Transformations.JMS_OBJECT_XML + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.trim().endsWith(xmlObject));
}
@Test(timeout = 60000)
public void testTransformationReceiveJSONObject() throws Exception {
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade"));
producer.send(message);
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:" + Stomp.Transformations.JMS_OBJECT_JSON + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.trim().endsWith(jsonObject));
}
@Test(timeout = 60000)
public void testTransformationReceiveXMLObject() throws Exception {
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade"));
producer.send(message);
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:" + Stomp.Transformations.JMS_OBJECT_XML + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.trim().endsWith(xmlObject));
}
@Test(timeout = 60000)
public void testTransformationReceiveObject() throws Exception {
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade"));
producer.send(message);
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:" + Stomp.Transformations.JMS_OBJECT_XML + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.trim().endsWith(xmlObject));
}
@Test(timeout = 60000)
public void testTransformationReceiveXMLObjectAndMap() throws Exception {
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
ObjectMessage objMessage = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade"));
producer.send(objMessage);
MapMessage mapMessage = session.createMapMessage();
mapMessage.setString("name", "Dejan");
mapMessage.setString("city", "Belgrade");
producer.send(mapMessage);
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:" + Stomp.Transformations.JMS_XML + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.trim().endsWith(xmlObject));
StompFrame xmlFrame = stompConnection.receive();
Map<String, String> map = createMapFromXml(xmlFrame.getBody());
assertTrue(map.containsKey("name"));
assertTrue(map.containsKey("city"));
assertTrue(map.get("name").equals("Dejan"));
assertTrue(map.get("city").equals("Belgrade"));
}
@Test(timeout = 60000)
public void testTransformationReceiveJSONObjectAndMap() throws Exception {
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
ObjectMessage objMessage = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade"));
producer.send(objMessage);
MapMessage mapMessage = session.createMapMessage();
mapMessage.setString("name", "Dejan");
mapMessage.setString("city", "Belgrade");
producer.send(mapMessage);
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:" + Stomp.Transformations.JMS_JSON + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
StompFrame json = stompConnection.receive();
LOG.info("Transformed frame: {}", json);
SamplePojo pojo = createObjectFromJson(json.getBody());
assertTrue(pojo.getCity().equals("Belgrade"));
assertTrue(pojo.getName().equals("Dejan"));
json = stompConnection.receive();
LOG.info("Transformed frame: {}", json);
Map<String, String> map = createMapFromJson(json.getBody());
assertTrue(map.containsKey("name"));
assertTrue(map.containsKey("city"));
assertTrue(map.get("name").equals("Dejan"));
assertTrue(map.get("city").equals("Belgrade"));
}
@Test(timeout = 60000)
public void testTransformationSendAndReceiveXmlMap() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:" + Stomp.Transformations.JMS_XML + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:" + Stomp.Transformations.JMS_MAP_JSON + "\n\n" + jsonMap + Stomp.NULL;
stompConnection.sendFrame(frame);
StompFrame xmlFrame = stompConnection.receive();
LOG.info("Received Frame: {}", xmlFrame.getBody());
Map<String, String> map = createMapFromXml(xmlFrame.getBody());
assertTrue(map.containsKey("name"));
assertTrue(map.containsKey("city"));
assertEquals("Dejan", map.get("name"));
assertEquals("Belgrade", map.get("city"));
assertTrue(xmlFrame.getHeaders().containsValue("jms-map-xml"));
}
@Test(timeout = 60000)
public void testTransformationSendAndReceiveJsonMap() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:" + Stomp.Transformations.JMS_JSON + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:" + Stomp.Transformations.JMS_MAP_XML + "\n\n" + xmlMap + Stomp.NULL;
stompConnection.sendFrame(frame);
StompFrame json = stompConnection.receive();
LOG.info("Received Frame: {}", json.getBody());
assertNotNull(json);
assertTrue(json.getHeaders().containsValue("jms-map-json"));
Map<String, String> map = createMapFromJson(json.getBody());
assertTrue(map.containsKey("name"));
assertTrue(map.containsKey("city"));
assertEquals("Dejan", map.get("name"));
assertEquals("Belgrade", map.get("city"));
}
@Test(timeout = 60000)
public void testTransformationReceiveBytesMessage() throws Exception {
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
BytesMessage message = session.createBytesMessage();
message.writeBytes(new byte[]{1, 2, 3, 4, 5});
producer.send(message);
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:" + Stomp.Transformations.JMS_XML + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("MESSAGE"));
Pattern cl = Pattern.compile("Content-length:\\s*(\\d+)", Pattern.CASE_INSENSITIVE);
Matcher clMmatcher = cl.matcher(frame);
assertTrue(clMmatcher.find());
assertEquals("5", clMmatcher.group(1));
assertFalse(Pattern.compile("type:\\s*null", Pattern.CASE_INSENSITIVE).matcher(frame).find());
}
@Test(timeout = 60000)
public void testTransformationNotOverrideSubscription() throws Exception {
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade"));
message.setStringProperty("transformation", Stomp.Transformations.JMS_OBJECT_XML.toString());
producer.send(message);
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n" + "transformation:" + Stomp.Transformations.JMS_OBJECT_JSON + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.trim().endsWith(jsonObject));
}
@Test(timeout = 60000)
public void testTransformationIgnoreTransformation() throws Exception {
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
ObjectMessage message = session.createObjectMessage(new SamplePojo("Dejan", "Belgrade"));
message.setStringProperty("transformation", Stomp.Transformations.JMS_OBJECT_XML.toString());
producer.send(message);
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.endsWith("\n\n"));
}
@Test(timeout = 60000)
public void testTransformationSendXMLMap() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:" + Stomp.Transformations.JMS_MAP_XML + "\n\n" + xmlMap + Stomp.NULL;
stompConnection.sendFrame(frame);
MapMessage message = (MapMessage) consumer.receive(2500);
assertNotNull(message);
assertEquals(message.getString("name"), "Dejan");
}
@Test(timeout = 60000)
public void testTransformationSendJSONMap() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "transformation:" + Stomp.Transformations.JMS_MAP_JSON + "\n\n" + jsonMap + Stomp.NULL;
stompConnection.sendFrame(frame);
MapMessage message = (MapMessage) consumer.receive(2500);
assertNotNull(message);
assertEquals(message.getString("name"), "Dejan");
}
@Test(timeout = 60000)
public void testTransformationReceiveXMLMap() throws Exception {
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
MapMessage message = session.createMapMessage();
message.setString("name", "Dejan");
message.setString("city", "Belgrade");
producer.send(message);
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto\n" + "transformation:" + Stomp.Transformations.JMS_MAP_XML + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
StompFrame xmlFrame = stompConnection.receive();
LOG.info("Received Frame: {}", xmlFrame.getBody());
Map<String, String> map = createMapFromXml(xmlFrame.getBody());
assertTrue(map.containsKey("name"));
assertTrue(map.containsKey("city"));
assertEquals("Dejan", map.get("name"));
assertEquals("Belgrade", map.get("city"));
}
@Test(timeout = 60000)
public void testTransformationReceiveJSONMap() throws Exception {
MessageProducer producer = session.createProducer(new ActiveMQQueue("USERS." + getQueueName()));
MapMessage message = session.createMapMessage();
message.setString("name", "Dejan");
message.setString("city", "Belgrade");
producer.send(message);
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:/queue/USERS." + getQueueName() + "\n" + "ack:auto\n" + "transformation:" + Stomp.Transformations.JMS_MAP_JSON + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
StompFrame json = stompConnection.receive();
LOG.info("Received Frame: {}", json.getBody());
assertNotNull(json);
Map<String, String> map = createMapFromJson(json.getBody());
assertTrue(map.containsKey("name"));
assertTrue(map.containsKey("city"));
assertEquals("Dejan", map.get("name"));
assertEquals("Belgrade", map.get("city"));
}
@Test(timeout = 60000)
public void testDurableUnsub() throws Exception {
// get broker JMX view
String domain = "org.apache.activemq";
ObjectName brokerName = new ObjectName(domain + ":type=Broker,brokerName=localhost");
final BrokerViewMBean view = (BrokerViewMBean)
brokerService.getManagementContext().newProxyInstance(brokerName, BrokerViewMBean.class, true);
// connect
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\nclient-id:test\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
assertEquals(view.getDurableTopicSubscribers().length, 0);
// subscribe
frame = "SUBSCRIBE\n" + "destination:/topic/" + getQueueName() + "\n" + "ack:auto\nactivemq.subscriptionName:test\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
// wait a bit for MBean to get refreshed
Wait.waitFor(new Wait.Condition(){
@Override
public boolean isSatisified() throws Exception {
return view.getDurableTopicSubscribers().length == 1;
}
});
assertEquals(view.getDurableTopicSubscribers().length, 1);
// disconnect
frame = "DISCONNECT\nclient-id:test\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return getProxyToBroker().getCurrentConnectionsCount() == 1;
}
}, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25));
//reconnect
stompConnect();
// connect
frame = "CONNECT\n" + "login:system\n" + "passcode:manager\nclient-id:test\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
// unsubscribe
frame = "UNSUBSCRIBE\n" + "destination:/topic/" + getQueueName() + "\n" + "activemq.subscriptionName:test\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
Wait.waitFor(new Wait.Condition(){
@Override
public boolean isSatisified() throws Exception {
return view.getDurableTopicSubscribers().length == 0 && view.getInactiveDurableTopicSubscribers().length == 0;
}
}, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(25));
assertEquals(view.getDurableTopicSubscribers().length, 0);
assertEquals(view.getInactiveDurableTopicSubscribers().length, 0);
}
@Test(timeout = 60000)
public void testDurableSubAttemptOnQueueFails() throws Exception {
// get broker JMX view
String domain = "org.apache.activemq";
ObjectName brokerName = new ObjectName(domain + ":type=Broker,brokerName=localhost");
BrokerViewMBean view = (BrokerViewMBean)
brokerService.getManagementContext().newProxyInstance(brokerName, BrokerViewMBean.class, true);
// connect
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\nclient-id:test\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
assertEquals(view.getQueueSubscribers().length, 0);
// subscribe
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\nactivemq.subscriptionName:test\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("ERROR"));
assertEquals(view.getQueueSubscribers().length, 0);
}
@Test(timeout = 60000)
public void testMessageIdHeader() throws Exception {
stompConnection.connect("system", "manager");
stompConnection.begin("tx1");
stompConnection.send("/queue/" + getQueueName(), "msg", "tx1", null);
stompConnection.commit("tx1");
stompConnection.subscribe("/queue/" + getQueueName());
StompFrame stompMessage = stompConnection.receive();
assertNull(stompMessage.getHeaders().get("transaction"));
}
@Test(timeout = 60000)
public void testPrefetchSizeOfOneClientAck() throws Exception {
stompConnection.connect("system", "manager");
HashMap<String, String> headers = new HashMap<String, String>();
headers.put("activemq.prefetchSize", "1");
stompConnection.subscribe("/queue/" + getQueueName(), "client", headers);
// send messages using JMS
sendMessage("message 1");
sendMessage("message 2");
sendMessage("message 3");
sendMessage("message 4");
sendMessage("message 5");
StompFrame frame = stompConnection.receive();
assertEquals(frame.getBody(), "message 1");
try {
StompFrame frameNull = stompConnection.receive(500);
if (frameNull != null) {
fail("Should not have received the second message");
}
} catch (SocketTimeoutException soe) {}
stompConnection.ack(frame);
StompFrame frame1 = stompConnection.receive();
assertEquals(frame1.getBody(), "message 2");
try {
StompFrame frameNull = stompConnection.receive(500);
if (frameNull != null) {
fail("Should not have received the third message");
}
} catch (SocketTimeoutException soe) {}
stompConnection.ack(frame1);
StompFrame frame2 = stompConnection.receive();
assertEquals(frame2.getBody(), "message 3");
try {
StompFrame frameNull = stompConnection.receive(500);
if (frameNull != null) {
fail("Should not have received the fourth message");
}
} catch (SocketTimeoutException soe) {}
stompConnection.ack(frame2);
StompFrame frame3 = stompConnection.receive();
assertEquals(frame3.getBody(), "message 4");
try {
StompFrame frameNull = stompConnection.receive(500);
if (frameNull != null) {
fail("Should not have received the fifth message");
}
} catch (SocketTimeoutException soe) {}
stompConnection.ack(frame3);
StompFrame frame4 = stompConnection.receive();
assertEquals(frame4.getBody(), "message 5");
try {
StompFrame frameNull = stompConnection.receive(500);
if (frameNull != null) {
fail("Should not have received any more messages");
}
} catch (SocketTimeoutException soe) {}
stompConnection.ack(frame4);
try {
StompFrame frameNull = stompConnection.receive(500);
if (frameNull != null) {
fail("Should not have received the any more messages");
}
} catch (SocketTimeoutException soe) {}
}
@Test(timeout = 60000)
public void testPrefetchSize() throws Exception {
stompConnection.connect("system", "manager");
HashMap<String, String> headers = new HashMap<String, String>();
headers.put("activemq.prefetchSize", "1");
stompConnection.subscribe("/queue/" + getQueueName(), "client", headers);
// send messages using JMS
sendMessage("message 1");
sendMessage("message 2");
sendMessage("message 3");
sendMessage("message 4");
sendMessage("message 5");
StompFrame frame = stompConnection.receive(20000);
assertEquals(frame.getBody(), "message 1");
stompConnection.begin("tx1");
stompConnection.ack(frame, "tx1");
StompFrame frame1 = stompConnection.receive();
assertEquals(frame1.getBody(), "message 2");
try {
StompFrame frame2 = stompConnection.receive(500);
if (frame2 != null) {
fail("Should not have received the second message");
}
} catch (SocketTimeoutException soe) {}
stompConnection.ack(frame1, "tx1");
Thread.sleep(1000);
stompConnection.abort("tx1");
stompConnection.begin("tx2");
// Previously delivered message need to get re-acked...
stompConnection.ack(frame, "tx2");
stompConnection.ack(frame1, "tx2");
StompFrame frame3 = stompConnection.receive(20000);
assertEquals(frame3.getBody(), "message 3");
stompConnection.ack(frame3, "tx2");
StompFrame frame4 = stompConnection.receive(20000);
assertEquals(frame4.getBody(), "message 4");
stompConnection.ack(frame4, "tx2");
stompConnection.commit("tx2");
stompConnection.begin("tx3");
StompFrame frame5 = stompConnection.receive(20000);
assertEquals(frame5.getBody(), "message 5");
stompConnection.ack(frame5, "tx3");
stompConnection.commit("tx3");
stompDisconnect();
}
@Test(timeout = 60000)
public void testTransactionsWithMultipleDestinations() throws Exception {
stompConnection.connect("system", "manager");
HashMap<String, String> headers = new HashMap<String, String>();
headers.put("activemq.prefetchSize", "1");
headers.put("activemq.exclusive", "true");
stompConnection.subscribe("/queue/test1", "client", headers);
stompConnection.begin("ID:tx1");
headers.clear();
headers.put("receipt", "ID:msg1");
stompConnection.send("/queue/test2", "test message", "ID:tx1", headers);
stompConnection.commit("ID:tx1");
// make sure connection is active after commit
Thread.sleep(1000);
stompConnection.send("/queue/test1", "another message");
StompFrame frame = stompConnection.receive(500);
assertNotNull(frame);
}
@Test(timeout = 60000)
public void testTempDestination() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:/temp-queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = "SEND\n" + "destination:/temp-queue/" + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
stompConnection.sendFrame(frame);
StompFrame message = stompConnection.receive(1000);
assertEquals("Hello World", message.getBody());
}
@Test(timeout = 60000)
public void testJMSXUserIDIsSetInMessage() throws Exception {
MessageConsumer consumer = session.createConsumer(queue);
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
stompConnection.sendFrame(frame);
TextMessage message = (TextMessage)consumer.receive(5000);
assertNotNull(message);
assertEquals("system", message.getStringProperty(Stomp.Headers.Message.USERID));
}
@Test(timeout = 60000)
public void testJMSXUserIDIsSetInStompMessage() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
stompConnection.sendFrame(frame);
StompFrame message = stompConnection.receive(5000);
assertEquals("system", message.getHeaders().get(Stomp.Headers.Message.USERID));
}
@Test(timeout = 60000)
public void testClientSetMessageIdIsIgnored() throws Exception {
HashMap<String, String> headers = new HashMap<String, String>();
headers.put(Stomp.Headers.Message.MESSAGE_ID, "Thisisnotallowed");
headers.put(Stomp.Headers.Message.TIMESTAMP, "1234");
headers.put(Stomp.Headers.Message.REDELIVERED, "true");
headers.put(Stomp.Headers.Message.SUBSCRIPTION, "Thisisnotallowed");
headers.put(Stomp.Headers.Message.USERID, "Thisisnotallowed");
stompConnection.connect("system", "manager");
stompConnection.send("/queue/" + getQueueName(), "msg", null, headers);
stompConnection.subscribe("/queue/" + getQueueName());
StompFrame stompMessage = stompConnection.receive();
Map<String, String> mess_headers = new HashMap<String, String>();
mess_headers = stompMessage.getHeaders();
assertFalse("Thisisnotallowed".equals(mess_headers.get(Stomp.Headers.Message.MESSAGE_ID)
));
assertTrue("1234".equals(mess_headers.get(Stomp.Headers.Message.TIMESTAMP)));
assertNull(mess_headers.get(Stomp.Headers.Message.REDELIVERED));
assertNull(mess_headers.get(Stomp.Headers.Message.SUBSCRIPTION));
assertEquals("system", mess_headers.get(Stomp.Headers.Message.USERID));
}
@Test(timeout = 60000)
public void testExpire() throws Exception {
stompConnection.connect("system", "manager");
HashMap<String, String> headers = new HashMap<String, String>();
long timestamp = System.currentTimeMillis() - 100;
headers.put(Stomp.Headers.Message.EXPIRATION_TIME, String.valueOf(timestamp));
headers.put(Stomp.Headers.Send.PERSISTENT, "true");
stompConnection.send("/queue/" + getQueueName(), "msg", null, headers);
stompConnection.subscribe("/queue/ActiveMQ.DLQ");
StompFrame stompMessage = stompConnection.receive(35000);
assertNotNull(stompMessage);
assertEquals(stompMessage.getHeaders().get(Stomp.Headers.Message.ORIGINAL_DESTINATION), "/queue/" + getQueueName());
}
@Test(timeout = 60000)
public void testDefaultJMSReplyToDest() throws Exception {
stompConnection.connect("system", "manager");
HashMap<String, String> headers = new HashMap<String, String>();
headers.put(Stomp.Headers.Send.REPLY_TO, "JustAString");
headers.put(Stomp.Headers.Send.PERSISTENT, "true");
stompConnection.send("/queue/" + getQueueName(), "msg-with-reply-to", null, headers);
stompConnection.subscribe("/queue/" + getQueueName());
StompFrame stompMessage = stompConnection.receive(1000);
assertNotNull(stompMessage);
assertEquals("" + stompMessage, stompMessage.getHeaders().get(Stomp.Headers.Send.REPLY_TO), "JustAString");
}
@Test(timeout = 60000)
public void testPersistent() throws Exception {
stompConnection.connect("system", "manager");
HashMap<String, String> headers = new HashMap<String, String>();
headers.put(Stomp.Headers.Message.PERSISTENT, "true");
stompConnection.send("/queue/" + getQueueName(), "hello", null, headers);
stompConnection.subscribe("/queue/" + getQueueName());
StompFrame stompMessage = stompConnection.receive();
assertNotNull(stompMessage);
assertNotNull(stompMessage.getHeaders().get(Stomp.Headers.Message.PERSISTENT));
assertEquals(stompMessage.getHeaders().get(Stomp.Headers.Message.PERSISTENT), "true");
}
@Test(timeout = 60000)
public void testPersistentDefaultValue() throws Exception {
stompConnection.connect("system", "manager");
HashMap<String, String> headers = new HashMap<String, String>();
stompConnection.send("/queue/" + getQueueName(), "hello", null, headers);
stompConnection.subscribe("/queue/" + getQueueName());
StompFrame stompMessage = stompConnection.receive();
assertNotNull(stompMessage);
assertNull(stompMessage.getHeaders().get(Stomp.Headers.Message.PERSISTENT));
}
@Test(timeout = 60000)
public void testReceiptNewQueue() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + 1234 + "\n" + "id:8fee4b8-4e5c9f66-4703-e936-3" + "\n" + "receipt:8fee4b8-4e5c9f66-4703-e936-2" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
StompFrame receipt = stompConnection.receive();
assertTrue(receipt.getAction().startsWith("RECEIPT"));
assertEquals("8fee4b8-4e5c9f66-4703-e936-2", receipt.getHeaders().get("receipt-id"));
frame = "SEND\n" + "destination:/queue/" + getQueueName() + 123 + "\ncontent-length:0" + " \n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + 123 + "\n" + "id:8fee4b8-4e5c9f66-4703-e936-2" + "\n" + "receipt:8fee4b8-4e5c9f66-4703-e936-1" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
receipt = stompConnection.receive();
assertTrue(receipt.getAction().startsWith("RECEIPT"));
assertEquals("8fee4b8-4e5c9f66-4703-e936-1", receipt.getHeaders().get("receipt-id"));
StompFrame message = stompConnection.receive();
assertTrue(message.getAction().startsWith("MESSAGE"));
String length = message.getHeaders().get("content-length");
assertEquals("0", length);
assertEquals(0, message.getContent().length);
}
@Test(timeout = 60000)
public void testTransactedClientAckBrokerStats() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
sendMessage(getName());
sendMessage(getName());
stompConnection.begin("tx1");
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:client\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
StompFrame message = stompConnection.receive();
assertTrue(message.getAction().equals("MESSAGE"));
stompConnection.ack(message, "tx1");
message = stompConnection.receive();
assertTrue(message.getAction().equals("MESSAGE"));
stompConnection.ack(message, "tx1");
stompConnection.commit("tx1");
frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
final QueueViewMBean queueView = getProxyToQueue(getQueueName());
Wait.waitFor(new Wait.Condition(){
@Override
public boolean isSatisified() throws Exception {
return queueView.getDequeueCount() == 2;
}
});
assertEquals(2, queueView.getDispatchCount());
assertEquals(2, queueView.getDequeueCount());
assertEquals(0, queueView.getQueueSize());
}
@Test(timeout = 60000)
public void testReplytoModification() throws Exception {
String replyto = "some destination";
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "reply-to:" + replyto + "\n\nhello world" + Stomp.NULL;
stompConnection.sendFrame(frame);
StompFrame message = stompConnection.receive();
assertTrue(message.getAction().equals("MESSAGE"));
assertEquals(replyto, message.getHeaders().get("reply-to"));
}
@Test(timeout = 60000)
public void testReplyToDestinationNaming() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
doTestActiveMQReplyToTempDestination("topic");
doTestActiveMQReplyToTempDestination("queue");
}
@Test(timeout = 60000)
public void testSendNullBodyTextMessage() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
sendMessage(null);
frame = stompConnection.receiveFrame();
assertNotNull("Message not received", frame);
}
private void doTestActiveMQReplyToTempDestination(String type) throws Exception {
LOG.info("Starting test on Temp Destinations using a temporary: " + type);
final String dest = "/" + type + "/" + getQueueName();
final String tempDest = String.format("/temp-%s/2C26441740C0ECC9tt1", type);
LOG.info("Test is using out-bound topic: " + dest + ", and replyTo dest: " + tempDest);
// Subscribe both to the out-bound destination and the response tempt destination
stompConnection.subscribe(dest);
stompConnection.subscribe(tempDest);
// Send a Message with the ReplyTo value set.
HashMap<String, String> properties = new HashMap<String, String>();
properties.put(Stomp.Headers.Send.REPLY_TO, tempDest);
LOG.info(String.format("Sending request message: SEND with %s=%s", Stomp.Headers.Send.REPLY_TO, tempDest));
stompConnection.send(dest, "REQUEST", null, properties);
// The subscription should receive a response with the ReplyTo property set.
StompFrame received = stompConnection.receive();
assertNotNull(received);
String remoteReplyTo = received.getHeaders().get(Stomp.Headers.Send.REPLY_TO);
assertNotNull(remoteReplyTo);
assertTrue(remoteReplyTo.startsWith(String.format("/temp-%s/", type)));
LOG.info(String.format("Received request message: %s with %s=%s", received.getAction(), Stomp.Headers.Send.REPLY_TO, remoteReplyTo));
// Reply to the request using the given ReplyTo destination
stompConnection.send(remoteReplyTo, "RESPONSE");
// The response should be received by the Temporary Destination subscription
StompFrame reply = stompConnection.receive();
assertNotNull(reply);
assertEquals("MESSAGE", reply.getAction());
LOG.info(String.format("Response %s received", reply.getAction()));
BrokerViewMBean broker = getProxyToBroker();
if (type.equals("topic")) {
assertEquals(1, broker.getTemporaryTopics().length);
} else {
assertEquals(1, broker.getTemporaryQueues().length);
}
}
@Test(timeout = 60000)
public void testReplyToAcrossConnections() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
doReplyToAcrossConnections("topic");
doReplyToAcrossConnections("queue");
}
private void doReplyToAcrossConnections(String type) throws Exception {
LOG.info("Starting test on Temp Destinations using a temporary: " + type);
StompConnection responder = new StompConnection();
stompConnect(responder);
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
responder.sendFrame(frame);
frame = responder.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
final String dest = "/" + type + "/" + getQueueName();
final String tempDest = String.format("/temp-%s/2C26441740C0ECC9tt1:1:0:1", type);
LOG.info("Test is using out-bound topic: " + dest + ", and replyTo dest: " + tempDest);
// Subscribe to the temp destination, this is where we get our response.
stompConnection.subscribe(tempDest);
// Subscribe to the destination, this is where we get our request.
HashMap<String, String> properties = new HashMap<String, String>();
properties.put(Stomp.Headers.RECEIPT_REQUESTED, "subscribe-1");
responder.subscribe(dest, null, properties);
frame = responder.receiveFrame();
assertTrue("Receipt Frame: " + frame, frame.trim().startsWith("RECEIPT"));
assertTrue("Receipt contains correct receipt-id " + frame, frame.indexOf(Stomp.Headers.Response.RECEIPT_ID) >= 0);
// Send a Message with the ReplyTo value set.
properties = new HashMap<String, String>();
properties.put(Stomp.Headers.Send.REPLY_TO, tempDest);
properties.put(Stomp.Headers.RECEIPT_REQUESTED, "send-1");
LOG.info(String.format("Sending request message: SEND with %s=%s", Stomp.Headers.Send.REPLY_TO, tempDest));
stompConnection.send(dest, "REQUEST", null, properties);
frame = stompConnection.receiveFrame();
assertTrue("Receipt Frame: " + frame, frame.trim().startsWith("RECEIPT"));
assertTrue("Receipt contains correct receipt-id " + frame, frame.indexOf(Stomp.Headers.Response.RECEIPT_ID) >= 0);
// The subscription should receive a response with the ReplyTo property set.
StompFrame received = responder.receive();
assertNotNull(received);
String remoteReplyTo = received.getHeaders().get(Stomp.Headers.Send.REPLY_TO);
assertNotNull(remoteReplyTo);
assertTrue(remoteReplyTo.startsWith(String.format("/remote-temp-%s/", type)));
LOG.info(String.format("Received request message: %s with %s=%s", received.getAction(), Stomp.Headers.Send.REPLY_TO, remoteReplyTo));
// Reply to the request using the given ReplyTo destination
responder.send(remoteReplyTo, "RESPONSE");
// The response should be received by the Temporary Destination subscription
StompFrame reply = stompConnection.receive();
assertNotNull(reply);
assertEquals("MESSAGE", reply.getAction());
assertTrue(reply.getBody().contains("RESPONSE"));
LOG.info(String.format("Response %s received", reply.getAction()));
BrokerViewMBean broker = getProxyToBroker();
if (type.equals("topic")) {
assertEquals(1, broker.getTemporaryTopics().length);
} else {
assertEquals(1, broker.getTemporaryQueues().length);
}
}
protected void assertClients(final int expected) throws Exception {
Wait.waitFor(new Wait.Condition()
{
@Override
public boolean isSatisified() throws Exception {
return brokerService.getBroker().getClients().length == expected;
}
}, TimeUnit.SECONDS.toMillis(30), TimeUnit.MILLISECONDS.toMillis(100));
org.apache.activemq.broker.Connection[] clients = brokerService.getBroker().getClients();
int actual = clients.length;
assertEquals("Number of clients", expected, actual);
}
@Test(timeout = 60000)
public void testDisconnectDoesNotDeadlockBroker() throws Exception {
for (int i = 0; i < 20; ++i) {
doTestConnectionLeak();
}
}
private void doTestConnectionLeak() throws Exception {
stompConnect();
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
boolean gotMessage = false;
boolean gotReceipt = false;
char[] payload = new char[1024];
Arrays.fill(payload, 'A');
String test = "SEND\n" +
"x-type:DEV-3485\n" +
"x-uuid:" + UUID.randomUUID() + "\n" +
"persistent:true\n" +
"receipt:" + UUID.randomUUID() + "\n" +
"destination:/queue/test.DEV-3485" +
"\n\n" +
new String(payload) + Stomp.NULL;
frame = "SUBSCRIBE\n" + "destination:/queue/test.DEV-3485\n" + "ack:auto\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
stompConnection.sendFrame(test);
// We only want one of them, to trigger the shutdown and potentially
// see a deadlock.
while (!gotMessage && !gotReceipt) {
frame = stompConnection.receiveFrame();
LOG.debug("Received the frame: " + frame);
if (frame.startsWith("RECEIPT")) {
gotReceipt = true;
} else if(frame.startsWith("MESSAGE")) {
gotMessage = true;
} else {
fail("Received a frame that we were not expecting.");
}
}
}
@Test(timeout = 60000)
public void testHeaderValuesAreTrimmed1_0() throws Exception {
String connectFrame = "CONNECT\n" +
"login:system\n" +
"passcode:manager\n" +
"accept-version:1.0\n" +
"host:localhost\n" +
"\n" + Stomp.NULL;
stompConnection.sendFrame(connectFrame);
String f = stompConnection.receiveFrame();
LOG.debug("Broker sent: " + f);
assertTrue(f.startsWith("CONNECTED"));
String message = "SEND\n" + "destination:/queue/" + getQueueName() +
"\ntest1: value" +
"\ntest2:value " +
"\ntest3: value " +
"\n\n" + "Hello World" + Stomp.NULL;
stompConnection.sendFrame(message);
String frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
"id:12345\n" + "ack:auto\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
StompFrame received = stompConnection.receive();
assertTrue(received.getAction().equals("MESSAGE"));
assertEquals("value", received.getHeaders().get("test1"));
assertEquals("value", received.getHeaders().get("test2"));
assertEquals("value", received.getHeaders().get("test3"));
frame = "UNSUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" +
"id:12345\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
}
@Test(timeout = 60000)
public void testSendReceiveBigMessage() throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
frame = stompConnection.receiveFrame();
assertTrue(frame.startsWith("CONNECTED"));
frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
int size = 100;
char[] bigBodyArray = new char[size];
Arrays.fill(bigBodyArray, 'a');
String bigBody = new String(bigBodyArray);
frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + bigBody + Stomp.NULL;
stompConnection.sendFrame(frame);
StompFrame sframe = stompConnection.receive();
assertNotNull(sframe);
assertEquals("MESSAGE", sframe.getAction());
assertEquals(bigBody, sframe.getBody());
size = 3000000;
bigBodyArray = new char[size];
Arrays.fill(bigBodyArray, 'a');
bigBody = new String(bigBodyArray);
frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + bigBody + Stomp.NULL;
stompConnection.sendFrame(frame);
sframe = stompConnection.receive(5000);
assertNotNull(sframe);
assertEquals("MESSAGE", sframe.getAction());
assertEquals(bigBody, sframe.getBody());
}
@Test(timeout = 60000)
public void testAckInTransactionTopic() throws Exception {
doTestAckInTransaction(true);
}
@Test(timeout = 60000)
public void testAckInTransactionQueue() throws Exception {
doTestAckInTransaction(false);
}
public void doTestAckInTransaction(boolean topic) throws Exception {
String frame = "CONNECT\n" + "login:system\n" + "passcode:manager\n\n" + Stomp.NULL;
stompConnection.sendFrame(frame);
stompConnection.receive();
String destination = (topic ? "/topic" : "/queue") + "/test";
stompConnection.subscribe(destination, Stomp.Headers.Subscribe.AckModeValues.CLIENT);
for (int j = 0; j < 5; j++) {
for (int i = 0; i < 10; i++) {
stompConnection.send(destination , "message" + i);
}
stompConnection.begin("tx"+j);
for (int i = 0; i < 10; i++) {
StompFrame message = stompConnection.receive();
stompConnection.ack(message, "tx"+j);
}
stompConnection.commit("tx"+j);
}
List<Subscription> subs = getDestinationConsumers(brokerService,
ActiveMQDestination.createDestination("test", topic ? ActiveMQDestination.TOPIC_TYPE : ActiveMQDestination.QUEUE_TYPE));
for (Subscription subscription : subs) {
final AbstractSubscription abstractSubscription = (AbstractSubscription) subscription;
assertTrue("prefetchExtension should be back to Zero after commit", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
LOG.info("ext: " + abstractSubscription.getPrefetchExtension().get());
return abstractSubscription.getPrefetchExtension().get() == 0;
}
}));
}
}
public static List<Subscription> getDestinationConsumers(BrokerService broker, ActiveMQDestination destination) {
List<Subscription> result = null;
org.apache.activemq.broker.region.Destination dest = getDestination(broker, destination);
if (dest != null) {
result = dest.getConsumers();
}
return result;
}
public static org.apache.activemq.broker.region.Destination getDestination(BrokerService target, ActiveMQDestination destination) {
org.apache.activemq.broker.region.Destination result = null;
for (org.apache.activemq.broker.region.Destination dest : getDestinationMap(target, destination).values()) {
if (dest.getName().equals(destination.getPhysicalName())) {
result = dest;
break;
}
}
return result;
}
private static Map<ActiveMQDestination, org.apache.activemq.broker.region.Destination> getDestinationMap(BrokerService target,
ActiveMQDestination destination) {
RegionBroker regionBroker = (RegionBroker) target.getRegionBroker();
if (destination.isTemporary()) {
return destination.isQueue() ? regionBroker.getTempQueueRegion().getDestinationMap() :
regionBroker.getTempTopicRegion().getDestinationMap();
}
return destination.isQueue() ?
regionBroker.getQueueRegion().getDestinationMap() :
regionBroker.getTopicRegion().getDestinationMap();
}
protected SamplePojo createObjectFromJson(String data) throws Exception {
HierarchicalStreamReader in = new JettisonMappedXmlDriver().createReader(new StringReader(data));
return createObject(in);
}
protected SamplePojo createObjectFromXml(String data) throws Exception {
HierarchicalStreamReader in = new XppReader(new StringReader(data), XppFactory.createDefaultParser());
return createObject(in);
}
private SamplePojo createObject(HierarchicalStreamReader in) throws Exception {
SamplePojo pojo = (SamplePojo) xstream.unmarshal(in);
return pojo;
}
protected Map<String, String> createMapFromJson(String data) throws Exception {
HierarchicalStreamReader in = new JettisonMappedXmlDriver().createReader(new StringReader(data));
return createMapObject(in);
}
protected Map<String, String> createMapFromXml(String data) throws Exception {
HierarchicalStreamReader in = new XppReader(new StringReader(data), XppFactory.createDefaultParser());
return createMapObject(in);
}
@SuppressWarnings("unchecked")
private Map<String, String> createMapObject(HierarchicalStreamReader in) throws Exception {
Map<String, String> map = (Map<String, String>)xstream.unmarshal(in);
return map;
}
}