blob: 227b9dcff0323b0ec5f7ee118fa171f46ca89f3d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "TestSupport.h"
#include <cppunit/extensions/HelperMacros.h>
#include <integration/IntegrationCommon.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/exceptions/ActiveMQException.h>
#include <activemq/concurrent/Thread.h>
#include <activemq/util/Guid.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <sstream>
using namespace std;
using namespace cms;
using namespace activemq;
using namespace activemq::core;
using namespace activemq::util;
using namespace activemq::exceptions;
using namespace activemq::concurrent;
using namespace integration;
/**
 * Builds a fixture that remembers which broker to talk to and which
 * acknowledge mode to use. Nothing is opened here; the connection and
 * session pointers start out NULL and are populated by initialize().
 *
 * @param brokerUrl  URL later passed to ActiveMQConnectionFactory when a
 *                   connection is created.
 * @param ackMode    acknowledge mode later passed to createSession().
 */
TestSupport::TestSupport( const string& brokerUrl, cms::Session::AcknowledgeMode ackMode )
:   connection( NULL ),
    session( NULL )
{
    this->ackMode = ackMode;
    this->brokerUrl = brokerUrl;

    // Give the counter a defined value up front so that waitForMessages()
    // and onMessage() never read an indeterminate count when they run
    // before initialize() has been called.
    numReceived = 0;
}
/**
 * Tears the fixture down by delegating to close().
 */
TestSupport::~TestSupport()
{
    this->close();
}
void TestSupport::close() {
try
{
if( session != NULL ) {
session->close();
delete session;
}
if( connection != NULL ) {
connection->close();
delete connection;
}
}
AMQ_CATCH_NOTHROW( ActiveMQException )
AMQ_CATCHALL_NOTHROW( )
session = NULL;
connection = NULL;
}
/**
 * Prepares the fixture for a test run: resets the received-message
 * counter, opens and starts a connection, registers this object as the
 * connection's exception listener, and creates a session using the
 * acknowledge mode given at construction.
 *
 * Failures are reported through the AMQ_CATCH_RETHROW / AMQ_CATCHALL_THROW
 * handlers for ActiveMQException.
 */
void TestSupport::initialize(){

    try
    {
        numReceived = 0;

        // Empty user name and password; the client id is whatever
        // Guid::createGUIDString() produces for this call.
        const std::string noUser;
        const std::string noPassword;
        connection = createDetachedConnection(
            noUser, noPassword, Guid().createGUIDString() );

        // Route asynchronous connection errors to onException().
        connection->setExceptionListener( this );
        connection->start();

        // The session uses the acknowledge mode stored by the constructor.
        session = connection->createSession( ackMode );
    }
    AMQ_CATCH_RETHROW( ActiveMQException )
    AMQ_CATCHALL_THROW( ActiveMQException )
}
/**
 * Asks ActiveMQConnectionFactory for a new connection to the stored
 * broker URL. The returned pointer is not stored in this object.
 *
 * @param username  user name forwarded to the factory.
 * @param password  password forwarded to the factory.
 * @param clientId  client id forwarded to the factory.
 * @return the connection produced by the factory.
 */
cms::Connection* TestSupport::createDetachedConnection(
    const std::string& username,
    const std::string& password,
    const std::string& clientId ) {

    try
    {
        return ActiveMQConnectionFactory::createConnection(
            brokerUrl, username, password, clientId );
    }
    AMQ_CATCH_RETHROW( ActiveMQException )
    AMQ_CATCHALL_THROW( ActiveMQException )
}
/**
 * Pauses the calling thread by handing IntegrationCommon::defaultDelay
 * to Thread::sleep().
 */
void TestSupport::doSleep()
{
    Thread::sleep( IntegrationCommon::defaultDelay );
}
/**
 * Sends `count` text messages through `producer`, pausing via doSleep()
 * after each one. A single TextMessage object is reused; its body is
 * "this is a test text message: id = " followed by the loop index.
 *
 * @param producer  producer used to send each message.
 * @param count     number of messages to send.
 * @param replyTo   destination set as the reply-to of the message.
 * @return the number of messages handed to the producer.
 *
 * Failures are reported through the AMQ_CATCH_RETHROW / AMQ_CATCHALL_THROW
 * handlers for ActiveMQException.
 */
unsigned int TestSupport::produceTextMessages(
    cms::MessageProducer& producer,
    unsigned int count,
    cms::Destination* replyTo )
{
    try
    {
        const std::string prefix = "this is a test text message: id = ";
        std::ostringstream body;

        cms::TextMessage* textMsg = session->createTextMessage();
        unsigned int sent = 0;

        // The message is owned by this function; release it on every exit
        // path, including when configuring or sending it throws.
        try
        {
            textMsg->setCMSReplyTo( replyTo );

            for( unsigned int ix = 0; ix < count; ++ix ) {

                // No trailing std::ends: the body is passed on as a C
                // string, so an embedded NUL would only be cut off again.
                body << prefix << ix;
                textMsg->setText( body.str().c_str() );
                body.str("");

                producer.send( textMsg );
                doSleep();
                ++sent;
            }
        }
        catch( ... )
        {
            delete textMsg;
            throw;
        }

        delete textMsg;
        return sent;
    }
    AMQ_CATCH_RETHROW( ActiveMQException )
    AMQ_CATCHALL_THROW( ActiveMQException )
}
/**
 * Sends the same 10-byte BytesMessage `count` times through `producer`,
 * pausing via doSleep() after each send.
 *
 * @param producer  producer used to send each message.
 * @param count     number of times the message is sent.
 * @param replyTo   destination set as the reply-to of the message.
 * @return the number of messages handed to the producer.
 *
 * Failures are reported through the AMQ_CATCH_RETHROW / AMQ_CATCHALL_THROW
 * handlers for ActiveMQException.
 */
