// blob: e413a136c1ab18451278c7d22515959efe51382e [file] [log] [blame]
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.systest;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.zip.GZIPInputStream;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
import org.apache.qpid.client.AMQConnectionFactory;
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.port.HttpPort;
import org.apache.qpid.systest.rest.RestTestHelper;
import org.apache.qpid.test.utils.QpidBrokerTestCase;
import org.apache.qpid.test.utils.TestBrokerConfiguration;
public class MessageCompressionTest extends QpidBrokerTestCase
{
private RestTestHelper _restTestHelper;
/**
 * Deliberately does nothing.
 *
 * Each test in this class starts the broker itself through {@link #doActualSetUp()},
 * which calls {@code super.startDefaultBroker()} after adjusting the broker configuration.
 */
@Override
public void startDefaultBroker()
{
// tests are starting the broker
}
/**
 * Performs the real fixture set-up: adds HTTP management to the default broker configuration,
 * allows confidential operations on insecure channels for the HTTP port, starts the broker
 * and creates the REST helper bound to the broker's HTTP port.
 *
 * Tests call this explicitly, after setting any system properties they need.
 *
 * @throws Exception if the broker cannot be configured or started
 */
public void doActualSetUp() throws Exception
{
    final TestBrokerConfiguration brokerConfiguration = getDefaultBrokerConfiguration();
    brokerConfiguration.addHttpManagementConfiguration();
    brokerConfiguration.setObjectAttribute(
            Port.class,
            TestBrokerConfiguration.ENTRY_NAME_HTTP_PORT,
            HttpPort.ALLOW_CONFIDENTIAL_OPERATIONS_ON_INSECURE_CHANNELS,
            true);

    // The overridden startDefaultBroker() in this class is a no-op, so go to the superclass directly.
    super.startDefaultBroker();

    _restTestHelper = new RestTestHelper(getDefaultBroker().getHttpPort());
}
/**
 * Tears down the superclass fixture and then releases the REST helper.
 *
 * The REST helper is only created by {@link #doActualSetUp()}, which each test invokes itself.
 * If a test fails before that point the helper is still {@code null}, so it is only torn down
 * when present; otherwise a NullPointerException raised here would hide the original failure.
 *
 * @throws Exception if either tear-down step fails
 */
@Override
protected void tearDown() throws Exception
{
    try
    {
        super.tearDown();
    }
    finally
    {
        if (_restTestHelper != null)
        {
            _restTestHelper.tearDown();
        }
    }
}
/** Compression requested by both the sender and the receiver, with broker compression enabled. */
public void testSenderCompressesReceiverUncompresses() throws Exception
{
    final boolean senderCompresses = true;
    final boolean receiverUncompresses = true;
    final boolean brokerCompressionEnabled = true;
    doTestCompression(senderCompresses, receiverUncompresses, brokerCompressionEnabled);
}
/** Compression requested by the sender only, with broker compression enabled. */
public void testSenderCompressesOnly() throws Exception
{
    final boolean senderCompresses = true;
    final boolean receiverUncompresses = false;
    final boolean brokerCompressionEnabled = true;
    doTestCompression(senderCompresses, receiverUncompresses, brokerCompressionEnabled);
}
/** Compression requested by the receiver only, with broker compression enabled. */
public void testReceiverUncompressesOnly() throws Exception
{
    final boolean senderCompresses = false;
    final boolean receiverUncompresses = true;
    final boolean brokerCompressionEnabled = true;
    doTestCompression(senderCompresses, receiverUncompresses, brokerCompressionEnabled);
}
/** Neither client requests compression; broker compression is enabled. */
public void testNoCompression() throws Exception
{
    final boolean senderCompresses = false;
    final boolean receiverUncompresses = false;
    final boolean brokerCompressionEnabled = true;
    doTestCompression(senderCompresses, receiverUncompresses, brokerCompressionEnabled);
}
/** Both clients request compression, but broker compression is disabled. */
public void testDisablingCompressionAtBroker() throws Exception
{
    final boolean senderCompresses = true;
    final boolean receiverUncompresses = true;
    final boolean brokerCompressionEnabled = false;
    doTestCompression(senderCompresses, receiverUncompresses, brokerCompressionEnabled);
}
/**
 * Publishes a large, easily compressible text message and consumes it again, using the
 * connection byte statistics reported over REST to decide whether the message travelled
 * compressed on each leg.
 *
 * @param senderCompresses         whether the publishing connection is created with message compression on
 * @param receiverUncompresses     whether the consuming connection is created with message compression on
 * @param brokerCompressionEnabled value set for {@link Broker#BROKER_MESSAGE_COMPRESSION_ENABLED}
 *                                 before the broker is started
 * @throws Exception if the broker, the JMS operations or the REST calls fail
 */
private void doTestCompression(final boolean senderCompresses,
                               final boolean receiverUncompresses,
                               final boolean brokerCompressionEnabled) throws Exception
{
    setTestSystemProperty(Broker.BROKER_MESSAGE_COMPRESSION_ENABLED, String.valueOf(brokerCompressionEnabled));
    doActualSetUp();

    String messageText = createMessageText();
    Connection senderConnection = getConnection(senderCompresses);
    Session senderSession = senderConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Queue testQueue = createTestQueue(senderSession);
    publishMessage(senderConnection, messageText, testQueue);

    // get the number of bytes received at the broker on the connection
    long bytesIn = getSoleConnectionStatistic("bytesIn");

    // if sending compressed then the bytesIn statistic for the connection should reflect the compressed size of
    // the message
    if (senderCompresses && brokerCompressionEnabled)
    {
        assertTrue("Message was not sent compressed", bytesIn < messageText.length());
    }
    else
    {
        assertFalse("Message was incorrectly sent compressed", bytesIn < messageText.length());
    }

    // The sender is closed before the consumer connects so that the REST query below
    // again sees exactly one connection.
    senderConnection.close();

    // receive the message
    Connection consumerConnection = getConnection(receiverUncompresses);
    Session session = consumerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    MessageConsumer consumer = session.createConsumer(getTestQueue());
    consumerConnection.start();

    TextMessage message = (TextMessage) consumer.receive(500L);
    assertNotNull("Message was not received", message);
    assertEquals("Message was corrupted", messageText, message.getText());
    assertEquals("Header was corrupted", "foo", message.getStringProperty("bar"));

    // get the number of bytes sent by the broker
    long bytesOut = getSoleConnectionStatistic("bytesOut");

    // if receiving compressed the bytes out statistic from the connection should reflect the compressed size of
    // the message
    if (receiverUncompresses && brokerCompressionEnabled)
    {
        assertTrue("Message was not received compressed", bytesOut < messageText.length());
    }
    else
    {
        assertFalse("Message was incorrectly received compressed", bytesOut < messageText.length());
    }

    consumerConnection.close();
}

/**
 * Reads one numeric statistic of the single connection reported by the REST API.
 *
 * The value is read through {@link Number} rather than cast to {@code Integer}, so the
 * lookup does not depend on which boxed type the JSON parser picked for the number.
 *
 * @param statisticName key of the statistic within the connection's "statistics" map
 * @return the statistic value as a long
 * @throws Exception if the REST call fails
 */
private long getSoleConnectionStatistic(final String statisticName) throws Exception
{
    List<Map<String, Object>> connectionRestOutput = _restTestHelper.getJsonAsList("/api/latest/connection");
    assertEquals(1, connectionRestOutput.size());

    Map<?, ?> statistics = (Map<?, ?>) connectionRestOutput.get(0).get("statistics");
    assertNotNull("Connection statistics are missing", statistics);

    Object value = statistics.get(statisticName);
    assertNotNull("Connection statistic '" + statisticName + "' is missing", value);
    return ((Number) value).longValue();
}
/**
 * Publishes a compressed text message and fetches its content over REST as raw bytes, without
 * setting an accept-encoding on the REST helper. Expects the full original text, and the first
 * 1024 characters when a limit is applied with {@code decompressBeforeLimiting=true}.
 */
public void testGetContentViaRestForCompressedMessageWithAgentNotSupportingCompression() throws Exception
{
    setTestSystemProperty(Broker.BROKER_MESSAGE_COMPRESSION_ENABLED, Boolean.toString(true));
    doActualSetUp();

    final String sentText = createMessageText();
    final Connection producerConnection = getConnection(true);
    final AMQConnectionFactory connectionFactory = (AMQConnectionFactory) getConnectionFactory();
    final String virtualHostPath = connectionFactory.getVirtualPath();
    final Session queueSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    final Queue queue = createTestQueue(queueSession);
    publishMessage(producerConnection, sentText, queue);

    final String queueUrl = "queue" + virtualHostPath + virtualHostPath + "/" + queue.getQueueName();

    final List<Map<String, Object>> messageInfos = _restTestHelper.getJsonAsList(queueUrl + "/getMessageInfo");
    assertEquals("Unexpected number of messages", 1, messageInfos.size());
    final Number reportedId = (Number) messageInfos.get(0).get("id");
    final long messageId = reportedId.longValue();

    // Whole content
    final String fullContentUrl = queueUrl + "/getMessageContent?messageId=" + messageId;
    final String fullContent = new String(_restTestHelper.getBytes(fullContentUrl), StandardCharsets.UTF_8);
    assertEquals("Unexpected message content :" + fullContent, sentText, fullContent);

    // Content limited to 1024 after decompression
    final String limitedContentUrl =
            queueUrl + "/getMessageContent?limit=1024&decompressBeforeLimiting=true&messageId=" + messageId;
    final String limitedContent = new String(_restTestHelper.getBytes(limitedContentUrl), StandardCharsets.UTF_8);
    assertEquals("Unexpected message content :" + limitedContent, sentText.substring(0, 1024), limitedContent);
}
/**
 * Publishes a compressed text message and fetches its content over REST with the accept-encoding
 * set to "gzip, deflate, br". The response is gunzipped by the test and must equal the original
 * text, or its first 1024 characters when limited with {@code decompressBeforeLimiting=true}.
 */
public void testGetContentViaRestForCompressedMessageWithAgentSupportingCompression() throws Exception
{
    setTestSystemProperty(Broker.BROKER_MESSAGE_COMPRESSION_ENABLED, Boolean.toString(true));
    doActualSetUp();

    final String sentText = createMessageText();
    final Connection producerConnection = getConnection(true);
    final AMQConnectionFactory connectionFactory = (AMQConnectionFactory) getConnectionFactory();
    final String virtualHostPath = connectionFactory.getVirtualPath();
    final Session queueSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    final Queue queue = createTestQueue(queueSession);
    publishMessage(producerConnection, sentText, queue);

    final String queueUrl = "queue" + virtualHostPath + virtualHostPath + "/" + queue.getQueueName();

    final List<Map<String, Object>> messageInfos = _restTestHelper.getJsonAsList(queueUrl + "/getMessageInfo");
    assertEquals("Unexpected number of messages", 1, messageInfos.size());
    final Number reportedId = (Number) messageInfos.get(0).get("id");
    final long messageId = reportedId.longValue();

    _restTestHelper.setAcceptEncoding("gzip, deflate, br");

    // Whole content
    final String fullContent = getDecompressedContent(queueUrl + "/getMessageContent?messageId=" + messageId);
    assertEquals("Unexpected message content :" + fullContent, sentText, fullContent);

    // Content limited to 1024 after decompression
    final String limitedContent = getDecompressedContent(
            queueUrl + "/getMessageContent?limit=1024&decompressBeforeLimiting=true&messageId=" + messageId);
    assertEquals("Unexpected message content :" + limitedContent, sentText.substring(0, 1024), limitedContent);
}
/**
 * Publishes a compressed text message and requests its content over REST with {@code limit=1024}
 * but without {@code decompressBeforeLimiting}. Gunzipping the response is expected to fail with
 * an {@link EOFException}.
 */
public void testGetTruncatedContentViaRestForCompressedMessage() throws Exception
{
    setTestSystemProperty(Broker.BROKER_MESSAGE_COMPRESSION_ENABLED, Boolean.toString(true));
    doActualSetUp();

    final String sentText = createMessageText();
    final Connection producerConnection = getConnection(true);
    final AMQConnectionFactory connectionFactory = (AMQConnectionFactory) getConnectionFactory();
    final String virtualHostPath = connectionFactory.getVirtualPath();
    final Session queueSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    final Queue queue = createTestQueue(queueSession);
    publishMessage(producerConnection, sentText, queue);

    final String queueUrl = "queue" + virtualHostPath + virtualHostPath + "/" + queue.getQueueName();

    final List<Map<String, Object>> messageInfos = _restTestHelper.getJsonAsList(queueUrl + "/getMessageInfo");
    assertEquals("Unexpected number of messages", 1, messageInfos.size());
    final Number reportedId = (Number) messageInfos.get(0).get("id");
    final long messageId = reportedId.longValue();

    _restTestHelper.setAcceptEncoding("gzip");

    final String truncatedContentUrl = queueUrl + "/getMessageContent?limit=1024&messageId=" + messageId;
    try
    {
        getDecompressedContent(truncatedContentUrl);
        fail("Should not be able to decompress truncated gzip");
    }
    catch (EOFException expected)
    {
        // pass
    }
}
/**
 * Issues a GET against the management interface and returns the gunzipped response body as text.
 *
 * @param url management URL to fetch
 * @return the decompressed response body decoded as UTF-8
 * @throws IOException if the request or the decompression fails
 */
private String getDecompressedContent(final String url) throws IOException
{
    final HttpURLConnection managementConnection = _restTestHelper.openManagementConnection(url, "GET");
    managementConnection.connect();
    final String decompressedBody = decompressInputStream(managementConnection);
    return decompressedBody;
}
/**
 * Publishes a compressed map message and fetches its content over REST as JSON
 * ({@code returnJson=true}) without setting an accept-encoding on the REST helper.
 * The returned map must equal the map that was sent.
 */
public void testGetContentViaRestForCompressedMapMessageWithAgentNotSupportingCompression() throws Exception
{
    setTestSystemProperty(Broker.BROKER_MESSAGE_COMPRESSION_ENABLED, Boolean.toString(true));
    doActualSetUp();

    final Connection producerConnection = getConnection(true);
    final AMQConnectionFactory connectionFactory = (AMQConnectionFactory) getConnectionFactory();
    final String virtualHostPath = connectionFactory.getVirtualPath();
    final Session queueSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    final Queue queue = createTestQueue(queueSession);

    final Map<String, Object> sentMap = createMapToSend();
    publishMapMessage(producerConnection, sentMap, queue);

    final String queueUrl = "queue" + virtualHostPath + virtualHostPath + "/" + queue.getQueueName();

    final List<Map<String, Object>> messageInfos = _restTestHelper.getJsonAsList(queueUrl + "/getMessageInfo");
    assertEquals("Unexpected number of messages", 1, messageInfos.size());
    final Number reportedId = (Number) messageInfos.get(0).get("id");
    final long messageId = reportedId.longValue();

    final String contentUrl = queueUrl + "/getMessageContent?returnJson=true&messageId=" + messageId;
    final Map<String, Object> receivedMap = _restTestHelper.getJsonAsMap(contentUrl);

    final String failureMessage =
            "Unexpected message content: difference " + Maps.difference(sentMap, receivedMap);
    // Both sides are copied into HashMaps so that equality does not depend on the concrete map types.
    final Map<String, Object> expected = new HashMap<>(sentMap);
    final Map<String, Object> actual = new HashMap<>(receivedMap);
    assertEquals(failureMessage, expected, actual);
}
/**
 * Publishes a compressed map message and fetches its content over REST as JSON
 * ({@code returnJson=true}) with the accept-encoding set to "gzip, deflate, br".
 * The response is gunzipped and parsed by the test and must equal the map that was sent.
 */
public void testGetContentViaRestForCompressedMapMessageWithAgentSupportingCompression() throws Exception
{
    setTestSystemProperty(Broker.BROKER_MESSAGE_COMPRESSION_ENABLED, Boolean.toString(true));
    doActualSetUp();

    final Connection producerConnection = getConnection(true);
    final AMQConnectionFactory connectionFactory = (AMQConnectionFactory) getConnectionFactory();
    final String virtualHostPath = connectionFactory.getVirtualPath();
    final Session queueSession = producerConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    final Queue queue = createTestQueue(queueSession);

    final Map<String, Object> sentMap = createMapToSend();
    publishMapMessage(producerConnection, sentMap, queue);

    final String queueUrl = "queue" + virtualHostPath + virtualHostPath + "/" + queue.getQueueName();

    final List<Map<String, Object>> messageInfos = _restTestHelper.getJsonAsList(queueUrl + "/getMessageInfo");
    assertEquals("Unexpected number of messages", 1, messageInfos.size());
    final Number reportedId = (Number) messageInfos.get(0).get("id");
    final long messageId = reportedId.longValue();

    _restTestHelper.setAcceptEncoding("gzip, deflate, br");

    // getDecompressedContent opens a GET management connection, connects and gunzips the body.
    final String json = getDecompressedContent(
            queueUrl
            + "/getMessageContent?returnJson=true&messageId="
            + messageId);

    final ObjectMapper jsonMapper = new ObjectMapper();
    final Map<String, Object> receivedMap = jsonMapper.readValue(json, Map.class);

    final String failureMessage =
            "Unexpected message content: difference " + Maps.difference(sentMap, receivedMap);
    final Map<String, Object> expected = new HashMap<>(sentMap);
    final Map<String, Object> actual = new HashMap<>(receivedMap);
    assertEquals(failureMessage, expected, actual);
}
/**
 * Builds the payload for the map message tests: entries "text0", "text1", ... each holding the
 * same sample sentence, added until their combined value length reaches 2048 * 1024 characters,
 * plus a single integer entry under the key "int".
 *
 * @return a new mutable map holding the generated entries
 */
private Map<String, Object> createMapToSend()
{
    final String sampleText = "This is a sample message";
    final int sampleLength = sampleText.length();
    final int targetSize = 2048 * 1024;

    final Map<String, Object> payload = new HashMap<>();
    for (int index = 0; index * sampleLength < targetSize; index++)
    {
        payload.put("text" + index, sampleText);
    }
    payload.put("int", 1);
    return payload;
}
/**
 * Reads the connection's response body through a {@link GZIPInputStream} and decodes the
 * decompressed bytes as UTF-8.
 *
 * @param connection connection whose input stream carries gzip data
 * @return the decompressed body as a string
 * @throws IOException if reading or decompressing fails
 */
private String decompressInputStream(final HttpURLConnection connection) throws IOException
{
    try (InputStream gzipStream = new GZIPInputStream(connection.getInputStream());
         ByteArrayOutputStream decompressed = new ByteArrayOutputStream())
    {
        final byte[] chunk = new byte[1024];
        for (int read = gzipStream.read(chunk); read != -1; read = gzipStream.read(chunk))
        {
            decompressed.write(chunk, 0, read);
        }
        return new String(decompressed.toByteArray(), StandardCharsets.UTF_8);
    }
}
/**
 * Sends the given map as a JMS {@link MapMessage} on a new transacted session and commits.
 * The message carries the string property "bar" = "foo". Only {@code String} and
 * {@code Integer} values are supported.
 *
 * @param senderConnection connection used to create the transacted session
 * @param mapData          entries to copy into the map message
 * @param testQueue        destination queue
 * @throws RuntimeException if a value is neither a {@code String} nor an {@code Integer}
 */
private void publishMapMessage(final Connection senderConnection,
                               final Map<String, Object> mapData,
                               final Queue testQueue)
        throws JMSException, org.apache.qpid.QpidException
{
    final Session transactedSession = senderConnection.createSession(true, Session.SESSION_TRANSACTED);
    final MessageProducer queueProducer = transactedSession.createProducer(testQueue);
    final MapMessage mapMessage = transactedSession.createMapMessage();
    mapMessage.setStringProperty("bar", "foo");

    for (final Map.Entry<String, Object> item : mapData.entrySet())
    {
        final String name = item.getKey();
        final Object content = item.getValue();

        // Reject unsupported value types up front.
        if (!(content instanceof String) && !(content instanceof Integer))
        {
            throw new RuntimeException("Setting value of type " + content.getClass() + " is not implemented yet");
        }

        if (content instanceof String)
        {
            mapMessage.setString(name, (String) content);
        }
        else
        {
            mapMessage.setInt(name, (Integer) content);
        }
    }

    queueProducer.send(mapMessage);
    transactedSession.commit();
}
/**
 * Sends the given text as a JMS {@link TextMessage} on a new transacted session and commits.
 * The message carries the string property "bar" = "foo".
 *
 * @param senderConnection connection used to create the transacted session
 * @param messageText      body of the text message
 * @param testQueue        destination queue
 */
private void publishMessage(final Connection senderConnection, final String messageText, final Queue testQueue)
        throws JMSException, org.apache.qpid.QpidException
{
    final Session transactedSession = senderConnection.createSession(true, Session.SESSION_TRANSACTED);
    final MessageProducer queueProducer = transactedSession.createProducer(testQueue);

    final TextMessage textMessage = transactedSession.createTextMessage(messageText);
    textMessage.setStringProperty("bar", "foo");

    queueProducer.send(textMessage);
    transactedSession.commit();
}
/**
 * Builds a highly repetitive text by repeating one sentence until the result is at least
 * 2048 * 1024 characters long.
 *
 * @return the generated message text
 */
private String createMessageText()
{
    final int minimumLength = 2048 * 1024;
    final String sentence = "This should compress easily. ";

    final StringBuilder text = new StringBuilder();
    // The builder starts empty, so at least one sentence is always appended.
    do
    {
        text.append(sentence);
    }
    while (text.length() < minimumLength);
    return text.toString();
}
/**
 * Opens a connection whose {@link ConnectionURL#OPTIONS_COMPRESS_MESSAGES} option is set to
 * the given value.
 *
 * @param compress value for the compress-messages connection option
 * @return the connection returned by {@code getConnectionWithOptions}
 * @throws Exception if the connection cannot be created
 */
private Connection getConnection(final boolean compress) throws Exception
{
    final Map<String, String> connectionOptions = new HashMap<>();
    final String compressSetting = Boolean.toString(compress);
    connectionOptions.put(ConnectionURL.OPTIONS_COMPRESS_MESSAGES, compressSetting);
    return getConnectionWithOptions(connectionOptions);
}
}