blob: fb6bfca1d80a1e91bcbc5feb0ca84d46dccc101b [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.systest.rest;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.JsonMappingException;
public class MessagesRestTest extends QpidRestTestCase
{
/**
* Message number to publish into queue
*/
private static final int MESSAGE_NUMBER = 12;
private Connection _connection;
private Session _session;
private MessageProducer _producer;
private long _startTime;
private long _ttl;
public void setUp() throws Exception
{
super.setUp();
_startTime = System.currentTimeMillis();
_connection = getConnection();
_session = _connection.createSession(true, Session.SESSION_TRANSACTED);
String queueName = getTestQueueName();
Destination queue = _session.createQueue(queueName);
_session.createConsumer(queue);
_producer = _session.createProducer(queue);
_ttl = TimeUnit.DAYS.toMillis(1);
for (int i = 0; i < MESSAGE_NUMBER; i++)
{
Message m = _session.createTextMessage("Test-" + i);
m.setIntProperty("index", i);
if (i % 2 == 0)
{
_producer.send(m);
}
else
{
_producer.send(m, DeliveryMode.NON_PERSISTENT, 5, _ttl);
}
}
_session.commit();
}
public void testGet() throws Exception
{
String queueName = getTestQueueName();
List<Map<String, Object>> messages = getRestTestHelper().getJsonAsList("/rest/message/test/" + queueName);
assertNotNull("Messages are not found", messages);
assertEquals("Unexpected number of messages", MESSAGE_NUMBER, messages.size());
int position = 0;
for (Map<String, Object> message : messages)
{
assertMessage(position, message);
position++;
}
}
public void testGetMessageContent() throws Exception
{
String queueName = getTestQueueName();
// add bytes message
BytesMessage byteMessage = _session.createBytesMessage();
byte[] messageBytes = "Test".getBytes();
byteMessage.writeBytes(messageBytes);
byteMessage.setStringProperty("test", "value");
_producer.send(byteMessage);
_session.commit();
// get message IDs
List<Long> ids = getMesssageIds(queueName);
Map<String, Object> message = getRestTestHelper().getJsonAsMap("/rest/message/test/" + queueName + "/" + ids.get(0));
assertMessageAttributes(message);
assertMessageAttributeValues(message, true);
@SuppressWarnings("unchecked")
Map<String, Object> headers = (Map<String, Object>) message.get("headers");
assertNotNull("Message headers are not found", headers);
assertEquals("Unexpected message header", 0, headers.get("index"));
Long lastMessageId = ids.get(ids.size() - 1);
message = getRestTestHelper().getJsonAsMap("/rest/message/test/" + queueName + "/" + lastMessageId);
assertMessageAttributes(message);
assertEquals("Unexpected message attribute mimeType", "application/octet-stream", message.get("mimeType"));
assertEquals("Unexpected message attribute size", 4, message.get("size"));
@SuppressWarnings("unchecked")
Map<String, Object> bytesMessageHeader = (Map<String, Object>) message.get("headers");
assertNotNull("Message headers are not found", bytesMessageHeader);
assertEquals("Unexpected message header", "value", bytesMessageHeader.get("test"));
// get content
HttpURLConnection connection = getRestTestHelper().openManagementConnection("/rest/message-content/test/" + queueName + "/"
+ lastMessageId, "GET");
connection.connect();
byte[] data = getRestTestHelper().readConnectionInputStream(connection);
assertTrue("Unexpected message", Arrays.equals(messageBytes, data));
}
public void testPostMoveMessages() throws Exception
{
String queueName = getTestQueueName();
String queueName2 = queueName + "_2";
Destination queue2 = _session.createQueue(queueName2);
_session.createConsumer(queue2);
// get message IDs
List<Long> ids = getMesssageIds(queueName);
// move half of the messages
int movedNumber = ids.size() / 2;
List<Long> movedMessageIds = new ArrayList<Long>();
for (int i = 0; i < movedNumber; i++)
{
movedMessageIds.add(ids.remove(i));
}
// move messages
HttpURLConnection connection = getRestTestHelper().openManagementConnection("/rest/message/test/" + queueName, "POST");
Map<String, Object> messagesData = new HashMap<String, Object>();
messagesData.put("messages", movedMessageIds);
messagesData.put("destinationQueue", queueName2);
messagesData.put("move", Boolean.TRUE);
getRestTestHelper().writeJsonRequest(connection, messagesData);
assertEquals("Unexpected response code", 200, connection.getResponseCode());
// check messages on target queue
List<Map<String, Object>> messages = getRestTestHelper().getJsonAsList("/rest/message/test/" + queueName2);
assertNotNull("Messages are not found", messages);
assertEquals("Unexpected number of messages", movedMessageIds.size(), messages.size());
for (Long id : movedMessageIds)
{
Map<String, Object> message = getRestTestHelper().find("id", id.intValue(), messages);
assertMessageAttributes(message);
}
// check messages on original queue
messages = getRestTestHelper().getJsonAsList("/rest/message/test/" + queueName);
assertNotNull("Messages are not found", messages);
assertEquals("Unexpected number of messages", ids.size(), messages.size());
for (Long id : ids)
{
Map<String, Object> message = getRestTestHelper().find("id", id.intValue(), messages);
assertMessageAttributes(message);
}
for (Long id : movedMessageIds)
{
Map<String, Object> message = getRestTestHelper().find("id", id.intValue(), messages);
assertNull("Moved message " + id + " is found on original queue", message);
}
}
public void testPostCopyMessages() throws Exception
{
String queueName = getTestQueueName();
String queueName2 = queueName + "_2";
Destination queue2 = _session.createQueue(queueName2);
_session.createConsumer(queue2);
// get message IDs
List<Long> ids = getMesssageIds(queueName);
// copy half of the messages
int copyNumber = ids.size() / 2;
List<Long> copyMessageIds = new ArrayList<Long>();
for (int i = 0; i < copyNumber; i++)
{
copyMessageIds.add(ids.remove(i));
}
// copy messages
HttpURLConnection connection = getRestTestHelper().openManagementConnection("/rest/message/test/" + queueName, "POST");
Map<String, Object> messagesData = new HashMap<String, Object>();
messagesData.put("messages", copyMessageIds);
messagesData.put("destinationQueue", queueName2);
getRestTestHelper().writeJsonRequest(connection, messagesData);
assertEquals("Unexpected response code", 200, connection.getResponseCode());
// check messages on target queue
List<Map<String, Object>> messages = getRestTestHelper().getJsonAsList("/rest/message/test/" + queueName2);
assertNotNull("Messages are not found", messages);
assertEquals("Unexpected number of messages", copyMessageIds.size(), messages.size());
for (Long id : copyMessageIds)
{
Map<String, Object> message = getRestTestHelper().find("id", id.intValue(), messages);
assertMessageAttributes(message);
}
// check messages on original queue
messages = getRestTestHelper().getJsonAsList("/rest/message/test/" + queueName);
assertNotNull("Messages are not found", messages);
assertEquals("Unexpected number of messages", MESSAGE_NUMBER, messages.size());
for (Long id : ids)
{
Map<String, Object> message = getRestTestHelper().find("id", id.intValue(), messages);
assertMessageAttributes(message);
}
for (Long id : copyMessageIds)
{
Map<String, Object> message = getRestTestHelper().find("id", id.intValue(), messages);
assertMessageAttributes(message);
}
}
public void testDeleteMessages() throws Exception
{
String queueName = getTestQueueName();
// get message IDs
List<Long> ids = getMesssageIds(queueName);
// delete half of the messages
int deleteNumber = ids.size() / 2;
StringBuilder queryString = new StringBuilder();
List<Long> deleteMessageIds = new ArrayList<Long>();
for (int i = 0; i < deleteNumber; i++)
{
Long id = ids.remove(i);
deleteMessageIds.add(id);
if (queryString.length() > 0)
{
queryString.append("&");
}
queryString.append("id=").append(id);
}
// delete messages
HttpURLConnection connection = getRestTestHelper().openManagementConnection(
"/rest/message/test/" + queueName + "?" + queryString.toString(), "DELETE");
connection.connect();
assertEquals("Unexpected response code", 200, connection.getResponseCode());
// check messages on queue
List<Map<String, Object>> messages = getRestTestHelper().getJsonAsList("/rest/message/test/" + queueName);
assertNotNull("Messages are not found", messages);
assertEquals("Unexpected number of messages", ids.size(), messages.size());
for (Long id : ids)
{
Map<String, Object> message = getRestTestHelper().find("id", id.intValue(), messages);
assertMessageAttributes(message);
}
for (Long id : deleteMessageIds)
{
Map<String, Object> message = getRestTestHelper().find("id", id.intValue(), messages);
assertNull("Message with id " + id + " was not deleted", message);
}
}
private List<Long> getMesssageIds(String queueName) throws IOException, JsonParseException, JsonMappingException
{
List<Map<String, Object>> messages = getRestTestHelper().getJsonAsList("/rest/message/test/" + queueName);
List<Long> ids = new ArrayList<Long>();
for (Map<String, Object> message : messages)
{
ids.add(((Number) message.get("id")).longValue());
}
return ids;
}
private void assertMessage(int position, Map<String, Object> message)
{
assertMessageAttributes(message);
assertEquals("Unexpected message attribute position", position, message.get("position"));
assertEquals("Unexpected message attribute size", position < 10 ? 6 : 7, message.get("size"));
boolean even = position % 2 == 0;
assertMessageAttributeValues(message, even);
}
private void assertMessageAttributeValues(Map<String, Object> message, boolean even)
{
if (even)
{
assertEquals("Unexpected message attribute expirationTime", 0, message.get("expirationTime"));
assertEquals("Unexpected message attribute priority", 4, message.get("priority"));
assertEquals("Unexpected message attribute persistent", Boolean.TRUE, message.get("persistent"));
}
else
{
assertEquals("Unexpected message attribute expirationTime", ((Number) message.get("timestamp")).longValue()
+ _ttl, message.get("expirationTime"));
assertEquals("Unexpected message attribute priority", 5, message.get("priority"));
assertEquals("Unexpected message attribute persistent", Boolean.FALSE, message.get("persistent"));
}
assertEquals("Unexpected message attribute mimeType", "text/plain", message.get("mimeType"));
assertEquals("Unexpected message attribute userId", "guest", message.get("userId"));
assertEquals("Unexpected message attribute deliveryCount", 0, message.get("deliveryCount"));
assertEquals("Unexpected message attribute state", "Available", message.get("state"));
}
private void assertMessageAttributes(Map<String, Object> message)
{
assertNotNull("Message map cannot be null", message);
assertNotNull("Unexpected message attribute deliveryCount", message.get("deliveryCount"));
assertNotNull("Unexpected message attribute state", message.get("state"));
assertNotNull("Unexpected message attribute id", message.get("id"));
assertNotNull("Message arrivalTime cannot be null", message.get("arrivalTime"));
assertNotNull("Message timestamp cannot be null", message.get("timestamp"));
assertTrue("Message arrivalTime cannot be null", ((Number) message.get("arrivalTime")).longValue() > _startTime);
assertNotNull("Message messageId cannot be null", message.get("messageId"));
assertNotNull("Unexpected message attribute mimeType", message.get("mimeType"));
assertNotNull("Unexpected message attribute userId", message.get("userId"));
assertNotNull("Message priority cannot be null", message.get("priority"));
assertNotNull("Message expirationTime cannot be null", message.get("expirationTime"));
assertNotNull("Message persistent cannot be null", message.get("persistent"));
}
}