unsigned int TestSupport::produceBytesMessages(
    cms::MessageProducer& producer,
    unsigned int count,
    cms::Destination* replyTo )
{
    try
    {
        // Fixed payload; note the zero bytes at positions 0, 4, 8 and 9.
        const int payloadSize = 10;
        unsigned char payload[payloadSize] = { 0, 1, 2, 3, 0, 4, 5, 6, 0, 0 };

        cms::BytesMessage* bytesMsg = session->createBytesMessage();
        unsigned int sent = 0;

        // The message is owned by this function; release it on every exit
        // path, including when configuring or sending it throws.
        try
        {
            bytesMsg->setBodyBytes( payload, payloadSize );
            bytesMsg->setCMSReplyTo( replyTo );

            for( unsigned int ix = 0; ix < count; ++ix ) {
                producer.send( bytesMsg );
                doSleep();
                ++sent;
            }
        }
        catch( ... )
        {
            delete bytesMsg;
            throw;
        }

        delete bytesMsg;
        return sent;
    }
    AMQ_CATCH_RETHROW( ActiveMQException )
    AMQ_CATCHALL_THROW( ActiveMQException )
}
/**
 * Blocks until at least `count` messages have been counted or the retry
 * budget is used up, whichever happens first.
 *
 * While holding the mutex, the method repeatedly calls mutex.wait( 500 )
 * (presumably milliseconds -- confirm against the Mutex API) and gives up
 * after `count + 5` waits.
 *
 * @param count  number of received messages to wait for.
 */
void TestSupport::waitForMessages( unsigned int count )
{
    try
    {
        synchronized( &mutex )
        {
            int waitsLeft = count + 5;
            bool budgetSpent = false;

            // Once the budget is spent the counter is not consulted again.
            while( !budgetSpent && numReceived < count )
            {
                mutex.wait( 500 );
                budgetSpent = ( --waitsLeft == 0 );
            }
        }
    }
    AMQ_CATCH_RETHROW( ActiveMQException )
    AMQ_CATCHALL_THROW( ActiveMQException )
}
/**
 * Exception callback: fails the running CppUnit test, using the
 * exception's stack trace string as the failure message.
 *
 * @param error  the exception that was reported.
 */
void TestSupport::onException( const cms::CMSException& error )
{
    const std::string stackTrace = error.getStackTraceString();
    CPPUNIT_ASSERT_MESSAGE( stackTrace, false );
}
/**
 * Message callback. Acknowledges the message when the session is in
 * CLIENT_ACKNOWLEDGE mode, optionally prints text and bytes messages when
 * IntegrationCommon::debug is set, and for those two message kinds bumps
 * the received counter and wakes every thread waiting on the mutex.
 * Messages of any other kind are not counted.
 *
 * @param message  the delivered message.
 */
void TestSupport::onMessage( const cms::Message* message )
{
    if( session->getAcknowledgeMode() == cms::Session::CLIENT_ACKNOWLEDGE ) {
        try {
            message->acknowledge();
        } catch( CMSException& ex ) {
            CPPUNIT_ASSERT_MESSAGE(ex.getStackTraceString(), false );
        }
    }

    bool counted = false;

    const cms::TextMessage* txtMsg =
        dynamic_cast<const cms::TextMessage*>( message );

    if( txtMsg != NULL )
    {
        // Fetch the text once and reuse it for the debug output.
        const std::string text = txtMsg->getText();
        if( IntegrationCommon::debug ) {
            printf("received text msg: %s\n", text.c_str() );
        }
        counted = true;
    }
    else
    {
        const cms::BytesMessage* bytesMsg =
            dynamic_cast<const cms::BytesMessage*>( message );

        if( bytesMsg != NULL )
        {
            if( IntegrationCommon::debug ) {
                // Guard against a missing body before building the string.
                const unsigned char* bytes = bytesMsg->getBodyBytes();
                std::string transcode;
                if( bytes != NULL ) {
                    transcode.assign(
                        reinterpret_cast<const char*>( bytes ),
                        bytesMsg->getBodyLength() );
                }
                printf("received bytes msg: %s\n", transcode.c_str() );
            }
            counted = true;
        }
    }

    if( counted )
    {
        // Update the counter under the same mutex that waitForMessages()
        // holds while reading it, so the two threads never race on it.
        synchronized( &mutex )
        {
            ++numReceived;
            mutex.notifyAll();
        }
    }
}