blob: df7dd894d1aa3206246d7badd823553326d3fa8b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "ExpirationTest.h"
#include <decaf/lang/Runnable.h>
#include <decaf/lang/Thread.h>
#include <decaf/lang/Long.h>
#include <decaf/util/UUID.h>
#include <sstream>
using namespace std;
using namespace cms;
using namespace activemq;
using namespace activemq::test;
using namespace activemq::util;
using namespace decaf;
using namespace decaf::lang;
using namespace decaf::util;
////////////////////////////////////////////////////////////////////////////////
namespace activemq {
namespace test {
class Producer: public decaf::lang::Runnable {
private:
auto_ptr<CMSProvider> cmsProvider;
int numMessages;
long long timeToLive;
bool disableTimeStamps;
public:
Producer(const std::string& brokerURL, const std::string& destination, int numMessages, long long timeToLive) :
Runnable(), cmsProvider(), numMessages(numMessages), timeToLive(timeToLive), disableTimeStamps(false) {
this->cmsProvider.reset(new CMSProvider(brokerURL));
this->cmsProvider->setDestinationName(destination);
this->cmsProvider->setTopic(false);
}
virtual ~Producer() {
}
virtual bool getDisableTimeStamps() const {
return this->disableTimeStamps;
}
virtual void setDisableTimeStamps(bool value) {
this->disableTimeStamps = value;
}
virtual void run() {
try {
cms::Session* session = cmsProvider->getSession();
cms::MessageProducer* producer = cmsProvider->getProducer();
producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
producer->setDisableMessageTimeStamp(disableTimeStamps);
if (!this->disableTimeStamps) {
producer->setTimeToLive(timeToLive);
}
// Create the Thread Id String
string threadIdStr = Long::toString(Thread::currentThread()->getId());
// Create a messages
string text = (string) "Hello world! from thread " + threadIdStr;
for (int ix = 0; ix < numMessages; ++ix) {
TextMessage* message = session->createTextMessage(text);
producer->send(message);
delete message;
}
} catch (CMSException& e) {
e.printStackTrace();
}
}
};
class Consumer: public cms::MessageListener, public decaf::lang::Runnable {
private:
auto_ptr<CMSProvider> cmsProvider;
long initialDelay;
long waitMillis;
int numReceived;
public:
Consumer(const std::string& brokerURL, const std::string& destination, long waitMillis) :
Runnable(), cmsProvider(), initialDelay(0), waitMillis(waitMillis), numReceived(0) {
this->cmsProvider.reset(new CMSProvider(brokerURL));
this->cmsProvider->setTopic(false);
this->cmsProvider->setDestinationName(destination);
}
virtual ~Consumer() {
}
int getNumReceived() const {
return numReceived;
}
void setInitialDelay(long delay) {
initialDelay = delay;
}
long getInitialDelay() {
return initialDelay;
}
virtual void run() {
try {
cms::MessageConsumer* consumer = cmsProvider->getConsumer();
if (getInitialDelay() > 0) {
Thread::sleep(getInitialDelay());
}
consumer->setMessageListener(this);
// Sleep while asynchronous messages come in.
Thread::sleep(waitMillis);
} catch (CMSException& e) {
e.printStackTrace();
}
}
virtual void onMessage(const cms::Message* message) {
try {
const TextMessage* textMessage = dynamic_cast<const TextMessage*>(message);
textMessage->getText();
numReceived++;
} catch (CMSException& e) {
e.printStackTrace();
}
}
};
}}
////////////////////////////////////////////////////////////////////////////////
void ExpirationTest::testExpired() {
string destination = UUID::randomUUID().toString();
Producer producer(this->getBrokerURL(), destination, 2, 1000);
Thread producerThread(&producer);
producerThread.start();
producerThread.join();
Consumer consumer(this->getBrokerURL(), destination, 2000);
consumer.setInitialDelay(1500);
Thread consumerThread(&consumer);
consumerThread.start();
consumerThread.join();
CPPUNIT_ASSERT_EQUAL(0, consumer.getNumReceived());
}
////////////////////////////////////////////////////////////////////////////////
void ExpirationTest::testExpiredWithChecksDisabled() {
{
// Try it once enabled to prove the expiration processing works.
string destination = UUID::randomUUID().toString();
Producer producer(this->getBrokerURL(), destination, 2, 1000);
Thread producerThread(&producer);
producerThread.start();
producerThread.join();
Consumer consumer(this->getBrokerURL() + "?connection.consumerExpiryCheckEnabled=true", destination, 2000);
consumer.setInitialDelay(1500);
Thread consumerThread(&consumer);
consumerThread.start();
consumerThread.join();
CPPUNIT_ASSERT_EQUAL(0, consumer.getNumReceived());
}
{
// Now lets try it disabled.
string destination = UUID::randomUUID().toString();
Producer producer(this->getBrokerURL(), destination, 2, 1000);
Thread producerThread(&producer);
producerThread.start();
producerThread.join();
Consumer consumer(this->getBrokerURL() + "?connection.consumerExpiryCheckEnabled=false", destination, 2000);
consumer.setInitialDelay(1500);
Thread consumerThread(&consumer);
consumerThread.start();
consumerThread.join();
CPPUNIT_ASSERT_EQUAL(2, consumer.getNumReceived());
}
}
////////////////////////////////////////////////////////////////////////////////
void ExpirationTest::testNotExpired() {
string destination = UUID::randomUUID().toString();
Producer producer(this->getBrokerURL(), destination, 2, 2000);
producer.setDisableTimeStamps(true);
Thread producerThread(&producer);
producerThread.start();
producerThread.join();
Consumer consumer(this->getBrokerURL(), destination, 3000);
Thread consumerThread(&consumer);
consumerThread.start();
consumerThread.join();
CPPUNIT_ASSERT_EQUAL(2, consumer.getNumReceived());
}