blob: 773dcc37fde25a188463d7f27fe85d48bf135b3a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "OpenwireTempDestinationTest.h"
#include <decaf/lang/Thread.h>
#include <decaf/util/concurrent/Mutex.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <decaf/util/UUID.h>
#include <activemq/exceptions/ActiveMQException.h>
using namespace std;
using namespace cms;
using namespace activemq;
using namespace activemq::test;
using namespace activemq::test::openwire;
using namespace activemq::util;
using namespace activemq::exceptions;
using namespace decaf;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace decaf::lang;
namespace activemq{
namespace test{
namespace openwire{
class Requester : public cms::MessageListener,
public decaf::lang::Runnable {
private:
auto_ptr<CMSProvider> cmsProvider;
auto_ptr<cms::MessageConsumer> tempTopicConsumer;
unsigned int numReceived;
unsigned int messageCount;
decaf::util::concurrent::CountDownLatch ready;
decaf::util::concurrent::CountDownLatch responses;
public:
Requester( const std::string& url,
const std::string& destination,
unsigned int messageCount )
: messageCount( messageCount ), ready( 1 ), responses( messageCount ) {
this->cmsProvider.reset( new CMSProvider( url ) );
this->cmsProvider->setDestinationName( destination );
this->cmsProvider->getProducer()->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
this->tempTopicConsumer.reset(
cmsProvider->getSession()->createConsumer(
cmsProvider->getTempDestination() ) );
this->tempTopicConsumer->setMessageListener( this );
this->numReceived = 0;
}
virtual ~Requester() {}
virtual unsigned int getNumReceived() const {
return this->numReceived;
}
virtual void waitUnitReady() {
this->ready.await();
}
virtual void awaitAllResponses() {
this->responses.await( 2000 * this->messageCount );
}
virtual void run() {
try {
auto_ptr<cms::TextMessage> message(
this->cmsProvider->getSession()->createTextMessage() );
message->setCMSReplyTo( this->cmsProvider->getTempDestination() );
this->ready.countDown();
for( unsigned int i = 0; i < messageCount; ++i ) {
this->cmsProvider->getProducer()->send( message.get() );
}
} catch( CMSException& e ) {
e.printStackTrace();
}
}
virtual void onMessage( const cms::Message* message ) {
try {
this->numReceived++;
this->responses.countDown();
} catch( CMSException& e ) {
e.printStackTrace();
}
}
};
class Responder : public cms::MessageListener {
private:
auto_ptr<CMSProvider> cmsProvider;
unsigned int numReceived;
unsigned int messageCount;
decaf::util::concurrent::CountDownLatch requests;
public:
Responder( const std::string& url,
const std::string& destination,
unsigned int messageCount )
: messageCount( messageCount ), requests( messageCount ) {
this->cmsProvider.reset( new CMSProvider( url ) );
this->cmsProvider->setDestinationName( destination );
this->cmsProvider->getNoDestProducer()->setDeliveryMode(
DeliveryMode::NON_PERSISTENT );
this->cmsProvider->getConsumer()->setMessageListener( this );
this->numReceived = 0;
}
virtual ~Responder() {}
virtual unsigned int getNumReceived() const {
return this->numReceived;
}
virtual void awaitAllRequests() {
this->requests.await( 2000 * this->messageCount );
}
virtual void onMessage( const cms::Message* message ) {
try {
if( message->getCMSReplyTo() != NULL ) {
auto_ptr<cms::Message> response(
cmsProvider->getSession()->createMessage() );
// Send it back to the replyTo Destination
this->cmsProvider->getNoDestProducer()->send(
message->getCMSReplyTo(), response.get() );
this->requests.countDown();
}
this->numReceived++;
} catch( CMSException& e ) {
e.printStackTrace();
}
}
};
}}}
///////////////////////////////////////////////////////////////////////////////
void OpenwireTempDestinationTest::testBasics() {
try{
auto_ptr<cms::MessageConsumer> tempConsumer(
cmsProvider->getSession()->createConsumer(
cmsProvider->getTempDestination() ) );
auto_ptr<TextMessage> message(
cmsProvider->getSession()->createTextMessage() );
// Fire a message to the temporary topic
cmsProvider->getNoDestProducer()->send(
cmsProvider->getTempDestination(), message.get() );
auto_ptr<cms::Message> received( tempConsumer->receive( 3000 ) );
CPPUNIT_ASSERT( received.get() != NULL );
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}
///////////////////////////////////////////////////////////////////////////////
void OpenwireTempDestinationTest::testTwoConnections() {
try {
std::string destination = "REQUEST-TOPIC";
auto_ptr<Requester> requester(
new Requester( cmsProvider->getBrokerURL(), destination, 10 ) );
auto_ptr<Responder> responder(
new Responder( cmsProvider->getBrokerURL(), destination, 10 ) );
// Launch the Consumers in new Threads.
Thread requestorThread( requester.get() );
requestorThread.start();
// Responder should get all its requests first
responder->awaitAllRequests();
// Now the Requester should get all its responses.
requester->awaitAllResponses();
// Check that the responder received all the required requests
CPPUNIT_ASSERT( responder->getNumReceived() == 10 );
// Check that the requester received all the required responses
CPPUNIT_ASSERT( requester->getNumReceived() == 10 );
// Shutdown the Requester.
requestorThread.join();
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
}