blob: bbe6ba77c4354785b63591be07ad21d3a175b387 [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.tests.http.endtoend.message;
import static javax.servlet.http.HttpServletResponse.SC_OK;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.typeCompatibleWith;
import static org.junit.Assert.assertThat;
import static org.junit.Assume.assumeThat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import com.fasterxml.jackson.core.type.TypeReference;
import com.google.common.base.Strings;
import org.junit.Before;
import org.junit.Test;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.tests.http.HttpRequestConfig;
import org.apache.qpid.tests.http.HttpTestBase;
@HttpRequestConfig
public class MessageTest extends HttpTestBase
{
private static final String QUEUE_NAME = "myqueue";
private static final TypeReference<List<Map<String, Object>>> LIST_MAP_TYPE_REF =
new TypeReference<List<Map<String, Object>>>()
{
};
private static final TypeReference<Map<String, Object>> MAP_TYPE_REF =
new TypeReference<Map<String, Object>>()
{
};
private static final TypeReference<List<Object>> LIST_TYPE_REF =
new TypeReference<List<Object>>()
{
};
@Before
public void setUp()
{
getBrokerAdmin().createQueue(QUEUE_NAME);
getHelper().setTls(true);
}
@Test
public void getJmsMessageWithProperty() throws Exception
{
final String messageProperty = "myProp";
final String messagePropertyValue = "myValue";
Connection connection = getConnection();
try
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME));
Message message = session.createMessage();
message.setStringProperty(messageProperty, messagePropertyValue);
producer.send(message);
}
finally
{
connection.close();
}
List<Map<String, Object>> messages = getHelper().postJson("queue/myqueue/getMessageInfo",
Collections.singletonMap("includeHeaders",
Boolean.TRUE),
LIST_MAP_TYPE_REF, SC_OK);
assertThat(messages.size(), is(equalTo(1)));
Map<String, Object> message = messages.get(0);
@SuppressWarnings("unchecked")
Map<String, Object> headers = (Map<String, Object>) message.get("headers");
assertThat(headers.get(messageProperty), is(equalTo(messagePropertyValue)));
}
@Test
public void getJmsMessageWithGroupId() throws Exception
{
final String groupIdValue = "mygroup";
Connection connection = getConnection();
try
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME));
Message message = session.createMessage();
message.setStringProperty("JMSXGroupID", groupIdValue);
producer.send(message);
}
finally
{
connection.close();
}
List<Map<String, Object>> messages = getHelper().postJson("queue/myqueue/getMessageInfo",
Collections.singletonMap("includeHeaders",
Boolean.FALSE),
LIST_MAP_TYPE_REF, SC_OK);
assertThat(messages.size(), is(equalTo(1)));
Map<String, Object> message = messages.get(0);
assertThat(message.get("groupId"), is(groupIdValue));
}
@Test
public void getAcquiredMessage() throws Exception
{
Connection connection = getConnectionBuilder().setSyncPublish(true).build();
try
{
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
Message jmsMessage = session.createMessage();
producer.send(jmsMessage);
List<Map<String, Object>> messages = getHelper().postJson("queue/myqueue/getMessageInfo",
Collections.singletonMap("includeHeaders",
Boolean.FALSE),
LIST_MAP_TYPE_REF, SC_OK);
assertThat(messages.size(), is(equalTo(1)));
Map<String, Object> message = messages.get(0);
assertThat(message.get("deliveredToConsumerId"), is(nullValue()));
connection.start();
MessageConsumer consumer = session.createConsumer(queue);
jmsMessage = consumer.receive(getReceiveTimeout());
assertThat(jmsMessage, is(notNullValue()));
messages = getHelper().postJson("queue/myqueue/getMessageInfo",
Collections.singletonMap("includeHeaders",
Boolean.FALSE),
LIST_MAP_TYPE_REF, SC_OK);
assertThat(messages.size(), is(equalTo(1)));
message = messages.get(0);
assertThat(message.get("deliveredToConsumerId"), is(notNullValue()));
}
finally
{
connection.close();
}
}
@Test
public void getJmsMapMessage() throws Exception
{
final String mapKey = "key";
final String mapKeyValue = "value";
Connection connection = getConnection();
try
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME));
MapMessage message = session.createMapMessage();
message.setString(mapKey, mapKeyValue);
producer.send(message);
}
finally
{
connection.close();
}
List<Map<String, Object>> messages = getHelper().postJson("queue/myqueue/getMessageInfo",
Collections.singletonMap("includeHeaders",
Boolean.TRUE),
LIST_MAP_TYPE_REF, SC_OK);
assertThat(messages.size(), is(equalTo(1)));
Map<String, Object> message = messages.get(0);
int messageId = (int) message.get("id");
Map<String, Object> contentParams = new HashMap<>();
contentParams.put("messageId", messageId);
contentParams.put("returnJson", Boolean.TRUE);
Map<String, Object> content = getHelper().postJson("queue/myqueue/getMessageContent",
contentParams,
MAP_TYPE_REF, SC_OK);
assertThat(content.size(), is(equalTo(1)));
assertThat(content.get(mapKey), is(equalTo(mapKeyValue)));
}
@Test
public void getJmsStreamMessage() throws Exception
{
Connection connection = getConnection();
try
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME));
StreamMessage message = session.createStreamMessage();
message.writeLong(Long.MAX_VALUE);
message.writeBoolean(true);
message.writeString("Hello World");
producer.send(message);
}
finally
{
connection.close();
}
List<Map<String, Object>> messages = getHelper().postJson("queue/myqueue/getMessageInfo",
Collections.singletonMap("includeHeaders",
Boolean.TRUE),
LIST_MAP_TYPE_REF, SC_OK);
assertThat(messages.size(), is(equalTo(1)));
Map<String, Object> message = messages.get(0);
int messageId = (int) message.get("id");
Map<String, Object> contentParams = new HashMap<>();
contentParams.put("messageId", messageId);
contentParams.put("returnJson", Boolean.TRUE);
List<Object> content = getHelper().postJson("queue/myqueue/getMessageContent",
contentParams,
LIST_TYPE_REF, SC_OK);
assertThat(content.size(), is(equalTo(3)));
assertThat(content.get(0), is(equalTo(Long.MAX_VALUE)));
assertThat(content.get(1), is(equalTo(Boolean.TRUE)));
assertThat(content.get(2), is(equalTo("Hello World")));
}
@Test
public void getJmsBytesMessage() throws Exception
{
final byte[] content = new byte[512];
IntStream.range(0, content.length).forEachOrdered(i -> content[i] = (byte) (i % 256));
Connection connection = getConnection();
try
{
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(session.createQueue(QUEUE_NAME));
BytesMessage message = session.createBytesMessage();
message.writeBytes(content);
producer.send(message);
}
finally
{
connection.close();
}
List<Map<String, Object>> messages = getHelper().postJson("queue/myqueue/getMessageInfo",
Collections.singletonMap("includeHeaders",
Boolean.TRUE),
LIST_MAP_TYPE_REF, SC_OK);
assertThat(messages.size(), is(equalTo(1)));
Map<String, Object> message = messages.get(0);
int messageId = (int) message.get("id");
byte[] receivedContent = getHelper().getBytes(String.format(
"queue/myqueue/getMessageContent?messageId=%d", messageId));
assumeThat("AMQP1.0 messages return the AMQP type",
getProtocol(), is(not(equalTo(Protocol.AMQP_1_0))));
assertThat(receivedContent, is(equalTo(content)));
}
@Test
public void publishEmptyMessage() throws Exception
{
Map<String, Object> messageBody = new HashMap<>();
messageBody.put("address", QUEUE_NAME);
getHelper().submitRequest("virtualhost/publishMessage",
"POST",
Collections.singletonMap("message", messageBody),
SC_OK);
Connection connection = getConnection();
try
{
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(queue);
Message message = consumer.receive(getReceiveTimeout());
assertThat(message, is(notNullValue()));
}
finally
{
connection.close();
}
}
@Test
public void publishMessageApplicationHeaders() throws Exception
{
final String stringPropValue = "mystring";
final String longStringPropValue = Strings.repeat("*", 256);
final Map<String, Object> headers = new HashMap<>();
headers.put("stringprop", stringPropValue);
headers.put("longstringprop", longStringPropValue);
headers.put("intprop", Integer.MIN_VALUE);
headers.put("longprop", Long.MAX_VALUE);
headers.put("boolprop", Boolean.TRUE);
final Map<String, Object> messageBody = new HashMap<>();
messageBody.put("address", QUEUE_NAME);
messageBody.put("headers", headers);
getHelper().submitRequest("virtualhost/publishMessage",
"POST",
Collections.singletonMap("message", messageBody),
SC_OK);
Connection connection = getConnection();
try
{
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(queue);
Message message = consumer.receive(getReceiveTimeout());
assertThat(message, is(notNullValue()));
assertThat(message.getStringProperty("stringprop"), is(equalTo(stringPropValue)));
assertThat(message.getIntProperty("intprop"), is(equalTo(Integer.MIN_VALUE)));
assertThat(message.getLongProperty("longprop"), is(equalTo(Long.MAX_VALUE)));
assertThat(message.getBooleanProperty("boolprop"), is(equalTo(Boolean.TRUE)));
}
finally
{
connection.close();
}
}
@Test
public void publishMessageHeaders() throws Exception
{
final String messageId = "ID:" + UUID.randomUUID().toString();
final long expiration = TimeUnit.DAYS.toMillis(1) + System.currentTimeMillis();
Map<String, Object> messageBody = new HashMap<>();
messageBody.put("address", QUEUE_NAME);
messageBody.put("messageId", messageId);
messageBody.put("expiration", expiration);
getHelper().submitRequest("virtualhost/publishMessage",
"POST",
Collections.singletonMap("message", messageBody),
SC_OK);
Connection connection = getConnection();
try
{
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(queue);
Message message = consumer.receive(getReceiveTimeout());
assertThat(message, is(notNullValue()));
assertThat(message.getJMSMessageID(), is(equalTo(messageId)));
assertThat(message.getJMSExpiration(), is(greaterThanOrEqualTo(expiration)));
}
finally
{
connection.close();
}
}
@Test
public void publishStringMessage() throws Exception
{
final String content = "Hello world";
TextMessage message = publishMessageWithContent(content, TextMessage.class);
assertThat("Unexpected message content", message.getText(), is(equalTo(content)));
}
@Test
public void publishMapMessage() throws Exception
{
final Map<String, Object> content = new HashMap<>();
content.put("key1", "astring");
content.put("key2", Integer.MIN_VALUE);
content.put("key3", Long.MAX_VALUE);
content.put("key4", null);
MapMessage message = publishMessageWithContent(content, MapMessage.class);
final Enumeration mapNames = message.getMapNames();
int entryCount = 0;
while(mapNames.hasMoreElements())
{
String key = (String) mapNames.nextElement();
assertThat("Unexpected map content for key : " + key, message.getObject(key), is(equalTo(content.get(key))));
entryCount++;
}
assertThat("Unexpected number of key/value pairs in map message", entryCount, is(equalTo(content.size())));
}
@Test
public void publishListMessage() throws Exception
{
final List<Object> content = new ArrayList<>();
content.add("astring");
content.add(Integer.MIN_VALUE);
content.add(Long.MAX_VALUE);
content.add(null);
StreamMessage message = publishMessageWithContent(content, StreamMessage.class);
assertThat(message.readString(), is(equalTo("astring")));
assertThat(message.readInt(), is(equalTo((Integer.MIN_VALUE))));
assertThat(message.readLong(), is(equalTo(Long.MAX_VALUE)));
assertThat(message.readObject(), is(nullValue()));
}
private <M extends Message> M publishMessageWithContent(final Object content, final Class<M> expectedMessageClass)
throws Exception
{
Map<String, Object> messageBody = new HashMap<>();
messageBody.put("address", QUEUE_NAME);
messageBody.put("content", content);
getHelper().submitRequest("virtualhost/publishMessage",
"POST",
Collections.singletonMap("message", messageBody),
SC_OK);
Connection connection = getConnection();
try
{
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(queue);
@SuppressWarnings("unchecked")
M message = (M) consumer.receive(getReceiveTimeout());
assertThat(message, is(notNullValue()));
assertThat(message.getClass(), is(typeCompatibleWith(expectedMessageClass)));
return message;
}
finally
{
connection.close();
}
}
}