blob: 1844e06680a66d8a9188f54ed0160c72c03247b6 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "MessageCompressionTest.h"
#include <activemq/util/CMSListener.h>
#include <activemq/exceptions/ActiveMQException.h>
#include <activemq/core/ActiveMQConnection.h>
#include <activemq/commands/Message.h>
#include <decaf/lang/Thread.h>
#include <decaf/util/UUID.h>
using namespace std;
using namespace cms;
using namespace activemq;
using namespace activemq::commands;
using namespace activemq::core;
using namespace activemq::test;
using namespace activemq::util;
using namespace activemq::exceptions;
using namespace decaf;
using namespace decaf::lang;
using namespace decaf::util;
////////////////////////////////////////////////////////////////////////////////
namespace{
string TEST_CLIENT_ID = "MessageCompressionTestClientId";
string DESTINATION_NAME = "MessageCompressionTestDest";
// The following text should compress well
const string TEXT = std::string()
+ "The quick red fox jumped over the lazy brown dog. " + "The quick red fox jumped over the lazy brown dog. "
+ "The quick red fox jumped over the lazy brown dog. " + "The quick red fox jumped over the lazy brown dog. "
+ "The quick red fox jumped over the lazy brown dog. " + "The quick red fox jumped over the lazy brown dog. "
+ "The quick red fox jumped over the lazy brown dog. " + "The quick red fox jumped over the lazy brown dog. "
+ "The quick red fox jumped over the lazy brown dog. " + "The quick red fox jumped over the lazy brown dog. "
+ "The quick red fox jumped over the lazy brown dog. " + "The quick red fox jumped over the lazy brown dog. "
+ "The quick red fox jumped over the lazy brown dog. " + "The quick red fox jumped over the lazy brown dog. "
+ "The quick red fox jumped over the lazy brown dog. " + "The quick red fox jumped over the lazy brown dog. "
+ "The quick red fox jumped over the lazy brown dog. ";
bool a = true;
unsigned char b = 123;
char c = 'c';
short d = 0x1234;
int e = 0x12345678;
long long f = 0x1234567812345678LL;
string g = "Hello World!";
bool h = false;
unsigned char i = 0xFF;
short j = -0x1234;
int k = -0x12345678;
long long l = -0x1234567812345678LL;
float m = 2.1F;
double n = 2.3;
}
////////////////////////////////////////////////////////////////////////////////
MessageCompressionTest::MessageCompressionTest() {
}
////////////////////////////////////////////////////////////////////////////////
MessageCompressionTest::~MessageCompressionTest() {
}
////////////////////////////////////////////////////////////////////////////////
void MessageCompressionTest::testTextMessageCompression() {
ActiveMQConnection* connection =
dynamic_cast<ActiveMQConnection*>( this->cmsProvider->getConnection() );
CPPUNIT_ASSERT( connection != NULL );
CPPUNIT_ASSERT_MESSAGE( "Compression not enabled.", connection->isUseCompression() );
Session* session = this->cmsProvider->getSession();
std::auto_ptr<TextMessage> sent( session->createTextMessage( TEXT ) );
cms::MessageConsumer* consumer = cmsProvider->getConsumer();
cms::MessageProducer* producer = cmsProvider->getProducer();
producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
// Send some text messages
producer->send( sent.get() );
auto_ptr<cms::Message> message( consumer->receive( 2000 ) );
CPPUNIT_ASSERT( message.get() != NULL );
TextMessage* recvd = dynamic_cast<TextMessage*>( message.get() );
CPPUNIT_ASSERT_MESSAGE( "Received message was not a TextMessage", recvd != NULL );
CPPUNIT_ASSERT_EQUAL_MESSAGE( "Received text differs from sent text.",
sent->getText(), recvd->getText() );
commands::Message* amqMsg = dynamic_cast<commands::Message*>( message.get() );
CPPUNIT_ASSERT_MESSAGE( "Received message was not an AMQ message type", amqMsg != NULL );
CPPUNIT_ASSERT_MESSAGE( "Received message was not compressed.", amqMsg->isCompressed() );
}
////////////////////////////////////////////////////////////////////////////////
void MessageCompressionTest::testBytesMessageCompression() {
ActiveMQConnection* connection =
dynamic_cast<ActiveMQConnection*>( this->cmsProvider->getConnection() );
CPPUNIT_ASSERT( connection != NULL );
CPPUNIT_ASSERT_MESSAGE( "Compression not enabled.", connection->isUseCompression() );
Session* session = this->cmsProvider->getSession();
std::auto_ptr<BytesMessage> sent( session->createBytesMessage() );
cms::MessageConsumer* consumer = cmsProvider->getConsumer();
cms::MessageProducer* producer = cmsProvider->getProducer();
producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
sent->writeBoolean( a );
sent->writeByte( b );
sent->writeChar( c );
sent->writeShort( d );
sent->writeInt( e );
sent->writeLong( f );
sent->writeString( g );
sent->writeBoolean( h );
sent->writeByte( i );
sent->writeShort( j );
sent->writeInt( k );
sent->writeLong( l );
sent->writeFloat( m );
sent->writeDouble( n );
// Send some text messages
producer->send( sent.get() );
auto_ptr<cms::Message> message( consumer->receive( 2000 ) );
CPPUNIT_ASSERT( message.get() != NULL );
BytesMessage* recvd = dynamic_cast<BytesMessage*>( message.get() );
CPPUNIT_ASSERT_MESSAGE( "Received message was not a BytesMessage", recvd != NULL );
CPPUNIT_ASSERT_EQUAL( a, recvd->readBoolean() );
CPPUNIT_ASSERT_EQUAL( b, recvd->readByte() );
CPPUNIT_ASSERT_EQUAL( c, recvd->readChar() );
CPPUNIT_ASSERT_EQUAL( d, recvd->readShort() );
CPPUNIT_ASSERT_EQUAL( e, recvd->readInt() );
CPPUNIT_ASSERT_EQUAL( f, recvd->readLong() );
CPPUNIT_ASSERT_EQUAL( g, recvd->readString() );
CPPUNIT_ASSERT_EQUAL( h, recvd->readBoolean() );
CPPUNIT_ASSERT_EQUAL( i, recvd->readByte() );
CPPUNIT_ASSERT_EQUAL( j, recvd->readShort() );
CPPUNIT_ASSERT_EQUAL( k, recvd->readInt() );
CPPUNIT_ASSERT_EQUAL( l, recvd->readLong() );
CPPUNIT_ASSERT_EQUAL( m, recvd->readFloat() );
CPPUNIT_ASSERT_EQUAL( n, recvd->readDouble() );
commands::Message* amqMsg = dynamic_cast<commands::Message*>( message.get() );
CPPUNIT_ASSERT_MESSAGE( "Received message was not an AMQ message type", amqMsg != NULL );
CPPUNIT_ASSERT_MESSAGE( "Received message was not compressed.", amqMsg->isCompressed() );
}
////////////////////////////////////////////////////////////////////////////////
void MessageCompressionTest::testStreamMessageCompression() {
ActiveMQConnection* connection =
dynamic_cast<ActiveMQConnection*>( this->cmsProvider->getConnection() );
CPPUNIT_ASSERT( connection != NULL );
CPPUNIT_ASSERT_MESSAGE( "Compression not enabled.", connection->isUseCompression() );
Session* session = this->cmsProvider->getSession();
std::auto_ptr<StreamMessage> sent( session->createStreamMessage() );
cms::MessageConsumer* consumer = cmsProvider->getConsumer();
cms::MessageProducer* producer = cmsProvider->getProducer();
producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
sent->writeBoolean( a );
sent->writeByte( b );
sent->writeChar( c );
sent->writeShort( d );
sent->writeInt( e );
sent->writeLong( f );
sent->writeString( g );
sent->writeBoolean( h );
sent->writeByte( i );
sent->writeShort( j );
sent->writeInt( k );
sent->writeLong( l );
sent->writeFloat( m );
sent->writeDouble( n );
// Send some text messages
producer->send( sent.get() );
auto_ptr<cms::Message> message( consumer->receive( 2000 ) );
CPPUNIT_ASSERT( message.get() != NULL );
StreamMessage* recvd = dynamic_cast<StreamMessage*>( message.get() );
CPPUNIT_ASSERT_MESSAGE( "Received message was not a StreamMessage", recvd != NULL );
CPPUNIT_ASSERT_EQUAL( a, recvd->readBoolean() );
CPPUNIT_ASSERT_EQUAL( b, recvd->readByte() );
CPPUNIT_ASSERT_EQUAL( c, recvd->readChar() );
CPPUNIT_ASSERT_EQUAL( d, recvd->readShort() );
CPPUNIT_ASSERT_EQUAL( e, recvd->readInt() );
CPPUNIT_ASSERT_EQUAL( f, recvd->readLong() );
CPPUNIT_ASSERT_EQUAL( g, recvd->readString() );
CPPUNIT_ASSERT_EQUAL( h, recvd->readBoolean() );
CPPUNIT_ASSERT_EQUAL( i, recvd->readByte() );
CPPUNIT_ASSERT_EQUAL( j, recvd->readShort() );
CPPUNIT_ASSERT_EQUAL( k, recvd->readInt() );
CPPUNIT_ASSERT_EQUAL( l, recvd->readLong() );
CPPUNIT_ASSERT_EQUAL( m, recvd->readFloat() );
CPPUNIT_ASSERT_EQUAL( n, recvd->readDouble() );
commands::Message* amqMsg = dynamic_cast<commands::Message*>( message.get() );
CPPUNIT_ASSERT_MESSAGE( "Received message was not an AMQ message type", amqMsg != NULL );
CPPUNIT_ASSERT_MESSAGE( "Received message was not compressed.", amqMsg->isCompressed() );
}
////////////////////////////////////////////////////////////////////////////////
void MessageCompressionTest::testMapMessageCompression() {
ActiveMQConnection* connection =
dynamic_cast<ActiveMQConnection*>( this->cmsProvider->getConnection() );
CPPUNIT_ASSERT( connection != NULL );
CPPUNIT_ASSERT_MESSAGE( "Compression not enabled.", connection->isUseCompression() );
Session* session = this->cmsProvider->getSession();
std::auto_ptr<MapMessage> sent( session->createMapMessage() );
cms::MessageConsumer* consumer = cmsProvider->getConsumer();
cms::MessageProducer* producer = cmsProvider->getProducer();
producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
sent->setBoolean( "a", a );
sent->setByte( "b", b );
sent->setChar( "c", c );
sent->setShort( "d", d );
sent->setInt( "e", e );
sent->setLong( "f", f );
sent->setString( "g", g );
sent->setBoolean( "h", h );
sent->setByte( "i", i );
sent->setShort( "j", j );
sent->setInt( "k", k );
sent->setLong( "l", l );
sent->setFloat( "m", m );
sent->setDouble( "n", n );
// Send some text messages
producer->send( sent.get() );
auto_ptr<cms::Message> message( consumer->receive( 2000 ) );
CPPUNIT_ASSERT( message.get() != NULL );
MapMessage* recvd = dynamic_cast<MapMessage*>( message.get() );
CPPUNIT_ASSERT_MESSAGE( "Received message was not a MapMessage", recvd != NULL );
CPPUNIT_ASSERT_EQUAL( a, recvd->getBoolean( "a" ) );
CPPUNIT_ASSERT_EQUAL( b, recvd->getByte( "b" ) );
CPPUNIT_ASSERT_EQUAL( c, recvd->getChar( "c" ) );
CPPUNIT_ASSERT_EQUAL( d, recvd->getShort( "d" ) );
CPPUNIT_ASSERT_EQUAL( e, recvd->getInt( "e" ) );
CPPUNIT_ASSERT_EQUAL( f, recvd->getLong( "f" ) );
CPPUNIT_ASSERT_EQUAL( g, recvd->getString( "g" ) );
CPPUNIT_ASSERT_EQUAL( h, recvd->getBoolean( "h" ) );
CPPUNIT_ASSERT_EQUAL( i, recvd->getByte( "i" ) );
CPPUNIT_ASSERT_EQUAL( j, recvd->getShort( "j" ) );
CPPUNIT_ASSERT_EQUAL( k, recvd->getInt( "k" ) );
CPPUNIT_ASSERT_EQUAL( l, recvd->getLong( "l" ) );
CPPUNIT_ASSERT_EQUAL( m, recvd->getFloat( "m" ) );
CPPUNIT_ASSERT_EQUAL( n, recvd->getDouble( "n" ) );
commands::Message* amqMsg = dynamic_cast<commands::Message*>( message.get() );
CPPUNIT_ASSERT_MESSAGE( "Received message was not an AMQ message type", amqMsg != NULL );
CPPUNIT_ASSERT_MESSAGE( "Received message was not compressed.", amqMsg->isCompressed() );
}