blob: 818247cb0948cf0b24f7bcc737238d476cb42276 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "ExpirationTest.h"
#include <decaf/lang/Runnable.h>
#include <decaf/lang/Thread.h>
#include <decaf/lang/Long.h>
#include <decaf/util/UUID.h>
#include <sstream>
using namespace std;
using namespace cms;
using namespace activemq;
using namespace activemq::test;
using namespace activemq::util;
using namespace decaf;
using namespace decaf::lang;
using namespace decaf::util;
////////////////////////////////////////////////////////////////////////////////
namespace activemq {
namespace test {
class Producer : public decaf::lang::Runnable {
private:
auto_ptr<CMSProvider> cmsProvider;
int numMessages;
long long timeToLive;
bool disableTimeStamps;
public:
Producer( const std::string& brokerURL, const std::string& destination,
int numMessages, long long timeToLive ) {
this->cmsProvider.reset( new CMSProvider( brokerURL ) );
this->cmsProvider->setDestinationName( destination );
this->cmsProvider->setTopic( false );
this->numMessages = numMessages;
this->timeToLive = timeToLive;
this->disableTimeStamps = false;
}
virtual ~Producer(){
}
virtual bool getDisableTimeStamps() const {
return this->disableTimeStamps;
}
virtual void setDisableTimeStamps( bool value ){
this->disableTimeStamps = value;
}
virtual void run() {
try {
cms::Session* session = cmsProvider->getSession();
cms::MessageProducer* producer = cmsProvider->getProducer();
producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
producer->setDisableMessageTimeStamp( disableTimeStamps );
if( !this->disableTimeStamps ) {
producer->setTimeToLive( timeToLive );
}
// Create the Thread Id String
string threadIdStr = Long::toString( Thread::getId() );
// Create a messages
string text = (string)"Hello world! from thread " + threadIdStr;
for( int ix=0; ix<numMessages; ++ix ){
TextMessage* message = session->createTextMessage( text );
producer->send( message );
delete message;
}
} catch ( CMSException& e ) {
e.printStackTrace();
}
}
};
class Consumer : public cms::MessageListener, public decaf::lang::Runnable {
private:
auto_ptr<CMSProvider> cmsProvider;
long waitMillis;
int numReceived;
public:
Consumer( const std::string& brokerURL, const std::string& destination, long waitMillis ) {
this->cmsProvider.reset( new CMSProvider( brokerURL ) );
this->cmsProvider->setTopic( false );
this->cmsProvider->setDestinationName( destination );
this->waitMillis = waitMillis;
this->numReceived = 0;
}
virtual ~Consumer(){
}
virtual int getNumReceived() const{
return numReceived;
}
virtual void run(){
try {
cms::MessageConsumer* consumer = cmsProvider->getConsumer();
consumer->setMessageListener( this );
// Sleep while asynchronous messages come in.
Thread::sleep( waitMillis );
} catch (CMSException& e) {
e.printStackTrace();
}
}
virtual void onMessage( const cms::Message* message ){
try{
const TextMessage* textMessage =
dynamic_cast< const TextMessage* >( message );
textMessage->getText();
numReceived++;
} catch( CMSException& e ) {
e.printStackTrace();
}
}
};
}}
////////////////////////////////////////////////////////////////////////////////
void ExpirationTest::testExpired() {
string destination = UUID::randomUUID().toString();
Producer producer( this->getBrokerURL(), destination, 1, 1 );
Thread producerThread( &producer );
producerThread.start();
producerThread.join();
Consumer consumer( this->getBrokerURL(), destination, 2000 );
Thread consumerThread( &consumer );
consumerThread.start();
consumerThread.join();
CPPUNIT_ASSERT_EQUAL( 0, consumer.getNumReceived() );
}
////////////////////////////////////////////////////////////////////////////////
void ExpirationTest::testNotExpired() {
string destination = UUID::randomUUID().toString();
Producer producer( this->getBrokerURL(), destination, 2, 2000 );
producer.setDisableTimeStamps( true );
Thread producerThread( &producer );
producerThread.start();
producerThread.join();
Consumer consumer( this->getBrokerURL(), destination, 3000 );
Thread consumerThread( &consumer );
consumerThread.start();
consumerThread.join();
CPPUNIT_ASSERT_EQUAL( 2, consumer.getNumReceived() );
}